Python多进程:如何使用回调函数实现“apply_async”循环为“map_async”

2024-10-01 04:59:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我想用Python的多处理模块集成一个微分方程组,用于几个参数的组合。因此,系统应该进行集成,存储参数组合,以及它的指标和其中一个变量的最终值。在

当我使用apply_async这已经比在一个简单的for循环中做的更快——但是我不能用看起来比apply_async快的map_async来实现同样的事情。回调函数从未被调用,我不知道为什么。有人能解释一下为什么会发生这种情况,以及如何使用map_async而不是{}获得相同的输出?!在

这是我的代码:

from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time

#my system of differential equations
def myODE (yn,tvec,allpara):

    (x, y, z) = yn

    a, b = allpara['para']

    dx  = -x + a*y + x*x*y
    dy = b - a*y - x*x*y
    dz = x*y

    return (dx, dy, dz) 

#returns the index of the parameter combination, the parameters and the integrated solution
#this way I know which parameter combination belongs to which outcome in the asynch-case
def runMyODE(yn,tvec,allpara):
    return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,)))

#for reproducibility    
seed(0) 

#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)

numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 5 #number of parameter combinations

INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here

#create some initial conditions and random parameters
for combi in range(numComb):

    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0

    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly

#################################using loop over apply####################

#results will be stored in here
asyncResultsApply = []

#my callback function
def saveResultApply(result):
    # storing the index, a, b and the final value of z
    asyncResultsApply.append((result[0], result[1], result[2][2,-1]))

#start the multiprocessing part
pool = mp.Pool(processes=4)
for combi in range(numComb):
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply)
pool.close()
pool.join()

for res in asyncResultsApply:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

#######################################using map#####################
#the only difference is that the for loop is replaced by a "map_async" call
print "\n\nnow using map\n\n"
asyncResultsMap = []

#my callback function which is never called
def saveResultMap(result):
    # storing the index, a, b and the final value of z
    asyncResultsMap.append((result[0], result[1], result[2][2,-1]))

pool = mp.Pool(processes=4)
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap)
pool.close()
pool.join()

#this does not work yet
for res in asyncResultsMap:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

Tags: andoftheinmapforasyncindex
1条回答
网友
1楼 · 发布于 2024-10-01 04:59:07

如果我对你的理解是正确的,那是源于一些经常让人困惑的事情。apply_async的回调是在单个操作之后调用的,但是{}的回调也是如此——它不是对每个元素调用回调,而是对整个结果调用一次回调。在

您注意到mapapply_async快是正确的。如果您希望在每个结果之后发生一些事情,可以使用以下几种方法:

  1. 您可以有效地将回调添加到要对每个元素执行的操作中,map使用它。

  2. 您可以在循环中使用imap(或imap_unordered),并在循环体中执行回调。当然,这意味着所有操作都将在父进程中执行,但作为回调编写的内容的性质意味着这通常不是问题(它往往是廉价函数)。基督教青年会。


例如,假设您有函数fcb,并且您希望在mapf上使用cb对每个操作执行以下操作:

def look_ma_no_cb(e):
    r = f(e)
    cb(r)
    return r

p = multiprocessing.Pool()
p.map(look_ma_no_cb, es)

或者

^{pr2}$

相关问题 更多 >