使用Python的多进程检查空队列

2024-05-18 12:34:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个程序使用python的包进行多处理和队列。我的一个功能具有以下结构:

from multiprocessing import Process, Queue
def foo(queue):
   while True:
       try:
           a = queue.get(block = False)
           doAndPrintStuff(a)
       except:
           print "the end"
           break

   if __name__ == "__main__"
     nthreads = 4
     queue = Queue.Queue()
     # put stuff in the queue here 
     for stuff in moreStuff:
         queue.put(stuff)
     procs = [Process(target = foo, args = (queue,)) for i in xrange(nthreads)]
     for p in procs:
       p.start()
     for p in procs:
       p.join()

其思想是,当我尝试从队列中提取时,如果队列为空,它将引发异常并终止循环。所以我有两个问题:

1)这是一个安全的成语吗?有更好的办法吗?

2)我试图找到当我试图从空队列中.get()时引发的异常。目前,我的程序正在捕获所有异常,当错误出现在其他地方时,这很糟糕,我只收到一条“结束”消息。

我试过:

  import Queue
  queue = Queue.Queue()
  [queue.put(x) for x in xrange(10)]
  try: 
       print queue.get(block = False)
  except Queue.Empty:
       print "end"
       break

但我得到了错误,好像我没有抓到异常。要捕获的正确异常是什么?


Tags: inimport程序forgetfoo队列queue
3条回答

尝试阅读queue库文档。你不是在找Queue.empty()吗?

在刷新放置缓冲区之前,队列似乎是空的,这可能需要一段时间。

我们的问题的解决方案是to usesentinels,或者可能是内置的task_done()调用:

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

异常应该是Queue.Empty。但你确定你犯了同样的错误吗?在第二个示例中,还将队列本身从multiprocessing.Queue切换到Queue.Queue,我认为这可能是问题所在。

这可能看起来很奇怪,但是您必须使用multiprocessing.Queue类,但必须使用Queue.Empty异常(您必须从Queue模块导入该异常)

相关问题 更多 >

    热门问题