在类方法Python中调用多处理

2024-06-15 08:29:35 发布

您现在位置:Python中文网/ 问答频道 /正文

最初,我有一个类来存储一些已处理的值,并将这些值与其他方法一起重用。

问题是,当我试图将类方法划分为多个进程以加快速度时,python生成了进程,但它似乎不起作用(正如我在Task Manager中看到的,只有一个进程在运行),结果永远不会被传递。

我做了几次搜索,发现pathos.multiprocessing可以代替它,但我想知道标准库是否可以解决这个问题?

from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return t

a = A(2)

a.run(list(range(10)))

Tags: 方法runselftaskreturn进程defdt
2条回答

Question: it seems didn't work (as I saw in Task Manager that only 1 process was running) and result is never delivered.

您只看到1个进程作为Pool计算使用的进程数如下:
你给出了range(10)=任务索引0..9,因此Pool计算(10 / 4) * 4 = 8+1 = 9
首先启动process就没有任务了。
使用range(32),您将看到4process正在运行。

您返回的是return t,而不是rs = pool.map(...的结果。


例如,这将起作用

def cal(self, nb):
    import os
    print('pid:{} cal({})'.format(os.getpid(), nb))
    return nb * self.vl

def run(self,df):
    with mp.Pool(processes=4) as pool:
       return pool.map(self.cal, df)

if __name__ == '__main__':
    a = A(2)
    result = a.run(list(range(32)))
    print(result)

使用Python测试:3.4.2

您的代码失败,因为它不能pickle实例方法(self.cal),这是Python在通过将多个进程映射到multiprocessing.Pool来生成多个进程时试图做的事情(好吧,有一种方法可以做到这一点,但它太复杂了,无论如何也不是非常有用)-因为没有共享内存访问,它必须“打包”数据并将其发送到要解包的派生进程。如果您尝试pickle a实例,同样的情况也会发生。

multiprocessing包中唯一可用的共享内存访问是鲜为人知的multiprocessing.pool.ThreadPool,因此如果您真的想这样做:

from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

但这不会给您并行化,因为它本质上映射到您的常规线程,这些线程可以访问共享内存。您应该将类/静态方法(如果需要调用它们)与希望它们使用的数据一起传递(在您的例子中是self.vl)。如果您需要跨进程共享这些数据,那么您必须使用一些共享内存抽象,比如multiprocessing.Value,当然,在这一过程中应用互斥。

更新

我说过你可以这样做(比如,有些模块或多或少都在这样做,检查一下pathos.multiprocessing),但我认为这不值得麻烦——当你到了必须欺骗你的系统去做你想做的事情的地步时,很有可能你要么使用了错误的系统,要么你应该重新考虑你的设计。但是为了信息性,这里有一种方法可以在多处理设置中实现您的要求:

import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):

    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

我认为它的工作原理非常简单,但简而言之,它将类的名称、当前状态(sans signals,tho)、要调用的方法和用来调用它的参数传递给一个parallel_call函数,该函数为Pool中的每个进程调用。Python会自动pickle和unpickle所有这些数据,因此parallel_call需要做的就是重建原始对象,在其中找到所需的方法,并使用提供的参数调用它。

这样,我们只传递数据,而不传递活动对象,这样Python就不会抱怨了(在本例中,尝试向类参数中添加对实例方法的引用,看看会发生什么),一切都正常。

如果你想增加“魔力”,你可以让它看起来和你的代码一模一样(创建你自己的Pool处理程序,从函数中提取名字并将名字发送给实际的进程,等等),但是这应该为你的例子提供足够的函数。

但是,在提高期望值之前,请记住,只有在共享“静态”实例(一个在多处理上下文中开始调用它时不会更改其初始状态的实例)时,这才起作用。如果A.cal方法要更改vl属性的内部状态,那么它只会影响它更改的实例(除非它在调用之间调用Pool的主实例中更改)。如果还想共享状态,可以在调用后升级parallel_call以获取instance.__dict__,并将其与方法调用结果一起返回,然后在调用端,必须使用返回的数据更新本地__dict__以更改原始状态。这还不够——实际上,您必须创建一个共享的dict并处理所有的互斥工作人员,以便所有进程同时访问它(您可以为此使用multiprocessing.Manager)。

所以,正如我所说,麻烦比它的价值还多。。。

相关问题 更多 >