Python多处理进程静默崩溃

2024-10-17 06:32:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是Python2.7.3。我已经使用子类multiprocessing.Process对象并行化了一些代码。如果我的子类进程对象中的代码没有错误,那么一切都会正常运行。但是,如果我的子类进程对象中的代码有错误,它们显然会无声地崩溃(没有stacktrace打印到父shell中),CPU使用率将降到零。父代码从不崩溃,给人的印象是执行只是挂起。同时,很难找出代码中的错误在哪里,因为没有给出错误在哪里的指示。

我在stackoverflow上找不到任何其他问题来处理同一个问题。

我想,子类进程对象似乎会无声地崩溃,因为它们无法将错误消息打印到父进程的shell中,但我想知道我可以对此做些什么,这样我至少可以更有效地进行调试(这样我的代码的其他用户也可以在遇到问题时告诉我)。

编辑:我的实际代码太复杂,但是一个包含错误的子类流程对象的小例子如下:

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

Tags: 对象代码self进程initdef错误code
3条回答

这不是一个答案,只是一个扩展的评论。请运行此程序并告诉我们您得到的输出(如果有的话):

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

if __name__ == '__main__':
    inq, outq = Queue(), Queue()
    inq.put(1)
    inq.put('STOP')
    w = Worker(inq, outq)
    w.start()

我得到:

% test.py
Process Worker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/unutbu/pybin/test.py", line 21, in run
    1 / 0 # Dumb error
ZeroDivisionError: integer division or modulo by zero

你什么也得不到我很惊讶。

我建议这样的变通方法来显示进程的异常

from multiprocessing import Process
import traceback


run_old = Process.run

def run_new(*args, **kwargs):
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc(file=sys.stdout)

Process.run = run_new

您真正想要的是某种方式将异常传递给父进程,对吧?然后你可以随心所欲地处理它们。

如果使用^{},这是自动的。如果你使用^{},这很简单。如果你使用显式的ProcessQueue,你需要做一点工作,但不是太多。

例如:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put(result)
    except Exception as e:
        self.outputQueue.put(e)

然后,您的调用代码就可以像其他代码一样从队列中读取Exception。而不是这个:

yield outq.pop()

执行以下操作:

result = outq.pop()
if isinstance(result, Exception):
    raise result
yield result

(我不知道实际的父进程队列读取代码是做什么的,因为最小的示例忽略了队列。但希望这能解释这个想法,即使你真正的代码并不是这样工作的。)

这假设您要中止任何未处理的异常,使其成为run。如果要传回异常并继续下一个i in iter,只需将try移到for中,而不是围绕它。

这还假设Exceptions不是有效值。如果这是一个问题,最简单的解决方案是只推(result, exception)元组:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put((result, None))
    except Exception as e:
        self.outputQueue.put((None, e))

然后,弹出代码会执行以下操作:

result, exception = outq.pop()
if exception:
    raise exception
yield result

您可能会注意到这类似于node.js回调样式,在这种样式中,您将(err, result)传递给每个回调。是的,这很烦人,而且你会把这种风格的代码弄乱。但实际上除了包装器之外,您并没有在任何地方使用它;所有从队列中获取值或在run内部调用的“应用程序级”代码只看到正常的返回/产生和引发的异常。

您甚至可以考虑根据concurrent.futures的规范构建Future(或按原样使用该类),即使您正在执行作业队列并手动执行。这并不难,而且它提供了一个非常好的API,特别是用于调试。

最后,值得注意的是,使用executor/pool设计可以使大多数围绕worker s和queue构建的代码变得简单得多,即使您绝对确定每个队列只需要一个worker。只需废弃所有的样板文件,并将Worker.run方法中的循环转换为一个函数(它只是returns或raises,而不是附加到队列中)。在调用端,再次废弃所有样板文件,只使用job函数及其参数submitmap

你的整个例子可以简化为:

def job(i):
    # (code that does stuff)
    1 / 0 # Dumb error
    # (more code that does stuff)
    return result

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(job, range(10))

它会自动正确处理异常。


正如您在注释中提到的,异常的回溯不会追溯到子进程;它只会追溯到手动调用(或者,如果您使用的是池或执行器,则是池或执行器的勇气)。

原因是multiprocessing.Queue构建在pickle之上,而pickling异常不会pickle它们的回溯。这是因为你不能腌制回溯。其原因是,回溯充满了对本地执行上下文的引用,因此让它们在另一个进程中工作将非常困难。

所以…你能怎么办?不要去寻找一个全面的解决方案。相反,想想你真正需要什么。90%的情况下,您需要的是“记录异常,使用回溯,然后继续”或“使用回溯,将异常打印到stderrexit(1),就像默认的未处理异常处理程序一样”。对于这两种情况,您根本不需要传递异常;只需在子端格式化它并传递一个字符串。如果你确实需要一些更新奇的东西,找出你所需要的,并传递足够的信息来手动组合起来。如果不知道如何格式化回溯和异常,请参见^{}模块。很简单。这意味着你根本不需要进入腌菜机。(这并不是说用copyregpickler或用__reduce__方法或其他方法编写holder类是非常困难的,但是如果不需要,为什么要学习这些呢?)

相关问题 更多 >