Python多处理返回以chunksize设置的结果

2024-09-29 23:32:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我想用一个名为get_scores_dataframe的函数来处理file_list中存储的大量csv文件。此函数接受另一个列表中存储的第二个参数phenotypes。然后,该函数将结果写回csv文件。我设法使用ProcessPoolExecutor()来并行化这个任务,因此,它是有效的

   with concurrent.futures.ProcessPoolExecutor() as executor:
        phenotypes = [phenotype for i in range(len(file_list))]
        futures = executor.map(get_scores_dataframe, file_list, phenotypes,
                                    chunksize=25)
        filenames = executor.map(os.path.basename, file_list)
        for future, filename  in zip(futures, filenames):
                futures.to_csv(os.path.join(f'{output_path}',f'{filename}.csv'),
                              index = False)

如您所见,我正在为此使用上下文管理器,并且在上下文管理器中使用方法map(),在该方法中我可以设置选项chunksize。但是,我希望程序在完成处理每个数据帧时写入csv文件。上下文管理器似乎会等待所有作业完成,然后将结果写入csv文件

你知道我怎样才能做到这一点吗


Tags: 文件csvpath函数mapdataframe管理器get
1条回答
网友
1楼 · 发布于 2024-09-29 23:32:19

首先,executor.map不返回Future实例,因此变量futures的名称不正确。它确实返回一个迭代器,该迭代器生成将get_scores_dataframe依次应用于file_list的每个元素的返回值。其次,看看接下来是如何使用的,这些返回值似乎是输入文件(可能与输入参数是同一个文件,也可能不是同一个文件,因为没有显示代码)。另外,使用进程池map函数而不是内置的map函数来获取文件名参数的基本名称似乎有些过分。最后,在代码中,它不是futures.to_csv,而是future.to_csv。所以我对你的代码是如何工作的感到困惑

如果修改函数get_scores_dataframe以返回由dataframe和原始传递的文件名参数组成的元组,则我们可以使用as_competed按完成顺序处理结果:

from concurrent.futures import as_completed
import multiprocessing

with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count() - 1) as executor:
    futures = [executor.submit(get_scores_dataframe, file, phenotype) for file in file_list]
    for future in as_completed(futures):
        # it is assumed return value is tuple: (data frame, original filename argument):
        df, file = future.result()
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)

现在,通过使用submit,您将失去“分块”作业提交的能力。我们可以切换到使用multiprocessing.Poolimap_unordered。但是imap_unordered只能向辅助函数传递一个参数。因此,如果您能够修改worker以更改参数的顺序,我们可以将phenotype作为第一个参数,并使用^{} (see manual)

import multiprocessing
from functools import partial


POOL_SIZE = multiprocessing.cpu_count() - 1 # leave 1 for main process


def compute_chunksize(iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
    if extra:
        chunksize += 1
    return chunksize


with multiprocessing.Pool(POOL_SIZE) as pool:
    chunksize = compute_chunksize(len(file_list))
    worker = partial(get_scores_dataframe, phenotype)
    # it is assumed that start_processing returns a tuple: (data frame, original filename argument)
    for df, file in pool.imap_unordered(worker, file_list, chunksize):
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)

相关问题 更多 >

    热门问题