<p>您真正想要的是某种方式将异常传递给父进程,对吧?然后你可以随心所欲地处理它们。</p>
<p>如果使用<a href="http://docs.python.org/dev/library/concurrent.futures.html">^{<cd1>}</a>,这是自动的。如果你使用<a href="http://docs.python.org/dev/library/multiprocessing.html#using-a-pool-of-workers">^{<cd2>}</a>,这很简单。如果你使用显式的<code>Process</code>和<code>Queue</code>,你需要做一点工作,但不是太多。</p>
<p>例如:</p>
<pre><code>def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
except Exception as e:
self.outputQueue.put(e)
</code></pre>
<p>然后,您的调用代码就可以像其他代码一样从队列中读取<code>Exception</code>。而不是这个:</p>
<pre><code>yield outq.pop()
</code></pre>
<p>执行以下操作:</p>
<pre><code>result = outq.pop()
if isinstance(result, Exception):
raise result
yield result
</code></pre>
<p>(我不知道实际的父进程队列读取代码是做什么的,因为最小的示例忽略了队列。但希望这能解释这个想法,即使你真正的代码并不是这样工作的。)</p>
<p>这假设您要中止任何未处理的异常,使其成为<code>run</code>。如果要传回异常并继续下一个<code>i in iter</code>,只需将<code>try</code>移到<code>for</code>中,而不是围绕它。</p>
<p>这还假设<code>Exception</code>s不是有效值。如果这是一个问题,最简单的解决方案是只推<code>(result, exception)</code>元组:</p>
<pre><code>def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put((result, None))
except Exception as e:
self.outputQueue.put((None, e))
</code></pre>
<p>然后,弹出代码会执行以下操作:</p>
<pre><code>result, exception = outq.pop()
if exception:
raise exception
yield result
</code></pre>
<p>您可能会注意到这类似于node.js回调样式,在这种样式中,您将<code>(err, result)</code>传递给每个回调。是的,这很烦人,而且你会把这种风格的代码弄乱。但实际上除了包装器之外,您并没有在任何地方使用它;所有从队列中获取值或在<code>run</code>内部调用的“应用程序级”代码只看到正常的返回/产生和引发的异常。</p>
<p>您甚至可以考虑根据<code>concurrent.futures</code>的规范构建<code>Future</code>(或按原样使用该类),即使您正在执行作业队列并手动执行。这并不难,而且它提供了一个非常好的API,特别是用于调试。</p>
<p>最后,值得注意的是,使用executor/pool设计可以使大多数围绕worker s和queue构建的代码变得简单得多,即使您绝对确定每个队列只需要一个worker。只需废弃所有的样板文件,并将<code>Worker.run</code>方法中的循环转换为一个函数(它只是<code>return</code>s或<code>raise</code>s,而不是附加到队列中)。在调用端,再次废弃所有样板文件,只使用job函数及其参数<code>submit</code>或<code>map</code>。</p>
<p>你的整个例子可以简化为:</p>
<pre><code>def job(i):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
return result
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
results = executor.map(job, range(10))
</code></pre>
<p>它会自动正确处理异常。</p>
<hr/>
<p>正如您在注释中提到的,异常的回溯不会追溯到子进程;它只会追溯到手动调用(或者,如果您使用的是池或执行器,则是池或执行器的勇气)。</p>
<p>原因是<code>multiprocessing.Queue</code>构建在<code>pickle</code>之上,而pickling异常不会pickle它们的回溯。这是因为你不能腌制回溯。其原因是,回溯充满了对本地执行上下文的引用,因此让它们在另一个进程中工作将非常困难。</p>
<p>所以…你能怎么办?不要去寻找一个全面的解决方案。相反,想想你真正需要什么。90%的情况下,您需要的是“记录异常,使用回溯,然后继续”或“使用回溯,将异常打印到<code>stderr</code>和<code>exit(1)</code>,就像默认的未处理异常处理程序一样”。对于这两种情况,您根本不需要传递异常;只需在子端格式化它并传递一个字符串。如果你确实需要一些更新奇的东西,找出你所需要的,并传递足够的信息来手动组合起来。如果不知道如何格式化回溯和异常,请参见<a href="http://docs.python.org/2/library/traceback.html">^{<cd26>}</a>模块。很简单。这意味着你根本不需要进入腌菜机。(这并不是说用<code>copyreg</code>pickler或用<code>__reduce__</code>方法或其他方法编写holder类是非常困难的,但是如果不需要,为什么要学习这些呢?)</p>