创建多个dask。具有列表理解的聚合函数将重复相同的计算

2024-06-26 10:25:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在尝试计算大型dask数据帧上的一些聚合统计数据。所需的统计信息包括最小值、最大值、平均值和分位数。ddf的结构使ddf.grouby()函数基于多个列。为了获得准确的分位数值,对ddf进行了分区,以匹配groupby级别

当所有分位数函数都用一个名称定义时,下面的代码似乎工作得很好

当试图通过迭代分位数值列表来定义dd.聚合函数时,就会出现问题(注意,最初的尝试将分位数列表作为函数输入)。以这种方式创建函数后,所有计算的分位数值都会重复。即所有第50、80和95%的文件都是相同的值。当打印这些函数时,它们都有一个不同的“序列号”,以便更好地使用术语

顺便说一句,由于我调用compute的次数,这段代码需要将近1小时才能运行。已尝试将所有DDF放入dask.delayed对象,但出现内存错误。如果您能帮助提高效率,我们将不胜感激

def ddf_stats_calcs(ddf,group=['Param','Run','condition'],fs=[min,max,da.mean],fnms=['min','max','mean'],quantiles=True):


  if quantiles!=False:
    
    prts= dask.delayed(ddf.iloc[:,0].count()//ddf.loc[:,'datetime'].unique().count()).compute()
    print(prts)
    if ddf.npartitions!=prts: ddf.repartition(npartitions=prts)
    
    quants=[0.8,0.5,0.95]

    #fs.extend([dd.Aggregation(f'prnct_{str(quant*100)}',lambda x: x.quantile(quant),lambda x0: x0.quantile(quant)) for quant in quantiles])
    #fnms.extend(f'{str(quant*100)}th%ile' for quant in quantiles )
   # print(f'Functions being calculated are {fs}')

    prcnt80=dd.Aggregation('prcnt80',lambda x: x.quantile(0.8),lambda x0: x0.quantile(0.8))
    prcnt50=dd.Aggregation('prcnt50',lambda x: x.quantile(),lambda x0: x0.quantile())
    prcnt95=dd.Aggregation('prcnt50',lambda x: x.quantile(0.95),lambda x0: x0.quantile(0.95))
    fnms.extend(f'{str(quant*100)}th%ile' for quant in quants )
    fs.extend([prcnt80,prcnt50,prcnt95])

  dict_stats={}
  for nm, fun in zip(fnms,fs) :

    dict_stats[nm]=ddf[[col for col in ddf.columns if col!='datetime']].groupby(group).agg(fun).reset_index().melt(id_vars=group).compute()



  df_stats=pd.concat(dict_stats)

  return df_stats

'''


Tags: lambda函数inforstatsfsddquant