将pyspark中的字符串列表转换为json对象列表

2024-10-03 00:24:14 发布

您现在位置:Python中文网/ 问答频道 /正文

def tranform_doc(docs):
    json_list = []
    print(docs)
    for doc in docs:
        json_doc = {}
        json_doc["customKey"] = doc
        json_list.append(json_doc)
    return json_list


df.groupBy("colA") \
            .agg(custom_udf(collect_list(col("colB"))).alias("customCol"))

First Hurdle:
Input: ["str1","str2","str3"]

Output: [{"customKey":"str1"},{"customKey":"str2"},{"customKey":"str3"}]

Second Hurdle:
columns in agg collect_list are changing dynamically. So, how to adjust schema dynamically.

当列表中的元素更改时,接收到错误 输入行没有架构所需的预期值数。需要1个字段,但提供3个值

我做了什么:

def tranform_doc(agg_docs):
    return json_list
## When I failed to get a list of JSON I tried just return the original list of strings to the list of json


schema = StructType([{StructField("col1",StringType()),StructField("col2",StringType()),StructField("col3",StringType())}])

custom_udf = udf(tranform_doc,schema)

df.groupBy("colA") \
            .agg(custom_udf(collect_list(col("colB"))).alias("customCol"))

我得到的输出: {“col2”:“str1”,“col1”:“str2”,“col3”:“str3”}

努力获取所需的JSON字符串列表,并使其与列表中的元素数量保持动态关系


Tags: tojsondocsdocreturncustomagglist
1条回答
网友
1楼 · 发布于 2024-10-03 00:24:14

不需要UDF。您可以将colB转换为struct之前的collect_list

import pyspark.sql.functions as F

df2 = df.groupBy('colA').agg(
    F.to_json(
        F.collect_list(
            F.struct(F.col('colB').alias('customKey'))
        )
    ).alias('output')
)

相关问题 更多 >