I have a PySpark code sample in which I am trying to generate a JSON structure. Below is the code:
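```python
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType


def func(row):
    # Convert the incoming struct (a Row) into a plain dict
    temp = row.asDict()
    # Fixed header of the Avro-style schema record
    headDict = {}
    headDict['type'] = "record"
    headDict['name'] = "source"
    headDict['namespace'] = "com.streaming.event"
    headDict['doc'] = "SCD signals from source"
    # One {column: value} entry per column of the row
    fieldslist = []
    headDict['fields'] = fieldslist
    for i in temp:
        fieldslist.append({i: temp[i]})
    return json.dumps(headDict)


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]").appName("PythonWordCount").getOrCreate()
    # Wrap func as a UDF that returns a string column
    payload = udf(func, StringType())
    data = spark.createDataFrame(
        [
            (1, "a", 'foo1'),  # create your data here, be consistent in the types.
            (2, "b", 'bar'),
            (3, "c", 'mnc')
        ],
        ['id', 'nm', 'txt']  # add your columns label here
    )
    # Pack all columns into one struct and pass it to the UDF
    df = data.withColumn("payload1", payload(struct([data[x] for x in data.columns])))
    df.show(3, False)
```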
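Because the UDF body is plain Python, its JSON-building logic can be checked without starting Spark at all. The sketch below is a hypothetical standalone version of `func`; the name `build_payload` and its `columns` parameter are illustrative assumptions, not part of the original code. It takes a plain dict in place of what `row.asDict()` returns and walks an explicit column list, so the `fields` entries come out in column order instead of in dict-iteration order (which is arbitrary on Python 2.7, consistent with the `txt, id, nm` ordering seen in the output).

```python
import json


def build_payload(record, columns):
    """Hypothetical Spark-free version of func().

    record  -- plain dict standing in for row.asDict()
    columns -- column names in the order the fields should appear
    """
    head = {
        "type": "record",
        "name": "source",
        "namespace": "com.streaming.event",
        "doc": "SCD signals from source",
        # One {column: value} entry per column, in the order given
        "fields": [{col: record[col]} for col in columns],
    }
    return json.dumps(head)


if __name__ == "__main__":
    row = {"id": 3, "nm": "c", "txt": "mnc"}
    print(build_payload(row, ["id", "nm", "txt"]))
```

Passing the column list explicitly is a design choice: it keeps the order of `fields` the same regardless of which Python version builds the dict.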
I get an error when the data is inserted into the DataFrame:
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple '{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"}' with StructType
If I print the JSON payload instead, I get the correct output:
{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"}
I have also verified that this is valid JSON.
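One way to do this kind of check is to parse the printed string with the standard library's `json.loads` and inspect the result. The sketch below assumes the payload is available as a Python string; the variable names `printed` and `fields` are illustrative only.

```python
import json

# The payload exactly as printed above
printed = ('{"namespace": "com.streaming.event", "type": "record", '
           '"name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, '
           '{"nm": "c"}], "doc": "SCD signals from source"}')

# json.loads raises ValueError (json.JSONDecodeError on Python 3) if the text is invalid
parsed = json.loads(printed)

# Collapse the list of single-key dicts into one dict for easy checking
fields = {}
for entry in parsed["fields"]:
    fields.update(entry)

print(parsed["type"], parsed["name"], sorted(fields))
```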
I'm not sure what I'm missing here.
Could this be a Python version issue? I am using Python 2.7.
Update: I tried running exactly the same code with Python 3.7, and it now works fine.
It works for me with Spark 3.x and Python 2.7.x.
[Screenshot: result of running the code in the PySpark shell]