Python多线程/队列生产者和消费者竞争问题

2024-06-25 22:57:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下 Python 代码,其中每个线程分别承担以下角色之一。

Producer(生产者)-生成json文本
Parser(解析器)-解析json并生成json对象列表
Consumer(消费者)-从队列中取出json对象列表并持久化保存数据

生产者发送json,消费者得到json列表,问题是生产者和消费者之间的消息计数不匹配,一些消息在中间丢失了。不过,我看到生产者和解析器之间的消息计数是一致的。

解决方法:我必须在将对象发布到队列/列表之后创建新的对象。在我之前的代码中,我只是通过清空 list 和 dict 对象来重用它们(可能是我想得太多了 :))。这导致已放入队列的对象数据被清空,因为此时下游线程还没有来得及使用它。

下面是工作代码

import threading
import sys, time
import logging
import random
import Queue, json
from datetime import datetime

# Emit everything from DEBUG upwards, prefixing each record with the name of
# the thread that produced it so interleaved output can be told apart.
logging.basicConfig(
    format='(%(threadName)-9s) %(message)s',
    level=logging.DEBUG,
)

# A maxsize of zero or less makes a Queue unbounded.
MAX_SIZE = -1

# producerq: JSON strings flowing from ProduceJson to ParseJson.
producerq = Queue.Queue(MAX_SIZE)
# consumerq: lists of parsed dicts flowing from ParseJson to ConsumeJsonList.
consumerq = Queue.Queue(MAX_SIZE)


class ProduceJson(threading.Thread):
    """Producer thread that publishes sequence-numbered JSON strings to a queue.

    Every message is a JSON object holding a ``seq`` counter (starting at 1)
    and a ``timestamp`` string built from the current UTC time.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None, producerq=producerq):
        """Set up the producer.

        ``group``, ``args``, ``kwargs`` and ``verbose`` are accepted so the
        signature mirrors ``threading.Thread`` but are not used. ``target`` is
        only stored on the instance; ``run`` is overridden and never calls it.
        ``producerq`` is the queue that receives the generated JSON strings.
        """
        # Hand the name to Thread so that an omitted name falls back to the
        # default "Thread-N" instead of becoming the literal string "None".
        super(ProduceJson, self).__init__(name=name)
        self.target = target
        self.producerq = producerq
        logging.debug("%s Started", self.name)

    def run(self):
        """Publish messages forever; on any error, log it and end the thread."""
        cnt = 1
        try:
            while True:
                # NOTE(review): the literal "6" before "Z" looks like a typo
                # (perhaps fractional seconds were intended) -- kept unchanged
                # because downstream consumers may depend on the exact format.
                timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S6Z")
                item = json.dumps({
                    'seq': cnt,
                    'timestamp': timestamp,
                })
                logging.debug("%s Posting %s", self.name, item)
                # put() blocks while a bounded queue is full, so polling
                # full() first is unnecessary and would only spin the CPU.
                self.producerq.put(item)
                cnt += 1
        except Exception:
            # logging.exception records the traceback; concatenating
            # e.message could itself fail when the message is not a string.
            logging.exception("ERROR %s", self.name)


class ParseJson(threading.Thread):
    """Parser thread: turns JSON strings into dicts and forwards them in batches.

    Each JSON string taken from ``producerq`` is decoded, reduced to a dict
    with the keys ``pseq`` and ``ptime``, and collected into a list. Once the
    list holds ``batch_size`` entries it is put on ``consumerq`` and a brand
    new list is started.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None, producerq=producerq, consumerq=consumerq,
                 batch_size=5):
        """Set up the parser.

        ``group``, ``args``, ``kwargs`` and ``verbose`` are accepted so the
        signature mirrors ``threading.Thread`` but are not used. ``target`` is
        only stored on the instance; ``run`` is overridden and never calls it.
        ``producerq`` is the input queue of JSON strings and ``consumerq`` the
        output queue of batches. ``batch_size`` is the number of parsed items
        per published list (default 5).

        Raises ValueError if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        # Hand the name to Thread so that an omitted name falls back to the
        # default "Thread-N" instead of becoming the literal string "None".
        super(ParseJson, self).__init__(name=name)

        self.target = target
        self.producerq = producerq
        self.consumerq = consumerq
        self.batch_size = batch_size
        logging.debug("%s Started", self.name)

    def run(self):
        """Parse and batch messages forever; on any error, log it and end the thread."""
        itemlist = []
        try:
            while True:
                # get() blocks until a message is available, so there is no
                # need to spin on empty() while the queue is idle.
                item = json.loads(self.producerq.get())
                itemlist.append({
                    'pseq': item['seq'],
                    'ptime': item['timestamp'],
                })
                if len(itemlist) >= self.batch_size:
                    self.consumerq.put(itemlist)
                    logging.debug("%s Posting %s", self.name, itemlist)
                    # Start a fresh list rather than clearing the old one:
                    # the published list is now shared with the consumer and
                    # clearing it would wipe data that has not been read yet.
                    itemlist = []
        except Exception:
            # logging.exception records the traceback; concatenating
            # e.message could itself fail when the message is not a string.
            logging.exception("ERROR %s", self.name)


class ConsumeJsonList(threading.Thread):
    """Consumer thread that takes batches off a queue and logs them."""

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None, consumerq=consumerq):
        """Set up the consumer.

        ``group``, ``args``, ``kwargs`` and ``verbose`` are accepted so the
        signature mirrors ``threading.Thread`` but are not used. ``target`` is
        only stored on the instance; ``run`` is overridden and never calls it.
        ``consumerq`` is the queue the batches are read from.
        """
        # Hand the name to Thread so that an omitted name falls back to the
        # default "Thread-N" instead of becoming the literal string "None".
        super(ConsumeJsonList, self).__init__(name=name)
        self.target = target
        self.consumerq = consumerq
        logging.debug("%s Started", self.name)

    def run(self):
        """Consume batches forever; on any error, log it and end the thread."""
        try:
            while True:
                # get() blocks until a batch arrives, so there is no need to
                # spin on empty() while the queue is idle.
                batch = self.consumerq.get()
                logging.info(" Consumed items  %s", batch)
        except Exception:
            # logging.exception records the traceback; concatenating
            # e.message could itself fail when the message is not a string.
            logging.exception("ERROR %s", self.name)


if __name__ == '__main__':
    # Pipeline layout: one producer feeds producerq, two parsers share it and
    # publish batches to consumerq, and a single consumer drains consumerq.
    # Create further ProduceJson / ParseJson / ConsumeJsonList instances here
    # (and add them to the tuple below) to scale an individual stage.
    producer1 = ProduceJson(name='PRODUCER1', producerq=producerq)
    parser1 = ParseJson(name='parser1', producerq=producerq, consumerq=consumerq)
    parser2 = ParseJson(name='parser2', producerq=producerq, consumerq=consumerq)
    consumer1 = ConsumeJsonList(name='CONSUMER1', consumerq=consumerq)

    # Start the threads in the same order in which they were created.
    for worker in (producer1, parser1, parser2, consumer1):
        worker.start()

Tags: name, debug, import, self, none, json, target, init