如何从流式处理结果多处理.池到csv?

2024-09-29 22:03:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个python进程(2.7),它接受一个键,执行一系列计算并返回结果列表。这是一个非常简化的版本。在

我使用多重处理来创建线程,这样可以更快地处理。但是,我的生产数据有几百万行,而且每个循环都需要更长的时间才能完成。上一次我运行这个循环需要6分钟以上才能完成,而在开始时只需要一秒钟或更少的时间。我认为这是因为所有线程都在向resultset中添加结果,并且结果继续增长,直到它包含所有记录为止。在

是否可以使用多处理将每个线程(一个列表)的结果流式传输到csv或批处理结果集,以便在一定数量的行之后写入csv?在

如有任何其他关于加快或优化方法的建议,我们将不胜感激。在

import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)      
        resultset = pool.imap(key_loop,(key for key in keys) )

        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)
        file.close

        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise

Tags: csvkeyintestimport列表foras
3条回答

我敢打赌,使用appending同时处理大型结构会使它变慢。我通常做的是打开尽可能多的文件作为核心,并使用模来立即写入每个文件,这样流就不会引起麻烦,而不是将它们全部导入同一个文件(写入错误),也不会试图存储大量数据。也许不是最好的解决办法,但确实很简单。最后你只需合并回结果。在

在运行开始时定义:

num_cores = 8
file_sep = ","
outFiles = [open('out' + str(x) + ".csv", "a") for x in range(num_cores)]

然后在key_loop函数中:

^{pr2}$

之后,别忘了关闭:[x.close() for x in outFiles]

改进:

  • 迭代注释中提到的块。一次写/处理一行要比写块慢得多。

  • 处理错误(关闭文件)

  • 重要提示:我不确定“keys”变量的含义,但其中的数字不允许modulo确保每个进程都写入每个单独的流(12个键,modulo 8将使2个进程写入同一个文件)

以下非常简单的代码将许多工人的数据收集到一个CSV文件中。worker获取一个键并返回一个行列表。父进程一次处理多个密钥,使用多个工作线程。当每个键完成后,父级将输出行按顺序写入CSV文件。在

注意秩序。如果每个工作人员都直接写入CSV文件,那么他们将出现故障或相互踩踏。让每个工人写入自己的CSV文件会很快,但之后需要将所有数据文件合并在一起。在

来源

import csv, multiprocessing, sys

def worker(key):
    return [ [key, 0], [key+1, 1] ]


pool = multiprocessing.Pool()   # default 1 proc per CPU
writer = csv.writer(sys.stdout)

for resultset in pool.imap(worker, [1,2,3,4]):
    for row in resultset:
        writer.writerow(row)

输出

^{pr2}$

下面是一个综合了我和埃维的建议的答案

import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)      
        resultset = pool.imap(key_loop, keys, chunksize=200)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)

        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise

同样,这里的变化是

  1. 直接迭代resultset,而不是不必要地先将其复制到列表中。在
  2. 直接将keys列表提供给pool.imap,而不是从中创建生成器理解。在
  3. 提供大于默认值1的chunksize到{}。较大的chunksize减少了将keys内的值传递给池中的子进程所需的进程间通信成本,当keys非常大时,can give big performance boosts就可以了。您应该试验一下chunksize的不同值(尝试一些比200大得多的值,比如5000,等等),看看它如何影响性能。我在胡乱猜测200,不过肯定比1好。在

相关问题 更多 >

    热门问题