python中的并行函数调用

2024-10-04 01:29:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我对python还很陌生,我一直在考虑编写下面的代码来并行调用,其中doj值的列表是在lambda的帮助下格式化的

m_df[['doj']] = m_df[['doj']].apply(lambda x: formatdoj(*x), axis=1)

def formatdoj(doj):
    doj = str(doj).split(" ")[0]
    doj = datetime.strptime(doj, '%Y' + "-" + '%m' + "-" + "%d")
    return doj

因为这个列表有数百万条记录,所以格式化这些记录所需的时间非常长。你知道吗

如何使python中的并行函数调用类似于平行Foreach在c#?你知道吗


Tags: lambda代码df列表datetimedef记录split
3条回答

我认为在你的例子中使用并行计算有点过分了。缓慢来自代码,而不是使用单个处理器。我将通过一些步骤向您展示如何使其更快,猜测您正在使用熊猫数据帧以及您的数据帧包含的内容(请遵循SO准则并提供一个完整的工作示例!!)你知道吗

在我的测试中,我使用了以下10万行的随机数据帧(向上扩展时间以获得您的案例):

N=int(1e5)
m_df = pd.DataFrame([['{}-{}-{}'.format(y,m,d)]
                        for y,m,d in zip(np.random.randint(2007,2019,N),
                        np.random.randint(1,13,N),
                        np.random.randint(1,28,N))],
                    columns=['doj'])

下面是您的代码:

tstart = time()
m_df[['doj']] = m_df[['doj']].apply(lambda x: formatdoj(*x), axis=1)
print("Done in {:.3f}s".format(time()-tstart))

在我的机器上,它大约运行5.1秒。它有几个问题。第一个是使用dataframes而不是series,尽管您只处理一列,并且创建了一个无用的lambda函数。简单地做:

m_df['doj'].apply(formatdoj)

将时间缩短到1.6秒。在python中,用“+”连接字符串也很慢,您可以将formatdoj更改为:

def faster_formatdoj(doj):
    return datetime.strptime(doj.split()[0], '%Y-%m-%d')
m_df['doj'] = m_df['doj'].apply(faster_formatdoj)

这不是一个很大的改进,但是确实减少了一点到1.5s。如果您需要真正地连接字符串(例如,它们不是固定的),那么使用'-'.join('%Y','%m','%d')会更快。你知道吗

但真正的瓶颈来自于使用日期时间.strtime很多次。这本质上是一个缓慢的命令-日期是一个庞大的事情。另一方面,如果你有数以百万计的日期,并且假设它们从人类开始就不是均匀分布的,那么它们很可能被大量复制。因此,以下是您应该如何真正做到这一点:

tstart = time()
# Create a new column with only the first word
m_df['doj_split'] = m_df['doj'].apply(lambda x: x.split()[0])
converter = {
    x: faster_formatdoj(x) for x in m_df['doj_split'].unique()
}
m_df['doj'] = m_df['doj_split'].apply(lambda x: converter[x])
# Drop the column we added
m_df.drop(['doj_split'], axis=1, inplace=True)
print("Done in {:.3f}s".format(time()-tstart))

这在0.2/0.3s左右工作,比原始实现快10倍多。你知道吗

在所有这些之后,如果运行速度仍然很慢,可以考虑并行工作(而不是分别并行第一条“split”指令和apply lambda部分,否则您将创建许多不同的“converter”字典来消除增益)。但我认为这是最后一步而不是第一步。。。你知道吗

[编辑]:最初在最后一个代码框的第一步中,我使用了m_df['doj_split'] = m_df['doj'].str.split().apply(lambda x: x[0]),这在功能上是等效的,但比m_df['doj_split'] = m_df['doj'].apply(lambda x: x.split()[0])慢一点。我不太清楚为什么,可能是因为它实际上应用了两个函数而不是一个。你知道吗

由于我不确定您的示例,我将使用multiprocessing库为您提供另一个示例:

# -*- coding: utf-8 -*-
import multiprocessing as mp

input_list = ["str1", "str2", "str3", "str4"]

def format_str(str_input):
    str_output = str_input + "_test"
    return str_output

if __name__ == '__main__':
    with mp.Pool(processes = 2) as p:
        result = p.map(format_str, input_list)

    print (result)

现在,假设您想映射一个具有多个参数的函数,那么应该使用starmap()

# -*- coding: utf-8 -*-
import multiprocessing as mp

input_list = ["str1", "str2", "str3", "str4"]

def format_str(str_input, i):
    str_output = str_input + "_test" + str(i)
    return str_output

if __name__ == '__main__':
    with mp.Pool(processes = 2) as p:
        result = p.starmap(format_str, [(input_list, i) for i in range(len(input_list))])

    print (result)

不要忘记将池放在if __name__ == '__main__':中,multiprocessing将无法在像spyder(或其他)这样的IDE中工作,因此您需要在cmd中运行脚本。你知道吗

要保留结果,您可以将它们保存到文件中,或者在Linux上使用os.system("pause")(Windows)或input()保持cmd在末尾打开。你知道吗

在python中使用多处理是一种相当简单的方法。你知道吗

最好的办法是使用dask。Dask有一个数据帧类型,您可以用它来创建一个类似的数据帧,但是,在执行compute函数时,您可以用num_worker参数指定核心数。这将使任务并行化

相关问题 更多 >