Python中的多处理队列

2024-05-18 12:04:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在Python的多处理库中使用队列。在执行下面的代码之后(print语句可以工作),但是在我对队列调用join之后,进程不会退出,并且仍然存在。如何终止其余进程?

谢谢!

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()

Tags: truemessagefor队列进程defitemmultiprocessing
3条回答

试试这个:

import multiprocessing

num_procs = 4
def do_work(message):
  print "work",message ,"completed"

def worker():
  for item in iter( q.get, None ):
    do_work(item)
    q.task_done()
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  procs.append( multiprocessing.Process(target=worker) )
  procs[-1].daemon = True
  procs[-1].start()

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

q.join()

for p in procs:
  q.put( None )

q.join()

for p in procs:
  p.join()

print "Finished everything...."
print "num active children:", multiprocessing.active_children()

在加入进程之前必须清除队列,但是q.empty()不可靠。

清除队列的最好方法是计算成功获取或循环的次数,直到收到一个sentinel值,就像一个具有可靠网络的套接字一样。

你的工作人员需要一个哨兵来终止,否则他们只能坐在块读上。请注意,使用Q上的sleep而不是P上的join可以显示状态信息等。
我首选的模板是:

def worker(q,nameStr):
  print 'Worker %s started' %nameStr
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful
     q.task_done()
  print 'Worker %s Finished' % nameStr
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  nameStr = 'Worker_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  p.daemon = True
  p.start()
  procs.append(p)

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

for i in range(num_procs):
  q.put(None) # send termination sentinel, one for each process

while not q.empty(): # wait for processing to finish
  sleep(1)   # manage timeouts and status updates etc.

相关问题 更多 >