如何在并行进程池中池对象?

2024-09-29 23:29:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个函数,可以借助可变对象进行计算,如下例所示:

def fun(obj: MutableObject, input_a, input_b):
    obj.a = input_a
    return obj.do_stuff(input_b)

我需要多次这样做,目前正在使用for循环,如下所示:

obj = MutableObject()
output = []

for input_a, input_b in inputs:
    output.append(fun(obj, input_a, input_b))

为了加快进程,我想使用python多处理并并行执行多个fun调用。我看到的一种常见方法是使用multiproccesing.Pool映射列表。对于我来说,这样一个实现的问题是,我有一个需要在进程之间共享的可变对象。我希望每个进程都能访问对象的克隆,而不必创建不必要的多个克隆

天真的尝试是为每个输入复制对象:

import multiprocessing
import copy

obj = MutableObject()
def map_fun(arg):
    input_a, input_b = arg
    temp_obj = copy.deepcopy(obj)
    return fun(temp_obj, input_a, input_b)

pool = multiprocessing.Pool()
outputs = pool.map(map_fun, inputs)

但这似乎浪费了CPU和内存

我有没有办法创建一个对象副本的临时池,每个并行进程一个,而不是为每个输入对创建一个

编辑:

有人在评论中指出,内存可能不会成为问题,因为垃圾收集将清理未使用的副本。我仍然担心复制将需要大量资源,尽管我的可变对象实际上是一个相当大的Keras模型(神经网络)


Tags: 对象importobjmapforinputoutputreturn
1条回答
网友
1楼 · 发布于 2024-09-29 23:29:45

下面是一个解决方案,它删除池并管理线程本身,确保每个进程只有一个对象:

from multiprocessing import Process, cpu_count, JoinableQueue


class MuteableObj:
    def method(self, data):
        data["processed"] = True
        return data


class Worker(Process):
    def __init__(self, task_queue, result_queue):
        super().__init__()
        print("Started", self._name)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self._obj = MuteableObj()
        self._open = True

    def run(self):
        while self._open:
            task = self.task_queue.get()
            print(f"Processing {task['id']}")
            result = self._obj.method(task)
            self.task_queue.task_done()
            self.result_queue.put(result)
        print("over")

    def terminate(self):
        print("Stopped", self._name)
        super().terminate()


task_queue = JoinableQueue()
result_queue = JoinableQueue()

NTHREADS = cpu_count()

for i in range(200):
    task_queue.put(dict(id=i))


threads = [Worker(task_queue, result_queue) for i in range(NTHREADS)]
for t in threads:
    t.start()

task_queue.join()

for t in threads:
    t.terminate()

results = []
while not result_queue.empty():
    results.append(result_queue.get())

print(results)

首先,我们有一个可静音对象的模拟,这里只有一个类,其中有一个我们关心的方法

我们将Process子类化为我们自己,并在初始化时为每个进程提供一个对象。然后,我们用所需的任务填充一个JoinableQueue,等待它们全部完成,然后从另一个队列中获得所有结果(虽然我们实际上可以使用列表和Lock,但我认为这更容易阅读)

请注意,结果不能保证按发送顺序发送。如果这很重要,你应该给他们一个我这里有的身份证,然后按身份证排序

如果需要池无限期运行并对每个结果执行特定操作,则可能需要编写回调,将join()移动到代码末尾(因为它会阻塞,直到所有任务都被处理),然后有一个循环等待结果并调用回调:

from time import sleep

while running:
    while not results_queue.empty():
        callback(results_queue.get())

    while results_queue.empty():
        sleep(0.1)

在本例中,我将把所有这些都封装在另一个类中,称为类似TaskRunner,以保持状态(类似running)的隔离

顺便说一句,这么多年前我第一次看到这个食谱,从那时起我就一直在使用它

相关问题 更多 >

    热门问题