pyspark collect_set or collect_list with groupby

How can I use collect_set or collect_list on a dataframe after a groupby? For example: df.groupby('key').collect_set('values'). I get an error: AttributeError: 'GroupedData' object has no attribute 'collect_set'


2 Answers

If the dataframe is large, you can try using a pandas UDF (GROUPED_AGG) to avoid out-of-memory errors. It is also much faster.

Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with groupBy().agg() and pyspark.sql.Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window. (from the Spark documentation on pandas UDFs)

Example:

import pyspark.sql.functions as F

# Grouped-aggregate pandas UDF: each group's "name" column arrives as a
# pandas.Series, and the UDF reduces it to a single string value.
@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    return ', '.join(name)

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))
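As a quick usage sketch (the sample data and the spark SparkSession here are assumptions, not part of the original answer; grouped aggregate pandas UDFs also need pyarrow installed, and the order of names within a group is not guaranteed):

# Hypothetical sample data; assumes an existing SparkSession named "spark"
df = spark.createDataFrame(
    [(1, "alice"), (1, "bob"), (2, "carol")],
    ["id", "name"])

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))
grouped_df.show()

+---+----------+
| id|     names|
+---+----------+
|  1|alice, bob|
|  2|     carol|
+---+----------+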

You need to use agg. Example:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

Note that in the above you have to create a HiveContext. For dealing with different Spark versions, see https://stackoverflow.com/a/35529093/690430
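On Spark 2.x and later you can skip the SparkContext/HiveContext boilerplate entirely; a minimal sketch of the equivalent setup, using the SparkSession entry point introduced in Spark 2.0:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSession is the single entry point since Spark 2.0
spark = SparkSession.builder.master("local").getOrCreate()

df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])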

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+
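Note that collect_set drops both nulls and duplicates, while collect_list only drops nulls; that is why the null code and name values vanish from the output above. If you want friendlier column names, alias each aggregate; a small variation of the same query:

(df
  .groupby("id")
  .agg(F.collect_set("code").alias("codes"),
       F.collect_list("name").alias("names"))
  .show())

+---+--------------+-------+
| id|         codes|  names|
+---+--------------+-------+
|  a|[code1, code2]|[name2]|
+---+--------------+-------+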
