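from pyspark.sql.functions import col, collect_list

def tranform_doc(docs):
    # Wrap each string in a dict under the same key
    json_list = []
    print(docs)
    for doc in docs:
        json_doc = {}
        json_doc["customKey"] = doc
        json_list.append(json_doc)
    return json_list

# custom_udf is the udf() wrapper around tranform_doc
df.groupBy("colA") \
    .agg(custom_udf(collect_list(col("colB"))).alias("customCol"))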
First Hurdle:
Input: ["str1","str2","str3"]
Output: [{"customKey":"str1"},{"customKey":"str2"},{"customKey":"str3"}]
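The First Hurdle mapping itself is plain Python. The sketch below is a list-comprehension equivalent of tranform_doc that works for any list length; serializing it with json.dumps and compact separators should render the same text as the Output line above.

```python
import json

def tranform_doc(docs):
    # One {"customKey": value} dict per input string, whatever the list length
    return [{"customKey": doc} for doc in docs]

docs = ["str1", "str2", "str3"]
json_list = tranform_doc(docs)
# Compact separators drop the spaces json.dumps adds by default
print(json.dumps(json_list, separators=(",", ":")))
```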
Second Hurdle:
The values gathered by collect_list in agg change dynamically (the number of elements in the list varies), so how can the schema adjust dynamically?
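When the elements in the list change, I get the error: Input row doesn't have expected number of values required by the schema. 1 fields are required while 3 values are provided.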
What I did:
from pyspark.sql.functions import col, collect_list, udf
from pyspark.sql.types import StringType, StructField, StructType

def tranform_doc(agg_docs):
    # After failing to get a list of JSON, I tried just returning the original list of strings
    return agg_docs

# Fixed three-field struct: it only matches lists of exactly three values
schema = StructType([StructField("col1", StringType()), StructField("col2", StringType()), StructField("col3", StringType())])
custom_udf = udf(tranform_doc, schema)
df.groupBy("colA") \
    .agg(custom_udf(collect_list(col("colB"))).alias("customCol"))
Output I got: {"col2":"str1","col1":"str2","col3":"str3"}
I'm struggling to get the desired list of JSON strings while keeping it dynamic with respect to the number of elements in the list.
No UDF is needed. You can convert colB to a struct before the collect_list.