如何解决洗牌时Dask数据帧过多使用硬盘(>>100GB)的问题

2024-07-07 07:56:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要计算每个大(15-20 GB)的CSV文件段的统计数字。我用Dask数据帧中的groupby()来做这个。你知道吗

问题是我需要自定义函数,因为我需要峰度和偏斜,它们不是Dask的一部分。因此我使用groupby().apply()。然而,这使得Dask在我的Temp目录中使用了大量的磁盘驱动器空间:仅运行一次脚本就超过150gb!这导致我的硬盘空间不足,使脚本崩溃。你知道吗

有没有一种方法可以重写代码,从而避免在我的Temp目录中写入大量垃圾?你知道吗

示例代码如下:

  • 示例1运行得比较快,不会生成大量的Temp输出,但不支持峰度或偏斜。你知道吗
  • 示例2也计算峰度和偏斜,但是如果我运行它来获取完整的数据集,它会填满我的硬盘。你知道吗

任何帮助都将不胜感激!你知道吗

顺便说一下:这个页面(https://docs.dask.org/en/latest/dataframe-groupby.html)建议为groupby()使用索引列。但不幸的是,Dask数据帧不支持多索引,所以这并不能解决我的问题。你知道吗

import dask.dataframe as dd
import numpy as np
import scipy.stats as sps

ddf = dd.read_csv('18_GB_csv_file.csv')

segmentations = { 'seg1' : ['col1', 'col2'],
                 'seg2' : ['col1', 'col2', 'col3', 'col4'],
                 'seg3' : ['col3', 'col4'],
                 'seg4' : ['col1', 'col2', 'col5']
               }
data_cols = [ 'datacol1', 'datacol2', 'datacol3' ]


# Example 1: this runs fast and doesn't generate needless temp output.
# But it does not support "kurt" or "skew":

dd_comp = {}
for seg_group, seg_cols in segmentations.items():
   df_grouped = df.groupby(seg_cols)[data_cols]
   dd_comp[seg_group] = df_grouped.aggregate( ['mean', 'std', 'min', 'max'])

with ProgressBar():
   segmented_stats = dd.compute(dd_comp)



# Example 2: includes also "kurt" and "skew". But it is painfully slow 
# and generates >150 GB of Temp output before running out of disk space

empty_segment = pd.DataFrame( index=data_cols,
                             columns=['mean', 'three_sigma',
                                      'min', 'max', 'kurt', 'skew']
                           )
def segment_statistics(segment):
   stats = empty_segment.copy()
   for col in data_cols:
       stats.loc[col]['mean'] = np.mean(segment[col])
       stats.loc[col]['std'] = np.std(segment[col])
       stats.loc[col]['min'] = np.min(segment[col])
       stats.loc[col]['max'] = np.max(segment[col])
       stats.loc[col]['skew'] = sps.skew(segment[col])
       stats.loc[col]['kurt'] = sps.kurtosis(segment[col]) + 3
   return stats

dd_comp = {}
for seg_group, seg_cols in segmentations.items():
   df_grouped = df.groupby(seg_cols)[data_cols]
   dd_comp[seg_group] = df_grouped.apply( segment_statistics,
                                          meta=empty_segment )

with ProgressBar():
   segmented_stats = dd.compute(dd_comp)

Tags: dfdatastatsnpsegmentcoltemploc