我正在用python spark中的wholeTextFiles()
读取一个.txt文件。我知道在读取wholeTextFiles()
之后,结果rdd的格式将是(文件路径、内容)。我有多个文件要读。我想从文件路径中剪切文件名,并将其保存到spark数据框中,并将文件名的一部分作为HDFS位置中的日期文件夹保存。但是在保存时,我没有得到相应的文件名。有什么办法吗?下面是我的代码
base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")
data1 = base_data.map(lambda x : x[0]).flatMap(lambda x : x.split('/')).filter(lambda x : x.startswith('CH'))
data2=data1.flatMap(lambda x : x.split('F_')).filter(lambda x : x.startswith('2'))
print(data1.collect())
print(data2.collect())
df.repartition(1).write.mode('overwrite').parquet(outputLoc + "/xxxxx/" + data2)
logdf = sqlContext.createDataFrame(
[(data1, pstrt_time, pend_time, 'DeltaLoad Completed')],
["filename","process_start_time", "process_end_time", "status"])`
输出:
data1: ['CHNC_P0BCDNAF_20200217', 'CHNC_P0BCDNAF_20200227', 'CHNC_P0BCDNAF_20200615', 'CHNC_P0BCDNAF_20200925']
data2: ['20200217', '20200227', '20200615', '20200925']
这里有一个Scala版本,您可以轻松地将其转换为pyspark:
通过
.drop
或使用select
去除一些示例数据:去除标点符号、修剪空格等常用方面。你需要适应你的文件名情况当然,我看不出来
问题是你不能在已经分裂的事情上分裂
相关问题 更多 >
编程相关推荐