将键/值对的Pyspark RDD解析为.csv格式

2024-09-18 01:49:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在构建一个解析器,它接受“key”=“value”对的原始文本文件,并使用PySpark写入tablar/.csv结构。在

我遇到的困难是,我可以访问函数中的键和值来构造每个csv_row,甚至检查这些键是否等于预期键的列表(col_list),但当我在lambda中调用该函数processCsv时,我不知道如何将每个csv_row附加到列表的全局列表l_of_l中,该列表旨在保存.csv行的最终列表。在

如何以键/值格式迭代RDD的每条记录并将其解析为.csv格式?如您所见,我最后的列表列表(l_of_l)是空的,但是我可以获得循环中的每一行。。。令人沮丧的。在

感谢所有建议!在

原始文本结构(食品日志)公司名称:

"A"="foo","B"="bar","C"="baz"
"A"="oof","B"="rab","C"="zab"
"A"="aaa","B"="bbb","C"="zzz"

目前的方法:

^{pr2}$

输出:

Records pre-transform:
[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab'], [u'A=aaa', u'B=bbb', u'C=zzz']]
------------------------------

[u'foo', u'bar', u'baz']
[u'oof', u'rab', u'zab']
[u'aaa', u'bbb', u'zzz']

Final list of lists:
[]

Tags: ofcsv函数列表foobarbaz结构
1条回答
网友
1楼 · 发布于 2024-09-18 01:49:34

尝试此功能:

def processRecord(record, col_list):    
    csv_row=list()
    for idx, val in enumerate(record):
        key, value = val.split('=')        
        if(key==col_list[idx]):
            # print 'Col name match'
            # print value
            csv_row.append(value)
        else:
            csv_row.append(None)
            # print 'Key-to-Column Mismatch, dropping value.'
    return csv_row

然后呢

^{pr2}$

应该给予

Final list of lists: 
[[u'foo', u'bar', u'baz'], [u'oof', u'rab', u'zab'], [u'aaa', u'bbb', u'zzz']]

相关问题 更多 >