我正在尝试将两个文件数据合并到一个rdd中。假设我有两个文件file1.txt
,一个是json格式的大文件,file2.txt
,一个是csv格式的小文件。你知道吗
file1.txt
(json格式)如下所示:
{
{"a":1 , "b":{"ba":1 , "bb":"babc", "bc":"babc2", "d":"babc3"}, "c":"abc2"},
{"a":2 , "b":{"ba":2 , "bb":"babc2", "bc":"babc22", "d":"babc32"}, "c":"abc22"}
}
file2.txt
(csv格式)如下所示:
key value
xyz xyz1
pqr pqr1
现在我希望我的输出是rdd,如下所示:
{
{"a":1 , "b":{"ba":1 , "bb":"babc", "bc":"babc2", "d":"babc3", "e":{{"key": "xyz", "value":"xyz1"},{"key": "pqr", "value":"pqr1"}}, "c":"abc2"},
{"a":2 , "b":{"ba":2 , "bb":"babc2", "bc":"babc22", "d":"babc32", "e":{{"key": "xyz", "value":"xyz1"},{"key": "pqr", "value":"pqr1"}}, "c":"abc22"}
}
我尝试的是将file2.txt
转换为json格式,然后执行如下操作:
output_rdd = file1_rdd.map(lambda x: joinfunc(x, file2_rdd))
然后试着写output_rdd
。你知道吗
但这给了我一些错误,比如
cPickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o72.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist
有没有建议把这两个文件合并成一个输出rdd?任何建议都会有帮助。你知道吗
附言:我对星火完全陌生。你知道吗
编辑:
我的joinfunc是这样的:
def joinfunc(file1_json, file2_json):
file1_detail = json.loads(file1_json)
file2_detail = json.loads(file2_json)
b_file1_detail_list = file1_detail["b"]
append_detail =[]
if b_file1_detail_list is not None:
for b_file1_detail in b_file1_detail_list:
append_detail = {"e" : file2_detail}
b_file1_detail.append(append_detail)
return json.dumps(dict(file1_detail))
目前没有回答
相关问题 更多 >
编程相关推荐