多重处理的数值模拟比预期慢得多:我做错什么了吗?我能加快速度吗?

2024-06-26 14:02:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在进行一系列数值模拟。我需要对结果进行一些敏感性分析,即计算并显示当某些输入在给定范围内变化时,某些输出的变化程度。基本上我需要创建一个这样的表,其中每一行都是一个模型运行的结果:

+-------------+-------------+-------------+-------------+
|   Input 1   |   Input 2   |  Output 1   |  Output 2   |
+-------------+-------------+-------------+-------------+
| 0.708788979 | 0.614576315 | 0.366315092 | 0.476088865 |
| 0.793662551 | 0.938622754 | 0.898870204 | 0.014915374 |
| 0.366560694 | 0.244354275 | 0.740988568 | 0.197036087 |
+-------------+-------------+-------------+-------------+

每个模型的运行都很难并行化,但是让每个CPU运行一个具有不同输入的不同模型并不难并行化。你知道吗

我已经将一些东西与多处理库结合起来,但比我希望的要慢得多。你对我做错了什么/如何加快速度有什么建议吗?我愿意使用多处理以外的库。你知道吗

这与负载平衡有关吗? 我必须承认,我对Python中的多处理是新手,对map、apply和apply\u async之间的区别不是很清楚。

我做了一个玩具例子来说明我的意思:我从对数正态分布创建随机样本,并计算样本的平均值随着分布的平均值和sigma的变化而变化的程度。这只是一个平庸的例子,因为这里重要的不是模型本身,而是并行运行多个模型。你知道吗

在我的示例中,时间(以秒为单位)为:

+-----------------+-----------------+---------------------+
| Million records | Time (parallel) | Time (not parallel) |
+-----------------+-----------------+---------------------+
|               5 | 24.4            | 18                  |
|              10 | 26.5            | 35.8                |
|              20 | 32.2            | 71                  |
+-----------------+-----------------+---------------------+

只有在500万到1000万的样本量之间,并行才能带来任何好处。这是意料之中的吗?

另外,我知道敏感性分析的SALib库,但据我所知,它并不能满足我的需求。你知道吗

我的代码:

import numpy as np
import pandas as pd
import time
import multiprocessing
from multiprocessing import Pool

# I store all the possible inputs in a dataframe
tmp = {}
i = 0
for mysigma in np.linspace(0,1,10):
    for mymean in np.linspace(0,1,10):
        i += 1
        tmp[i] = pd.DataFrame({'mean':[mymean],\
           'sigma':[mysigma]})
par_inputs = pd.concat( [tmp[x] for x in tmp], axis=0, ignore_index=True)      


def not_parallel(df):
    for row in df.itertuples(index=True):
        myindex = row[0]
        mymean = row[1]
        mysigma = row[2]
        dist = np.random.lognormal(mymean, mysigma, size = n)
        empmean = dist.mean()
        df.loc[myindex,'empirical mean'] = empmean

    df.to_csv('results not parallel.csv')

# splits the dataframe and sets up the parallelisation
def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    conc_df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    conc_df.to_csv('results parallelized.csv')
    return conc_df

# the actual function being parallelised
def parallel_sensitivities(data):   
    for row in data.itertuples(index=True):
        myindex = row[0]
        mymean = row[1]
        mysigma = row[2]
        dist = np.random.lognormal(mymean, mysigma, size = n)
        empmean = dist.mean()
        print(empmean)
        data.loc[myindex,'empirical mean'] = empmean
    return data


num_cores = multiprocessing.cpu_count()
num_partitions = num_cores
n = int(5e6)

if __name__ == '__main__':

    start = time.time()
    not_parallel(par_inputs)
    time_np = time.time() - start

    start = time.time()
    parallelize_dataframe(par_inputs, parallel_sensitivities)
    time_p = time.time() - start

Tags: in模型importdffortimeparallelnp
1条回答
网友
1楼 · 发布于 2024-06-26 14:02:18

时间差用于启动多个进程。启动每个进程需要几秒钟的时间。实际处理时间比非并行处理要好得多,但多处理速度提高的一部分是接受启动每个进程所需的时间。你知道吗

在本例中,示例函数的秒数相对较快,因此在少量记录上看不到时间立即增加。对于每个记录上更密集的操作,并行化会带来更大的时间收益。你知道吗

Keep in mind that parallelization is both costly, and time-consuming due to the overhead of the subprocesses that is needed by your operating system. Compared to running two or more tasks in a linear way, doing this in parallel you may save between 25 and 30 percent of time per subprocess, depending on your use-case. For example, two tasks that consume 5 seconds each need 10 seconds in total if executed in series, and may need about 8 seconds on average on a multi-core machine when parallelized. 3 of those 8 seconds may be lost to overhead, limiting your speed improvements.

this article.

编辑:

当使用Pool()时,有几个选项可以将任务分配给池。你知道吗

multiprocessing.apply_asynch()docs用于分配单个任务,以避免在等待该任务完成时阻塞。你知道吗

multiprocessing.map_asyncdocs将通过chunk_size对iterable进行分块,并将每个分块添加到要完成的池中。你知道吗

在您的例子中,这将取决于您正在使用的真实场景,但是它们不能基于时间进行交换,而是基于您需要运行的函数。我不确定你需要哪一个,因为你用了一个假的例子。如果需要运行每个函数并且函数是自包含的,我猜您可以使用apply_asynch。如果函数可以在iterable上并行运行,则需要map_asynch。你知道吗

相关问题 更多 >