我有一个函数,可以借助可变对象进行计算,如下例所示:
def fun(obj: MutableObject, input_a, input_b):
obj.a = input_a
return obj.do_stuff(input_b)
我需要多次这样做,目前正在使用for循环,如下所示:
obj = MutableObject()
output = []
for input_a, input_b in inputs:
output.append(fun(obj, input_a, input_b))
为了加快进程,我想使用python多处理并并行执行多个fun
调用。我看到的一种常见方法是使用multiproccesing.Pool
映射列表。对于我来说,这样一个实现的问题是,我有一个需要在进程之间共享的可变对象。我希望每个进程都能访问对象的克隆,而不必创建不必要的多个克隆
天真的尝试是为每个输入复制对象:
import multiprocessing
import copy
obj = MutableObject()
def map_fun(arg):
input_a, input_b = arg
temp_obj = copy.deepcopy(obj)
return fun(temp_obj, input_a, input_b)
pool = multiprocessing.Pool()
outputs = pool.map(map_fun, inputs)
但这似乎浪费了CPU和内存
我有没有办法创建一个对象副本的临时池,每个并行进程一个,而不是为每个输入对创建一个
编辑:
有人在评论中指出,内存可能不会成为问题,因为垃圾收集将清理未使用的副本。我仍然担心复制将需要大量资源,尽管我的可变对象实际上是一个相当大的Keras模型(神经网络)
下面是一个解决方案,它删除池并管理线程本身,确保每个进程只有一个对象:
首先,我们有一个可静音对象的模拟,这里只有一个类,其中有一个我们关心的方法
我们将
Process
子类化为我们自己,并在初始化时为每个进程提供一个对象。然后,我们用所需的任务填充一个JoinableQueue
,等待它们全部完成,然后从另一个队列中获得所有结果(虽然我们实际上可以使用列表和Lock
,但我认为这更容易阅读)请注意,结果不能保证按发送顺序发送。如果这很重要,你应该给他们一个我这里有的身份证,然后按身份证排序
如果需要池无限期运行并对每个结果执行特定操作,则可能需要编写回调,将
join()
移动到代码末尾(因为它会阻塞,直到所有任务都被处理),然后有一个循环等待结果并调用回调:在本例中,我将把所有这些都封装在另一个类中,称为类似
TaskRunner
,以保持状态(类似running
)的隔离顺便说一句,这么多年前我第一次看到这个食谱,从那时起我就一直在使用它
相关问题 更多 >
编程相关推荐