PySpark: merging data from multiple JSON files into a single RDD

Published 2024-10-01 09:18:18


I am trying to merge the data from two files into a single RDD. Say I have two files: file1.txt, a large file in JSON format, and file2.txt, a small file in CSV format.

file1.txt (JSON format) looks like this:

{

{"a":1 , "b":{"ba":1 , "bb":"babc", "bc":"babc2", "d":"babc3"}, "c":"abc2"}, 

{"a":2 , "b":{"ba":2 , "bb":"babc2", "bc":"babc22", "d":"babc32"}, "c":"abc22"}

}

file2.txt (CSV format) looks like this:

key   value

xyz   xyz1

pqr   pqr1
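Before those CSV rows can be embedded under "e", they need to become a list of key/value dicts. A minimal sketch of that parsing step (the function name is hypothetical; the columns are assumed to be whitespace-separated, with the header row skipped):

```python
def parse_csv_lines(lines):
    """Turn whitespace-separated key/value rows into a list of dicts,
    skipping the header row."""
    records = []
    for line in lines[1:]:  # lines[0] is the "key   value" header
        parts = line.split()
        if len(parts) == 2:
            records.append({"key": parts[0], "value": parts[1]})
    return records
```

For the sample file this yields [{"key": "xyz", "value": "xyz1"}, {"key": "pqr", "value": "pqr1"}], which matches the shape of the "e" entry in the desired output.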

Now I want my output RDD to look like this:

{

{"a":1 , "b":{"ba":1 , "bb":"babc", "bc":"babc2", "d":"babc3", "e":{{"key": "xyz", "value":"xyz1"},{"key": "pqr", "value":"pqr1"}}, "c":"abc2"}, 

{"a":2 , "b":{"ba":2 , "bb":"babc2", "bc":"babc22", "d":"babc32", "e":{{"key": "xyz", "value":"xyz1"},{"key": "pqr", "value":"pqr1"}}, "c":"abc22"}

}

What I tried was converting file2.txt to JSON format and then doing something like this:

output_rdd = file1_rdd.map(lambda x: joinfunc(x, file2_rdd))

and then writing out output_rdd.

But this gives me an error like:

cPickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o72.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist

Any suggestions on how to merge these two files into one output RDD? Any help would be appreciated.
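For context, this PicklingError typically appears when an RDD (here file2_rdd) is captured inside a function passed to map: RDDs cannot be referenced from within another RDD's transformations. The usual workaround is to collect (or broadcast) the small file on the driver first, so the closure only captures plain Python data. A minimal sketch under those assumptions, with the Spark calls shown in comments (names like file1_rdd/file2_rdd taken from the question, make_joiner is hypothetical):

```python
import json

def make_joiner(file2_records):
    """Return a function suitable for rdd.map(); it closes over
    plain Python data (file2_records), not an RDD."""
    def joiner(file1_json):
        record = json.loads(file1_json)
        # embed the small file's records under "e" inside the "b" object
        if isinstance(record.get("b"), dict):
            record["b"]["e"] = file2_records
        return json.dumps(record)
    return joiner

# In Spark this would be used roughly as:
#   file2_records = file2_rdd.collect()        # small file -> driver memory
#   output_rdd = file1_rdd.map(make_joiner(file2_records))
```

Because file2.txt is small, collecting it to the driver is cheap; for larger lookup data, sc.broadcast(file2_records) and reading broadcast.value inside the closure is the standard alternative.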

P.S. I'm completely new to Spark.

Edit:

My joinfunc looks like this:

import json

def joinfunc(file1_json, file2_json):

    file1_detail = json.loads(file1_json)
    file2_detail = json.loads(file2_json)

    # "b" is a nested dict, so the new entry is added under a key;
    # dicts have no append method
    b_file1_detail = file1_detail.get("b")

    if b_file1_detail is not None:
        b_file1_detail["e"] = file2_detail

    return json.dumps(file1_detail)
