我对使用并行处理进行数据分析很陌生。我有一个相当大的数组,我想对数组的每个索引应用一个函数。你知道吗
以下是我目前掌握的代码:
import numpy as np
import statsmodels.api as sm
from statsmodels.regression.quantile_regression import QuantReg
import multiprocessing
from functools import partial
def fit_model(data,q):
#data is a 1-D array holding precipitation values
years = np.arange(1895,2018,1)
res = QuantReg(exog=sm.add_constant(years),endog=data).fit(q=q)
pointEstimate = res.params[1] #output slope of quantile q
return pointEstimate
#precipAll is an array of shape (1405*621,123,12) (longitudes*latitudes,years,months)
#find all indices where there is data
nonNaN = np.where(~np.isnan(precipAll[:,0,0]))[0] #481631 indices
month = 4
#holder array for results
asyncResults = np.zeros((precipAll.shape[0])) * np.nan
def saveResult(result,pos):
asyncResults[pos] = result
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=20) #my server has 24 CPUs
for i in nonNaN:
#use partial so I can also pass the index i so the result is
#stored in the expected position
new_callback_function = partial(saveResult, pos=i)
pool.apply_async(fit_model, args=(precipAll[i,:,month],0.9),callback=new_callback_function)
pool.close()
pool.join()
当我运行这个,我停止它后,它花了更长的时间比我没有使用多处理。函数fit\u model的时间大约为0.02秒,那么apply\u async相关联的挂起是否会导致速度减慢?我需要保持秩序的结果,因为我正在绘制这个数据到地图上后,这个处理完成。任何关于我需要改进的地方的想法都非常感谢!你知道吗
如果您需要使用multiprocessing模块,那么您可能希望将更多的行批处理到您分配给worker池的每个任务中。但是,对于您正在做的事情,我建议尝试Ray,因为它的efficient handling of large numerical data。你知道吗
一些注释
上面的例子是开箱即用的,但请注意,我简化了一点逻辑。特别是,我删除了
NaN
的处理在我有4个物理内核的笔记本电脑上,这大约需要4秒钟。如果你用20个核来代替,把数据放大9000倍,我估计需要7200秒,这是相当长的时间。一种可能的加速方法是使用更多的机器或在每次调用
fit_model
时处理多个行,以便分摊一些开销。你知道吗上面的示例实际上将整个
precip_all
矩阵传递到每个任务中。这很好,因为每个fit_model
任务只有对存储在共享内存中的矩阵副本的读取权限,因此不需要创建自己的本地副本。对ray.put(precip_all)
的调用将数组放在共享内存的前面一次。你知道吗关于differences between Ray and Python multiprocessing。注意我在帮助雷发展。你知道吗
相关问题 更多 >
编程相关推荐