我想用一个名为get_scores_dataframe
的函数来处理file_list
中存储的大量csv文件。此函数接受另一个列表中存储的第二个参数phenotypes
。然后,该函数将结果写回csv文件。我设法使用ProcessPoolExecutor()
来并行化这个任务,因此,它是有效的
with concurrent.futures.ProcessPoolExecutor() as executor:
phenotypes = [phenotype for i in range(len(file_list))]
futures = executor.map(get_scores_dataframe, file_list, phenotypes,
chunksize=25)
filenames = executor.map(os.path.basename, file_list)
for future, filename in zip(futures, filenames):
futures.to_csv(os.path.join(f'{output_path}',f'{filename}.csv'),
index = False)
如您所见,我正在为此使用上下文管理器,并且在上下文管理器中使用方法map()
,在该方法中我可以设置选项chunksize
。但是,我希望程序在完成处理每个数据帧时写入csv文件。上下文管理器似乎会等待所有作业完成,然后将结果写入csv文件
你知道我怎样才能做到这一点吗
首先,
executor.map
不返回Future
实例,因此变量futures
的名称不正确。它确实返回一个迭代器,该迭代器生成将get_scores_dataframe
依次应用于file_list
的每个元素的返回值。其次,看看接下来是如何使用的,这些返回值似乎是输入文件(可能与输入参数是同一个文件,也可能不是同一个文件,因为没有显示代码)。另外,使用进程池map
函数而不是内置的map
函数来获取文件名参数的基本名称似乎有些过分。最后,在代码中,它不是futures.to_csv
,而是future.to_csv
。所以我对你的代码是如何工作的感到困惑如果修改函数
get_scores_dataframe
以返回由dataframe和原始传递的文件名参数组成的元组,则我们可以使用as_competed
按完成顺序处理结果:现在,通过使用} (see manual) :
submit
,您将失去“分块”作业提交的能力。我们可以切换到使用multiprocessing.Pool
和imap_unordered
。但是imap_unordered
只能向辅助函数传递一个参数。因此,如果您能够修改worker以更改参数的顺序,我们可以将phenotype
作为第一个参数,并使用^{相关问题 更多 >
编程相关推荐