如何使用Python multiprocessing Pool.map在for循环中填充numpy数组

2024-05-19 13:25:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在for循环中填充一个2D numpy数组,并通过使用多处理来加速计算。

import numpy
from multiprocessing import Pool


array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)

def fill_array(start_val):
    return range(start_val,start_val+10)

list_start_vals = range(40,60)
for line in xrange(20):
    array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()

print array_2D

执行它的效果是Python运行4个子进程,占用4个CPU核,但是执行没有完成,数组没有打印出来。如果我尝试将阵列写入磁盘,则不会发生任何事情。

有人能告诉我为什么吗?


Tags: fromimportnumpyforlinerangeval数组
3条回答

问题是由于在for循环中运行pool.map,map()方法的结果在功能上等同于内置的map(),只是个别任务是并行运行的。 因此,在您的情况下,pool.map(fill_array,list_start_vals)将被调用20次,并开始为for循环的每个迭代并行运行,下面的代码应该可以工作

代码:

#!/usr/bin/python

import numpy
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val,start_val+10)

if __name__ == "__main__":
    array_2D = numpy.zeros((20,10))
    pool = Pool(processes = 4)    
    list_start_vals = range(40,60)

    # running the pool.map in a for loop is wrong
    #for line in xrange(20):
    #    array_2D[line,:] = pool.map(fill_array,list_start_vals)

    # get the result of pool.map (list of values returned by fill_array)
    # in a pool_result list 
    pool_result = pool.map(fill_array,list_start_vals)

    # the pool is processing its inputs in parallel, close() and join() 
    #can be used to synchronize the main process 
    #with the task processes to ensure proper cleanup.
    pool.close()
    pool.join()

    # Now assign the pool_result to your numpy
    for line,result in enumerate(pool_result):
        array_2D[line,:] = result

    print array_2D

如果仍要使用数组填充,可以使用pool.apply_async,而不是pool.map。根据索洛的回答:

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val, start_val+10)

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.zeros((20,10))
    for line, val in enumerate(list_start_vals):
        result = pool.apply_async(fill_array, [val])
        array_2D[line,:] = result.get()
    pool.close()
    print array_2D

这比map运行得慢一些。但它不会产生像我对map版本的测试那样的运行时错误:Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

下面的工作。首先,最好在主块中保护代码的主要部分,以避免奇怪的副作用。poo.map()的结果是一个列表,其中包含迭代器list_start_vals中每个值的求值,这样您就不必在之前创建array_2D

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val+10))

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.array(pool.map(fill_array, list_start_vals))
    pool.close() # ATTENTION HERE
    print array_2D

也许你在使用pool.close()时会遇到问题,从@hpaulj的注释中可以删除这一行,以防遇到问题。。。

相关问题 更多 >