使用面向对象的python进程之间的共享池映射

2024-09-29 23:27:13 发布

您现在位置:Python中文网/ 问答频道 /正文

Python2.7

我正在尝试做一种扫描器,它必须遍历CFG节点,并在分支的不同进程中进行拆分,以实现并行性。在

扫描器由scanner类的对象表示。这个类有一个方法遍历,该方法遍历所述图并在必要时拆分。在

下面是它的样子:

class Scanner(object):
    def __init__(self, atrb1, ...):
       self.attribute1 = atrb1
       self.process_pool = Pool(processes=4)
    def traverse(self, ...):
        [...]
        if branch:
           self.process_pool.map(my_func, todo_list).

我的问题是: 如何创建多处理.池,在我的所有进程之间共享?我希望它被共享,因为一个路径可以再次被分割,我不想以一种分叉炸弹结束,而拥有相同的池将帮助我限制同时运行的进程的数量。在

上面的代码不起作用,因为池不能被pickle。因此,我尝试过:

^{pr2}$

但很明显,这会导致self.process_池未在创建的流程中定义。在

然后,我尝试创建一个池作为模块属性:

process_pool = Pool(processes=4)

def my_func(x):
    [...]

class Scanner(object):
    def __init__(self, atrb1, ...):
       self.attribute1 = atrb1
    def traverse(self, ...):
        [...]
        if branch:
           process_pool.map(my_func, todo_list)

它不起作用,这个answer解释了原因。 但事情来了,无论我在哪里创建我的游泳池,都少了一些东西。如果我在文件末尾创建这个池,它不会看到自我属性1,方法与它没有看到answer相同,并因AttributeError而失败。在

我甚至还没打算分享它,而且我已经习惯了多处理方式来处理事情。在

我不知道我是否没有正确地思考整个事情,但我不敢相信,处理像“拥有一个员工团队并给他们分配任务”这样简单的事情会如此复杂。在

谢谢你

编辑: 我解决了我的第一个问题(AttributeError),我的类有一个回调作为它的属性,这个回调是在导入scanner模块之后在主脚本文件中定义的。。。但是并发和“不要叉子炸弹”的事情仍然是个问题。在


Tags: 方法self属性进程mydef事情process
1条回答
网友
1楼 · 发布于 2024-09-29 23:27:13

你想做的事不可能安全地完成。想想您是否有一个共享的Pool在父进程和工作进程之间共享,比如说,有两个工作进程。父进程运行一个map,它试图执行两个任务,每个任务需要map另外两个任务。两个被分派的父任务被分配到每个工作线程,而父进程块。每个工作线程再向共享池发送两个任务并阻止它们完成。但是现在所有的工人都被占用了,等待着一个工人自由;你陷入僵局了。在

在父任务调度中,返回额外的信息是足够安全的。然后你可以做一些类似的事情:

class MoreWork(object):
    def __init__(self, func, *args):
        self.func = func
        self.args = args

pool = multiprocessing.Pool()
try:
    base_task = somefunc, someargs
    outstanding = collections.deque([pool.apply_async(*base_task)])
    while outstanding:
        result = outstanding.popleft().get()
        if isinstance(result, MoreWork):
            outstanding.append(pool.apply_async(result.func, result.args))
        else:
            ... do something with a "final" result, maybe breaking the loop ...
finally:
     pool.terminate()

函数是什么取决于你,当有更多的事情要做时,它们只会在MoreWork中返回信息,而不是直接启动任务。关键是要确保通过让父进程单独负责任务调度,而工人单独负责任务完成,您不会因为所有工作线程都被阻塞,等待队列中的任务,但没有被处理而导致死锁。在

这一点也没有得到优化;理想情况下,如果队列中的其他项目已完成,则不会阻止等待队列中的第一个项目;使用concurrent.futures模块来完成这项工作要容易得多,特别是使用^{}来等待来自任意数量的未完成任务的第一个可用结果,但您需要第三方PyPI包才能在python2.7上获得concurrent.futures。在

相关问题 更多 >

    热门问题