我有以下python代码,每个线程分别承担以下角色之一:
生产者(Producer)-生成json文本
解析器(Processor)-解析json并生成json对象列表
消费者(Consumer)-持久化数据并将其保存到列表中
生产者发送json,消费者得到json列表,问题是生产者和消费者之间的消息计数不匹配,一些消息在中途丢失了。不过,我看到生产者和解析器之间的消息计数是一致的。
解决方法:在将对象发布到队列/列表之后,必须创建新的对象。在我之前的代码中,我只是通过清空list和dict对象来重用它们(可能是我想得太多了:)。这导致队列中的对象数据在还没有被下游线程使用之前就被清空了……
下面是可以正常工作的代码:
import threading
import sys, time
import logging
import random
import Queue, json
from datetime import datetime
# Emit every record from DEBUG upwards, prefixed with the name of the thread
# that produced it, so output from the pipeline stages can be told apart.
logging.basicConfig(
    format='(%(threadName)-9s) %(message)s',
    level=logging.DEBUG,
)

# Capacity handed to both queues below; a maxsize <= 0 means "unbounded".
MAX_SIZE = -1

# producerq: raw JSON strings (default input of ParseJson).
# consumerq: lists of parsed dicts (default input of ConsumeJsonList).
producerq = Queue.Queue(MAX_SIZE)
consumerq = Queue.Queue(MAX_SIZE)
class ProduceJson(threading.Thread):
    """Producer thread: emits an endless stream of JSON-encoded messages.

    Every message is a JSON object holding a 1-based ``seq`` counter and a
    UTC ``timestamp`` string; it is put onto ``producerq`` as text.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None, producerq=producerq):
        """Create the producer.

        Args:
            group, args, kwargs, verbose: accepted so the signature mirrors
                ``threading.Thread``; they are not used.
            target: stored on ``self.target``; never invoked by ``run``.
            name: thread name, used as a prefix in log messages.
            producerq: queue that receives the generated JSON strings.
        """
        # Hand the name to Thread.__init__ so that omitting it yields the
        # default "Thread-N" name instead of the literal string "None".
        super(ProduceJson, self).__init__(name=name)
        self.target = target
        self.producerq = producerq
        logging.debug(self.name + " Started")

    def run(self):
        """Generate and enqueue messages forever (or until an error occurs)."""
        cnt = 1
        try:
            while True:
                # NOTE(review): the "6" before "Z" is emitted literally; it
                # looks like a typo for a fractional-seconds field - confirm
                # the intended format before changing it.
                timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S6Z")
                item = json.dumps({
                    'seq': cnt,
                    'timestamp': timestamp,
                })
                logging.debug(self.name + " Posting " + str(item))
                # put() blocks while a bounded queue is full, so there is no
                # need to spin on full() first.
                self.producerq.put(item)
                cnt += 1
        except Exception as e:
            # str(e) works for every exception type; e.message is deprecated
            # and may be missing or not a string.
            logging.error("ERROR " + self.name + str(e))
class ParseJson(threading.Thread):
    """Parser thread: turns JSON strings into batches of dicts.

    Reads JSON text from ``producerq``, maps each message to a dict with the
    keys ``pseq`` and ``ptime``, and puts lists of ``BATCH_SIZE`` such dicts
    onto ``consumerq``.
    """

    # Number of parsed items collected before a batch is handed downstream.
    BATCH_SIZE = 5

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None, producerq=producerq, consumerq=consumerq):
        """Create the parser.

        Args:
            group, args, kwargs, verbose: accepted so the signature mirrors
                ``threading.Thread``; they are not used.
            target: stored on ``self.target``; never invoked by ``run``.
            name: thread name, used as a prefix in log messages.
            producerq: queue the JSON strings are read from.
            consumerq: queue the parsed batches are written to.
        """
        # Hand the name to Thread.__init__ so that omitting it yields the
        # default "Thread-N" name instead of the literal string "None".
        super(ParseJson, self).__init__(name=name)
        self.target = target
        self.producerq = producerq
        self.consumerq = consumerq
        logging.debug(self.name + " Started")

    def run(self):
        """Parse and batch messages forever (or until an error occurs)."""
        itemlist = []
        try:
            while True:
                # get() blocks until a message is available; polling empty()
                # first only burns CPU.
                item = json.loads(self.producerq.get())
                itemlist.append({
                    'pseq': item['seq'],
                    'ptime': item['timestamp'],
                })
                if len(itemlist) >= self.BATCH_SIZE:
                    # Log before the hand-off, while this thread still owns
                    # the list.
                    logging.debug(self.name + " Posting " + str(itemlist))
                    self.consumerq.put(itemlist)
                    # Bind a NEW list rather than clearing the old one: the
                    # queued list is the same object the consumer receives,
                    # so clearing it would wipe data not yet consumed.
                    itemlist = []
        except Exception as e:
            # str(e) works for every exception type; e.message is deprecated
            # and may be missing or not a string.
            logging.error("ERROR " + self.name + str(e))
class ConsumeJsonList(threading.Thread):
    """Consumer thread: takes batches off ``consumerq`` and logs them."""

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None, consumerq=consumerq):
        """Create the consumer.

        Args:
            group, args, kwargs, verbose: accepted so the signature mirrors
                ``threading.Thread``; they are not used.
            target: stored on ``self.target``; never invoked by ``run``.
            name: thread name, used as a prefix in log messages.
            consumerq: queue the batches are read from.
        """
        # Hand the name to Thread.__init__ so that omitting it yields the
        # default "Thread-N" name instead of the literal string "None".
        super(ConsumeJsonList, self).__init__(name=name)
        self.target = target
        self.consumerq = consumerq
        logging.debug(self.name + " Started")

    def run(self):
        """Consume batches forever (or until an error occurs)."""
        try:
            while True:
                # get() blocks until a batch is available; polling empty()
                # first only burns CPU.
                batch = self.consumerq.get()
                logging.info(" Consumed items " + str(batch))
        except Exception as e:
            # str(e) works for every exception type; e.message is deprecated
            # and may be missing or not a string.
            logging.error("ERROR " + self.name + str(e))
if __name__ == '__main__':
    # Wire up the pipeline: one producer feeding two parsers, which in turn
    # feed a single consumer. The workers are built first, then started in
    # the same order.
    producer1 = ProduceJson(producerq=producerq, name='PRODUCER1')
    parser1 = ParseJson(consumerq=consumerq, producerq=producerq, name='parser1')
    parser2 = ParseJson(consumerq=consumerq, producerq=producerq, name='parser2')
    consumer1 = ConsumeJsonList(consumerq=consumerq, name='CONSUMER1')

    for worker in (producer1, parser1, parser2, consumer1):
        worker.start()
目前没有回答
相关问题 更多 >
编程相关推荐