我正在进行一系列数值模拟。我需要对结果进行一些敏感性分析,即计算并显示当某些输入在给定范围内变化时,某些输出的变化程度。基本上我需要创建一个这样的表,其中每一行都是一个模型运行的结果:
+-------------+-------------+-------------+-------------+
| Input 1 | Input 2 | Output 1 | Output 2 |
+-------------+-------------+-------------+-------------+
| 0.708788979 | 0.614576315 | 0.366315092 | 0.476088865 |
| 0.793662551 | 0.938622754 | 0.898870204 | 0.014915374 |
| 0.366560694 | 0.244354275 | 0.740988568 | 0.197036087 |
+-------------+-------------+-------------+-------------+
每个模型的运行都很难并行化,但是让每个CPU运行一个具有不同输入的不同模型并不难并行化。你知道吗
我已经将一些东西与多处理库结合起来,但比我希望的要慢得多。你对我做错了什么/如何加快速度有什么建议吗?我愿意使用多处理以外的库。你知道吗
这与负载平衡有关吗? 我必须承认,我对Python中的多处理是新手,对map、apply和apply\u async之间的区别不是很清楚。
我做了一个玩具例子来说明我的意思:我从对数正态分布创建随机样本,并计算样本的平均值随着分布的平均值和sigma的变化而变化的程度。这只是一个平庸的例子,因为这里重要的不是模型本身,而是并行运行多个模型。你知道吗
在我的示例中,时间(以秒为单位)为:
+-----------------+-----------------+---------------------+
| Million records | Time (parallel) | Time (not parallel) |
+-----------------+-----------------+---------------------+
| 5 | 24.4 | 18 |
| 10 | 26.5 | 35.8 |
| 20 | 32.2 | 71 |
+-----------------+-----------------+---------------------+
只有在500万到1000万的样本量之间,并行才能带来任何好处。这是意料之中的吗?
另外,我知道敏感性分析的SALib库,但据我所知,它并不能满足我的需求。你知道吗
我的代码:
import numpy as np
import pandas as pd
import time
import multiprocessing
from multiprocessing import Pool
# I store all the possible inputs in a dataframe
tmp = {}
i = 0
for mysigma in np.linspace(0,1,10):
for mymean in np.linspace(0,1,10):
i += 1
tmp[i] = pd.DataFrame({'mean':[mymean],\
'sigma':[mysigma]})
par_inputs = pd.concat( [tmp[x] for x in tmp], axis=0, ignore_index=True)
def not_parallel(df):
for row in df.itertuples(index=True):
myindex = row[0]
mymean = row[1]
mysigma = row[2]
dist = np.random.lognormal(mymean, mysigma, size = n)
empmean = dist.mean()
df.loc[myindex,'empirical mean'] = empmean
df.to_csv('results not parallel.csv')
# splits the dataframe and sets up the parallelisation
def parallelize_dataframe(df, func):
df_split = np.array_split(df, num_partitions)
pool = Pool(num_cores)
conc_df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
conc_df.to_csv('results parallelized.csv')
return conc_df
# the actual function being parallelised
def parallel_sensitivities(data):
for row in data.itertuples(index=True):
myindex = row[0]
mymean = row[1]
mysigma = row[2]
dist = np.random.lognormal(mymean, mysigma, size = n)
empmean = dist.mean()
print(empmean)
data.loc[myindex,'empirical mean'] = empmean
return data
num_cores = multiprocessing.cpu_count()
num_partitions = num_cores
n = int(5e6)
if __name__ == '__main__':
start = time.time()
not_parallel(par_inputs)
time_np = time.time() - start
start = time.time()
parallelize_dataframe(par_inputs, parallel_sensitivities)
time_p = time.time() - start
时间差用于启动多个进程。启动每个进程需要几秒钟的时间。实际处理时间比非并行处理要好得多,但多处理速度提高的一部分是接受启动每个进程所需的时间。你知道吗
在本例中,示例函数的秒数相对较快,因此在少量记录上看不到时间立即增加。对于每个记录上更密集的操作,并行化会带来更大的时间收益。你知道吗
从this article.
编辑:
当使用
Pool()
时,有几个选项可以将任务分配给池。你知道吗multiprocessing.apply_asynch()
docs用于分配单个任务,以避免在等待该任务完成时阻塞。你知道吗multiprocessing.map_async
docs将通过chunk_size
对iterable进行分块,并将每个分块添加到要完成的池中。你知道吗在您的例子中,这将取决于您正在使用的真实场景,但是它们不能基于时间进行交换,而是基于您需要运行的函数。我不确定你需要哪一个,因为你用了一个假的例子。如果需要运行每个函数并且函数是自包含的,我猜您可以使用
apply_asynch
。如果函数可以在iterable上并行运行,则需要map_asynch
。你知道吗相关问题 更多 >
编程相关推荐