(Python2.7)
我正在尝试做一种扫描器,它必须遍历CFG节点,并在分支的不同进程中进行拆分,以实现并行性。在
扫描器由scanner类的对象表示。这个类有一个方法遍历,该方法遍历所述图并在必要时拆分。在
下面是它的样子:
class Scanner(object):
def __init__(self, atrb1, ...):
self.attribute1 = atrb1
self.process_pool = Pool(processes=4)
def traverse(self, ...):
[...]
if branch:
self.process_pool.map(my_func, todo_list).
我的问题是: 如何创建多处理.池,在我的所有进程之间共享?我希望它被共享,因为一个路径可以再次被分割,我不想以一种分叉炸弹结束,而拥有相同的池将帮助我限制同时运行的进程的数量。在
上面的代码不起作用,因为池不能被pickle。因此,我尝试过:
^{pr2}$但很明显,这会导致self.process_池未在创建的流程中定义。在
然后,我尝试创建一个池作为模块属性:
process_pool = Pool(processes=4)
def my_func(x):
[...]
class Scanner(object):
def __init__(self, atrb1, ...):
self.attribute1 = atrb1
def traverse(self, ...):
[...]
if branch:
process_pool.map(my_func, todo_list)
它不起作用,这个answer解释了原因。 但事情来了,无论我在哪里创建我的游泳池,都少了一些东西。如果我在文件末尾创建这个池,它不会看到自我属性1,方法与它没有看到answer相同,并因AttributeError而失败。在
我甚至还没打算分享它,而且我已经习惯了多处理方式来处理事情。在
我不知道我是否没有正确地思考整个事情,但我不敢相信,处理像“拥有一个员工团队并给他们分配任务”这样简单的事情会如此复杂。在
谢谢你
编辑: 我解决了我的第一个问题(AttributeError),我的类有一个回调作为它的属性,这个回调是在导入scanner模块之后在主脚本文件中定义的。。。但是并发和“不要叉子炸弹”的事情仍然是个问题。在
你想做的事不可能安全地完成。想想您是否有一个共享的
Pool
在父进程和工作进程之间共享,比如说,有两个工作进程。父进程运行一个map
,它试图执行两个任务,每个任务需要map
另外两个任务。两个被分派的父任务被分配到每个工作线程,而父进程块。每个工作线程再向共享池发送两个任务并阻止它们完成。但是现在所有的工人都被占用了,等待着一个工人自由;你陷入僵局了。在在父任务调度中,返回额外的信息是足够安全的。然后你可以做一些类似的事情:
函数是什么取决于你,当有更多的事情要做时,它们只会在
MoreWork
中返回信息,而不是直接启动任务。关键是要确保通过让父进程单独负责任务调度,而工人单独负责任务完成,您不会因为所有工作线程都被阻塞,等待队列中的任务,但没有被处理而导致死锁。在这一点也没有得到优化;理想情况下,如果队列中的其他项目已完成,则不会阻止等待队列中的第一个项目;使用} 来等待来自任意数量的未完成任务的第一个可用结果,但您需要第三方PyPI包才能在python2.7上获得
concurrent.futures
模块来完成这项工作要容易得多,特别是使用^{concurrent.futures
。在相关问题 更多 >
编程相关推荐