Error with multiprocessing, atexit and global data

Posted 2024-10-02 00:42:12


Sorry, this is going to be long...

Possibly related:

Python Multiprocessing atexit Error "Error in atexit._run_exitfuncs"

Definitely related:

python parallel map (multiprocessing.Pool.map) with global data

Keyboard Interrupts with python's multiprocessing Pool

Here is a "simple" script I hacked together to illustrate my problem...

import time
import multiprocessing as multi
import atexit

cleanup_stuff=multi.Manager().list([])

##################################################
# Some code to allow keyboard interrupts  
##################################################
was_interrupted=multi.Manager().list([])
class _interrupt(object):
    """
    Toy class to allow retrieval of the interrupt that triggered its execution
    """
    def __init__(self,interrupt):
        self.interrupt=interrupt

def interrupt():
    was_interrupted.append(1)

def interruptable(func):
    """
    decorator to allow functions to be "interruptable" by
    a keyboard interrupt when in python's multiprocessing.Pool.map
    **Note**, this won't actually cause the Map to be interrupted,
    It will merely cause the following functions to be not executed.
    """
    def newfunc(*args,**kwargs):
        try:
            if(not was_interrupted):
                return func(*args,**kwargs)
            else:
                return False
        except KeyboardInterrupt as e:
            interrupt()
            return _interrupt(e)  #If we really want to know about the interrupt...
    return newfunc

@atexit.register
def cleanup():
    for i in cleanup_stuff:
        print(i)
    return

@interruptable
def func(i):
    print(i)
    cleanup_stuff.append(i)
    time.sleep(float(i)/10.)
    return i

#Must wrap func here, otherwise it won't be found in __main__'s dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
    return func(*args)


if __name__ == "__main__":

    #This is an attempt to use signals -- I also attempted something similar where
    #The signals were only caught in the child processes...Or only on the main process...
    #
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)

    #Try 2 with signals (only catch signal on main process)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)
    #def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
    #p=multi.Pool(processes=4,initializer=startup)

    #Try 3 with signals (only catch signal on child processes)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,signal.SIG_IGN)
    #def startup(): signal.signal(signal.SIGINT,onSigInt)
    #p=multi.Pool(processes=4,initializer=startup)


    p=multi.Pool(4)
    try:
        out=p.map(wrapper,range(30))
        #out=p.map_async(wrapper,range(30)).get()  #This doesn't work either...

        #The following lines don't work either
        #Effectively trying to roll my own p.map() with p.apply_async 
        # results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
        # out = [ r.get() for r in results ]
    except KeyboardInterrupt:
        print ("Hello!")
        out=None
    finally:
        p.terminate()
        p.join()

    print (out)

This works fine if no keyboard interrupt is triggered. However, if I trigger one, the following exceptions occur:

[traceback not preserved]

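Interestingly, the code does exit the Pool.map function without calling any further functions... The problem seems to be that the KeyboardInterrupt is not handled properly at some point, but it is a little confusing where that happens and why it is not handled in interruptable. Thanks.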

Note that the same problem occurs if I use out=p.map_async(wrapper,range(30)).get()
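On the question in the code comments about why wrapper is needed: a likely cause is pickling. Pool.map pickles the callable by reference, using its module and qualified name. The inner function returned by interruptable has the qualified name interruptable.<locals>.newfunc, which pickle cannot look up. The following is a minimal sketch, assuming Python 3 and a simplified decorator (square is a hypothetical example function). It uses functools.wraps to copy the wrapped function's metadata, so that the decorated function can be passed to Pool.map directly.

```python
import functools
import multiprocessing as mp
import pickle


def interruptable(func):
    # functools.wraps copies __name__, __qualname__ and __module__ from func,
    # so pickle serializes newfunc by reference as "<module>.<func name>",
    # which resolves to this very decorated object.
    @functools.wraps(func)
    def newfunc(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            return False
    return newfunc


@interruptable
def square(i):
    # Hypothetical stand-in for the original func.
    return i * i


if __name__ == "__main__":
    # Without functools.wraps this typically fails with a
    # "Can't pickle local object 'interruptable.<locals>.newfunc'" error.
    assert pickle.loads(pickle.dumps(square)) is square
    with mp.Pool(2) as pool:
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
```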

EDIT 1

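A little closer... If I wrap out=p.map(...) in a try/except/finally clause, the first exception goes away... However, the others are still raised in atexit. The code and traceback above have been updated.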

EDIT 2

Other things that don't work have been added as comments to the code above (same error). This attempt was inspired by:

http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/
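The approach in that article can be sketched roughly as follows. This assumes Python 3, and work and run are hypothetical names. Each worker ignores SIGINT through the Pool initializer, so only the main process receives the KeyboardInterrupt, and the main process then terminates the pool. It follows the same idea as "Try 2" in the commented-out code above. It is not guaranteed to fix the atexit errors described here, because the Manager proxies used in the cleanup handler are a separate concern.

```python
import multiprocessing as mp
import signal
import time


def _ignore_sigint():
    # Pool initializer: runs once in each worker process. Ctrl-C in a terminal
    # is delivered to the whole foreground process group, so without this the
    # workers would also raise KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def work(i):
    # Hypothetical stand-in for the real per-item function.
    time.sleep(i / 10.0)
    return i


def run(n):
    pool = mp.Pool(4, initializer=_ignore_sigint)
    try:
        # A (large) timeout on .get() is a commonly cited workaround for older
        # Python versions, where an untimed wait could not be interrupted.
        out = pool.map_async(work, range(n)).get(timeout=3600)
    except KeyboardInterrupt:
        # Only the main process sees the interrupt.
        out = None
    finally:
        # Any results are already collected, so stop the workers either way.
        pool.terminate()
        pool.join()
    return out


if __name__ == "__main__":
    # Press Ctrl-C while this runs to exercise the KeyboardInterrupt branch.
    print(run(30))
```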

EDIT 3

Another failed attempt using signals has been added to the code above.

EDIT 4

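I have figured out how to restructure my code so that the code above is no longer needed. In the (unlikely) event that someone stumbles on this thread with the same use case as mine, here is my solution...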

Use case

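I have a function that generates temporary files with the tempfile module, and I want those files cleaned up when the program exits. My first attempt was to append each temporary file name to a list and then delete everything in the list from a function registered with atexit.register. The problem was that updates made to the list in the worker processes were not visible in the other processes. That is where I came up with the idea of using a multiprocessing.Manager to manage the list. Unfortunately, no matter what I tried, this failed on KeyboardInterrupt, because the sockets used for communication between the processes were broken for some reason.

The solution is simple: set the temporary file directory before using multiprocessing, with something like tempfile.tempdir=tempfile.mkdtemp(). Then register a function that deletes that temporary directory. Every process writes into the same temporary directory, so it works. Of course, this solution only applies when the shared data is a list of files that need to be deleted at the end of the program's lifetime.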
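A minimal sketch of this workaround, assuming Python 3 (make_temp_file, _init_worker and main are hypothetical names). The directory is also passed to each worker through the Pool initializer. This is because under the "spawn" start method (the default on Windows and macOS), workers do not inherit the parent's tempfile.tempdir setting. The directory is removed by one atexit handler in the main process. atexit handlers also run when the main process exits because of an uncaught KeyboardInterrupt.

```python
import atexit
import multiprocessing as mp
import os
import shutil
import tempfile


def _init_worker(shared_tmpdir):
    # Pool initializer: make every worker's tempfile module default to the
    # shared directory. Required under "spawn"; harmless under "fork".
    tempfile.tempdir = shared_tmpdir


def make_temp_file(i):
    # Hypothetical worker task. mkstemp() without dir= falls back to
    # tempfile.gettempdir(), which returns tempfile.tempdir once it is set.
    fd, path = tempfile.mkstemp(suffix=".txt")
    with os.fdopen(fd, "w") as fh:
        fh.write(str(i))
    return path


def main():
    # Create the shared directory before any worker processes exist.
    shared_tmpdir = tempfile.mkdtemp()
    tempfile.tempdir = shared_tmpdir
    # Should only ever run in the main process: forked workers exit via
    # os._exit, and spawned workers never execute main().
    atexit.register(shutil.rmtree, shared_tmpdir, ignore_errors=True)

    with mp.Pool(4, initializer=_init_worker, initargs=(shared_tmpdir,)) as pool:
        paths = pool.map(make_temp_file, range(8))
    return shared_tmpdir, paths


if __name__ == "__main__":
    tmpdir, paths = main()
    print(tmpdir, len(paths))
```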


