如何有效地连接多个计算数据帧?

2024-09-30 16:30:09 发布

您现在位置:Python中文网/ 问答频道 /正文

使用分布式调度器,我从许多二进制源文件中摄取数据,这些文件不适合Dask提供的方法(例如read_csv()read_parquet(),等等),并且为每个二进制文件生成一个熊猫数据帧(在延迟修饰函数中)

在我使用Dask的幼年时期,我试图了解如何有效地将所有的pandas.dataframe连接到一个dask.dataframe中进行进一步处理。这个dask.dataframe将比内存大,尽管在我的测试中,到目前为止,我使用的是减少的数据量

我的代码导致只有一个工作进程处于活动状态,并且这个过程需要很长的时间,尽管图形可视化似乎暗示了并行操作。我不明白为什么

import dask.dataframe as dd

def process_data_ddf(filenames):
    narrowband_ddf_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_ddf = dd.from_delayed(narrowband_df)
        narrowband_ddf_list.append(narrowband_ddf)
    narrowbands_ddf = dd.concat(narrowband_ddf_list)
    return narrowbands_ddf

result = dask.compute(process_data_ddf(filenames))

我试图修改这段代码,以便只收集一组数据帧,并在最后调用pd.concat()(下面的代码)。这样一来,所有员工都很活跃,流程很快就完成了,但我认为这不会很好地扩展

def process_data_df(filenames):
    narrowband_df_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_df_list.append(narrowband_df)
    return narrowband_df_list

result = pd.concat(dask.compute(process_data_df(filenames))[0])

process_data_ddf图: enter image description here

process_data_df图: enter image description here

大多数文档似乎专注于使用类似于dd.read_csv('myfiles.*.csv')的方法在导入时聚合数据。对于我的用例来说,最好的方法是什么

澄清:

  • calculate_narrowband()read_a_file()@dask.delayed装饰符
  • 我尝试连接的所有数据帧都有相同的列,并且没有重复的索引
  • 数据帧索引是日期时间,我不关心排序

Tags: csv数据方法dataframedfreaddataprocess
1条回答
网友
1楼 · 发布于 2024-09-30 16:30:09

我误解了from_delayed()的目的,现在意识到它可以接受delayed的列表

这在初始测试中表现良好:

def process_data_ddf(filenames):
    narrowband_df_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_df_list.append(narrowband_df)
    narrowband_ddf = dd.from_delayed(narrowband_df_list)
    return narrowband_ddf

相关问题 更多 >