如何使用多线程加速嵌套循环计算?

2024-09-30 01:18:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在一个大数组上进行数值积分,计算需要很长时间。我试图通过使用numba和jit decorator来加速我的代码,但是纽比·特拉普兹不支持。在

我的新想法是创建n多个线程来并行运行计算,但我想知道如何才能做到这一点,或者它是否可行?在

引用以下代码

我可以让sz[2]多个线程同时运行来调用ZO\u SteadState来计算值吗?在

    for i in range(sz[1]):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            SS[0, i, j] = ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v)

正在讨论的计算。

^{pr2}$

Tags: 代码infors3range线程bmr3
2条回答

这是我可能会做的整体想法。没有足够的上下文给你一个更可靠的例子。你必须在类中设置所有的变量。在

import multiprocessing

pool = multiprocessing.Pool(processes=12)
runner = mp_Z0(variable=variable, variable2=variable2)

for i, j, v in pool.imap(runner.run, range(sz[1]):
    SS[0, i, j] = v


class mp_Z0:

    def __init__(self, **kwargs):
        for k, v in kwargs:
            setattr(self, k, v)


    def run(self, i):
        phii = self.phi[i]
        for j in range(self.sz[2]):
            s = self.tau[0, i, j, :].reshape(1, self.n4)
            [R3, PHI3, S3] = meshgrid(self.rprime, self.phiprime, s)
            BCoeff = Bessel0(self.bm * R3)

            return (i, j, ZO_SteadyState(self.alpha, self.b, self.bm, BCoeff, Bessel_Denom, self.k2, self.maxt, phii, self.PHI2, PHI3, self.phiprime, R3, self.rprime, self.s, S3, self.T, self.v))

这是一个不使用类执行此操作的示例(假设所有内容都在本地命名空间中):

^{pr2}$

另一个概念实现,流程产生流程(编辑:jit测试): 在

import numpy as np

# better pickling
import pathos 
from contextlib import closing


from numba import jit

#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(pathos.multiprocessing.Pool):
    def Process(self, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)




# matrix dimensions
x = 100 # i
y = 500 # j

NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned

SS = np.zeros([x, y], dtype=float)

@jit
def foo(i):
    return (i*i + 1)
@jit
def bar(phii, j):
    return phii*(j+1)

# The code which is implemented down here:
'''
for i in range(x):
    phii = foo(i)
    for j in range(y):
        SS[i, j] = bar(phii, j)
'''

# Threaded version:
# queue is in global scope


def outer_loop(i):

    phii = foo(i)

    # i is in process scope
    def inner_loop(j):
        result = bar(phii,j)
        # the data is coordinates and result
        return (i, j, result)


    with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
        res = list(pool.imap(inner_loop, range(y)))
    return res

with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:    
    results = list(pool.imap(outer_loop, range(x)))

result_list = []
for r in results:
    result_list += r


# read results from queue
for res in result_list:
    if res:
        i, j, val = res
        SS[i,j] = val


# check that all cells filled
print(np.count_nonzero(SS)) # 100*500

编辑:解释。在

这段代码中所有复杂的原因是我想做比OP要求的更多的并行化。如果只有内循环被并行化,那么外部循环仍然存在,因此对于外部循环的每次迭代,都会创建新的进程池,并对内部循环进行计算。在我看来,只要这个公式不依赖于外循环的其他迭代,我就决定并行化所有的事情:现在外循环的计算被分配给池中的进程,之后每个“外循环”进程都会创建自己的新池,生成额外的进程来执行内部循环的计算。在

不过,我可能错了,不能并行化外部循环;在这种情况下,您只能保留内部进程池。在

使用过程池可能不是最佳的解决方案,因为创建和销毁池将消耗时间。更有效(但需要手动模式)的解决方案是一次性地实例化N个进程,然后使用multiple processing Queue()将数据输入其中并接收结果。因此,您应该首先测试这个多处理解决方案是否能提供足够的加速(如果构建和销毁池的时间与Z0_SteadyStaterun相比很小,就会出现这种情况)。在

下一个复杂的问题是人工的无守护进程池。守护进程用于优雅地停止应用程序:当主程序退出时,守护进程将以静默方式终止。但是,守护进程不能派生子进程。在您的示例中,您需要等到每个进程结束后才能检索数据,所以我将它们设置为非守护程序,以允许生成子进程来计算内部循环。在

数据交换:我认为,与实际计算相比,填充矩阵所需的数据量和时间是很小的。所以我使用pools和pool.imap函数(比.map()快一点)。您也可以尝试.imap_unordered(),但是在您的情况下,它不会产生显著的差异)。因此,内部池等待直到计算出所有结果并以列表形式返回它们。因此,外部池返回必须连接的列表列表。然后利用这些结果在单个快速回路中重构矩阵。在

注意with closing()thing:在完成此语句下的操作后,它会自动关闭池,避免僵尸进程消耗内存。在

另外,您可能会注意到,我在另一个函数中定义了一个奇怪的函数,在进程内部,我可以访问一些没有传递到那里的变量:iphii。发生这种情况的原因是进程可以访问全局作用域,从该作用域使用copy-on-change策略(默认fork模式)启动它们。这不涉及酸洗,而且速度快。在

最后一个注释是关于使用pathos库而不是标准的multiprocessingconcurrent.futuressubprocess等。原因是,pathos使用了更好的pickling库,因此它可以序列化标准库不能序列化的函数(例如lambda函数)。我不知道你的功能,所以我用了更强大的工具来避免进一步的问题。在

最后一件事:多处理与线程。您可以将pathos处理池从concurrent.futures更改为标准ThreadPoolExecutor,就像我刚开始那段代码时所做的那样。但是,在执行过程中,我的系统CPU只加载100%(也就是说,使用了一个内核,似乎所有8个内核都以15-20%的速度加载)。我不太擅长理解线程和进程之间的区别,但对我来说,进程允许使用所有内核(每个内核100%,总共800%)。在

相关问题 更多 >

    热门问题