使用wholeTextFiles处理.txt文件&希望提取文件名

2024-09-28 01:24:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在用python spark中的wholeTextFiles()读取一个.txt文件。我知道在读取wholeTextFiles()之后,结果rdd的格式将是(文件路径、内容)。我有多个文件要读。我想从文件路径中剪切文件名,并将其保存到spark数据框中,并将文件名的一部分作为HDFS位置中的日期文件夹保存。但是在保存时,我没有得到相应的文件名。有什么办法吗?下面是我的代码

base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")

data1 = base_data.map(lambda x : x[0]).flatMap(lambda x : x.split('/')).filter(lambda x : x.startswith('CH'))

data2=data1.flatMap(lambda x : x.split('F_')).filter(lambda x : x.startswith('2'))

print(data1.collect())

print(data2.collect())

df.repartition(1).write.mode('overwrite').parquet(outputLoc + "/xxxxx/" + data2)

logdf = sqlContext.createDataFrame(
    [(data1, pstrt_time, pend_time, 'DeltaLoad Completed')],
    ["filename","process_start_time", "process_end_time", "status"])`

输出:

data1: ['CHNC_P0BCDNAF_20200217', 'CHNC_P0BCDNAF_20200227', 'CHNC_P0BCDNAF_20200615', 'CHNC_P0BCDNAF_20200925']

data2: ['20200217', '20200227', '20200615', '20200925']

Tags: 文件lambda路径databasetime文件名spark
1条回答
网友
1楼 · 发布于 2024-09-28 01:24:46

这里有一个Scala版本,您可以轻松地将其转换为pyspark:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType 

val files = sc.wholeTextFiles("/FileStore/tables/*ZZ.txt",0) 
val res1 = files.map(line => (line._1, line._2.split("\n").flatMap(x => x.split(" ")) )).map(elem => {(elem._1, elem._2) })
val res2 = res1.flatMap {
  case (x, y) => {
    y.map(z => (x, z))
}}
val res3 = res2.map(line => (line._1, line._1.split("/")(3), line._2))
val df = res3.toDF()
val df2 = df.withColumn("s", split($"_1", "/"))
            .withColumn("f1", $"s"(3)) 
            .withColumn("f2", $"f1".cast(StringType)) // avoid issues with split subsequently
            .withColumn("filename", substring_index(col("f2"), ".", 1))
df2.show(false)
df2.repartition($"filename").write.mode("overwrite").parquet("my_parquet") // default 200 and add partitionBy as well for good measure on your `write`.

通过.drop或使用select去除一些示例数据:

+                +    -+   -+                  -+    -+    -+    +
|_1                              |_2       |_3     |s                                    |f1       |f2       |filename|
+                +    -+   -+                  -+    -+    -+    +
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|rrr    |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|       |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|4445

...

去除标点符号、修剪空格等常用方面。你需要适应你的文件名情况当然,我看不出来

问题是你不能在已经分裂的事情上分裂

相关问题 更多 >

    热门问题