我有一个python进程(2.7),它接受一个键,执行一系列计算并返回结果列表。这是一个非常简化的版本。在
我使用多重处理来创建线程,这样可以更快地处理。但是,我的生产数据有几百万行,而且每个循环都需要更长的时间才能完成。上一次我运行这个循环需要6分钟以上才能完成,而在开始时只需要一秒钟或更少的时间。我认为这是因为所有线程都在向resultset中添加结果,并且结果继续增长,直到它包含所有记录为止。在
是否可以使用多处理将每个线程(一个列表)的结果流式传输到csv或批处理结果集,以便在一定数量的行之后写入csv?在
如有任何其他关于加快或优化方法的建议,我们将不胜感激。在
import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool
global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]
def key_loop(key):
test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
test_list = test_df.ix[0].tolist()
return test_list
if __name__ == "__main__":
try:
pool = Pool(processes=8)
resultset = pool.imap(key_loop,(key for key in keys) )
loaddata = []
for sublist in resultset:
loaddata.append(sublist)
with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
writer = csv.writer(file)
for listitem in loaddata:
writer.writerow(listitem)
file.close
print "finished load"
except:
print 'There was a problem multithreading the key Pool'
raise
我敢打赌,使用appending同时处理大型结构会使它变慢。我通常做的是打开尽可能多的文件作为核心,并使用模来立即写入每个文件,这样流就不会引起麻烦,而不是将它们全部导入同一个文件(写入错误),也不会试图存储大量数据。也许不是最好的解决办法,但确实很简单。最后你只需合并回结果。在
在运行开始时定义:
然后在key_loop函数中:
^{pr2}$之后,别忘了关闭:
[x.close() for x in outFiles]
改进:
迭代注释中提到的块。一次写/处理一行要比写块慢得多。
处理错误(关闭文件)
重要提示:我不确定“keys”变量的含义,但其中的数字不允许modulo确保每个进程都写入每个单独的流(12个键,modulo 8将使2个进程写入同一个文件)
以下非常简单的代码将许多工人的数据收集到一个CSV文件中。worker获取一个键并返回一个行列表。父进程一次处理多个密钥,使用多个工作线程。当每个键完成后,父级将输出行按顺序写入CSV文件。在
注意秩序。如果每个工作人员都直接写入CSV文件,那么他们将出现故障或相互踩踏。让每个工人写入自己的CSV文件会很快,但之后需要将所有数据文件合并在一起。在
来源
输出
^{pr2}$下面是一个综合了我和埃维的建议的答案
同样,这里的变化是
resultset
,而不是不必要地先将其复制到列表中。在keys
列表提供给pool.imap
,而不是从中创建生成器理解。在chunksize
到{chunksize
减少了将keys
内的值传递给池中的子进程所需的进程间通信成本,当keys
非常大时,can give big performance boosts就可以了。您应该试验一下chunksize
的不同值(尝试一些比200大得多的值,比如5000,等等),看看它如何影响性能。我在胡乱猜测200,不过肯定比1好。在相关问题 更多 >
编程相关推荐