使用分布式调度器,我从许多二进制源文件中摄取数据,这些文件不适合Dask提供的方法(例如read_csv()
、read_parquet()
,等等),并且为每个二进制文件生成一个熊猫数据帧(在延迟修饰函数中)
在我使用Dask的幼年时期,我试图了解如何有效地将所有的pandas.dataframe
连接到一个dask.dataframe
中进行进一步处理。这个dask.dataframe
将比内存大,尽管在我的测试中,到目前为止,我使用的是减少的数据量
我的代码导致只有一个工作进程处于活动状态,并且这个过程需要很长的时间,尽管图形可视化似乎暗示了并行操作。我不明白为什么
import dask.dataframe as dd
def process_data_ddf(filenames):
narrowband_ddf_list = []
for f in filenames:
tdms_data = read_a_file(f)
narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
narrowband_ddf = dd.from_delayed(narrowband_df)
narrowband_ddf_list.append(narrowband_ddf)
narrowbands_ddf = dd.concat(narrowband_ddf_list)
return narrowbands_ddf
result = dask.compute(process_data_ddf(filenames))
我试图修改这段代码,以便只收集一组数据帧,并在最后调用pd.concat()
(下面的代码)。这样一来,所有员工都很活跃,流程很快就完成了,但我认为这不会很好地扩展
def process_data_df(filenames):
narrowband_df_list = []
for f in filenames:
tdms_data = read_a_file(f)
narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
narrowband_df_list.append(narrowband_df)
return narrowband_df_list
result = pd.concat(dask.compute(process_data_df(filenames))[0])
大多数文档似乎专注于使用类似于dd.read_csv('myfiles.*.csv')
的方法在导入时聚合数据。对于我的用例来说,最好的方法是什么
澄清:
calculate_narrowband()
和read_a_file()
有@dask.delayed
装饰符李>
我误解了
from_delayed()
的目的,现在意识到它可以接受delayed
的列表这在初始测试中表现良好:
相关问题 更多 >
编程相关推荐