从一对RDD将文件保存到HDFS中

2024-09-30 22:11:03 发布

您现在位置:Python中文网/ 问答频道 /正文

下面是我用HDFS编写的python脚本。RDD是一对RDD脚本工作正常,但它在HDFS.Is可以删除元组并在HDFS中创建逗号分隔的条目。在

    import sys
from pyspark import SparkContext

if len(sys.argv) < 2:
    print 'Insufficient arguments'
    sys.exit()

sc = SparkContext()
initialrdd1 = sc.textFile(sys.argv[1])
finalRDD1 = initialrdd1.map(lambda x:x.split(',')).map(lambda x :(x[1],x[0])).sortByKey()
print finalRDD1.getNumPartitions()
finalRDD1.saveAsTextFile('/export_dir/result3/')

HDFS中的文件存储格式如下

^{pr2}$

Tags: lambdaimport脚本mapissyshdfs元组
3条回答
finalRDD1 = initialrdd1.map(lambda x:x.split(',')).map(lambda x :(x[1],x[0])).sortByKey()

理解你的代码。在最初的RDD中,您将每个条目映射到一个元组。映射(λx:(x[1],x[0])

^{pr2}$

在sortByKey操作之后,直接将RDD保存为textfile。在

为了将条目另存为CSV,必须像这样显式地指定它-

def csv_format(data):
    return ','.join(str(d) for d in data)

# Rest of the code ...

finalRDD1.map(csv_format).saveAsTextFile('/export_dir/result3/')

我也有类似的问题。问题在于

map(lambda x: ','.join(str(s) for s in x)).saveAsTextFile(....)

它将把'join'保存为一个字符串,这将隐藏逗号,并且如果您计划使用像pandas df加载这样的分析,这可能是一个令人头痛的问题。所以你的绳子看起来像这样

^{pr2}$

简单的解决方案是在saveAsTextFile()之前插入另一个映射拆分

.map(lambda x: x.split(',')).saveAsTextFile(....)

最后的代码应该是这样的

finalRDD1.map(csv_format).map(lambda x: ','.join(str(s) for s in x)).map(lambda x: x.split(',')).saveAsTextFile('/export_dir/result3/')

现在你的csv看起来像这样

[ 'Alpha', 'E03'] 
['Beta', 'E02']
 .....
 .....

为什么不先将元组映射到字符串,然后保存它呢

finalRDD1.map(lambda x: ','.join(str(s) for s in x)).saveAsTextFile('/export_dir/result3/')

相关问题 更多 >