我有一段多线程代码-3线程,用于轮询来自SQS的数据并将其添加到python队列中。5个线程从python队列获取消息,处理它们并将其发送到后端系统。在
代码如下:
python_queue = Queue.Queue()
class GetDataFromSQS(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, python_queue):
threading.Thread.__init__(self)
self.python_queue = python_queue
def run(self):
while True:
time.sleep(0.5) //sleep for a few secs before querying again
try:
msgs = sqs_queue.get_messages(10)
if msgs == None:
print "sqs is empty now"!
for msg in msgs:
#place each message block from sqs into python queue for processing
self.python_queue.put(msg)
print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
#delete from sqs
sqs_queue.delete_message(msg)
except Exception as e:
print "Exception in GetDataFromSQS :: " + e
class ProcessSQSMsgs(threading.Thread):
def __init__(self, python_queue):
threading.Thread.__init__(self)
self.python_queue = python_queue
self.pool_manager = PoolManager(num_pools=6)
def run(self):
while True:
#grabs the message to be parsed from sqs queue
python_queue_msg = self.python_queue.get()
try:
processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
except Exception as e:
print "Error parsing:: " + e
finally:
self.python_queue.task_done()
def processMsgAndSendToBackend(msg, pool_manager):
if msg != "":
###### All the code related to processing the msg
for individualValue in processedMsg:
try:
response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
if response == None:
print "Error"
else:
response.release_conn()
except Exception as e:
print "Exception! Post data to backend: " + e
def startMyPython():
#spawn a pool of threads, and pass them queue instance
for i in range(3):
sqsThread = GetDataFromSQS(python_queue)
sqsThread.start()
for j in range(5):
parseThread = ProcessSQSMsgs(python_queue)
#parseThread.setDaemon(True)
parseThread.start()
#wait on the queue until everything has been processed
python_queue.join()
# python_queue.close() -- should i do this?
startMyPython()
问题: 每隔几天就有3个python工作人员随机死亡(使用top-p-H进行监视),如果我终止进程并重新启动脚本,一切都会好起来的。我怀疑消失的工人是3个GetDataFromSQS线程。。而且由于GetDataFromSQS死了,其他5个工作线程虽然在运行,但始终处于睡眠状态,因为python队列中没有数据。我不确定我在这里做错了什么,因为我对python相当陌生,并遵循了本教程创建这个队列逻辑和线程-http://www.ibm.com/developerworks/aix/library/au-threadingpython/
提前谢谢你的帮助。希望我把我的问题解释清楚了。在
线程挂起的问题与获取sqs队列的句柄有关。我使用IAM管理凭证,使用boto sdk连接到sqs。在
这个问题的根本原因是boto包正在从AWS读取auth的元数据,并且偶尔会失败。在
修复方法是编辑boto配置,增加对AWS执行auth调用的尝试次数。在
[波图] metadata_service_num_尝试次数=5
(https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E)
相关问题 更多 >
编程相关推荐