我有以下自定义函数在pandas数据帧中进行聚合,我想在pyspark中做同样的事情:
def custom_aggregation_pyspark(x,queries):
names={}
for k, v in regles_calcul.items():
plus = x.query(v["plus_credit"])['OBNETCRE'].sum() + x.query(v["plus_debit"])['OBNETDEB'].sum()
minus = x.query(v["minus_credit"])['OBNETCRE'].sum() + x.query(v["minus_debit"])['OBNETDEB'].sum()
names[k]= plus-minus
return pd.Series(names, index=list(names.keys()))
df = df.groupby(['LBUDG']).apply(custom_aggregation_pandas, queries ).sum()
WARE查询是一个类似查询的字典
{'first_queries': {
'plus_credit': "classe_compte_rg2 in ('237', '238')",
'plus_debit': "classe_compte_rg2 in ('237', '238')",
'minus_credit': "classe_compte_rg2 in ('237', '238')",
'minus_debit': "classe_compte_rg1 in ('20', '21', '23')"
}
}
因此,我用pyspark'sql'替换了pandas“query”
def custom_aggregation_pyspark(x,queries):
x.createOrReplaceTempView("df")
names={}
for k , v in queries.items():
plus = spark.sql("SELECT * FROM df WHERE "+ v["plus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() + spark.sql("SELECT * FROM df WHERE "+ v["plus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect()
minus= spark.sql("SELECT * FROM df WHERE "+ v["minus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() + spark.sql("SELECT * FROM df WHERE "+ v["minus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect()
names[k]= plus-minus
return pd.Series(names, index=list(names.keys()))
df.groupby("LBUDG").agg(custom_aggregation_pyspark(df,queries))
我肯定是走错了方向,因为上面的代码不起作用,你能告诉我应该去哪里看吗
所需的输出是一个按LBUDG
(字符串)分组的表,其他列使用自定义聚合函数
编辑数据帧示例:
预期产出:
其中agg1(例如)对应于OBNETCRE - OBNETDEB
的和,其中classe_compte_rg1
有值,为10或11
您可以使用
epxr
来计算queries
dict中传递的条件,并使用条件聚合来计算总和。下面是一个与您在《熊猫》中给出的示例相同的示例:相关问题 更多 >
编程相关推荐