Loading CSV files into a DataFrame with the file name as an additional column in PySpark

Posted 2024-10-04 07:35:00


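I am trying to create a DataFrame from a directory of CSV files, and I want to keep each file's name as an additional column in the DataFrame. Is this possible in PySpark without using pandas? I would also like to strip the path from the file name.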

from pyspark.sql.functions import input_file_name

df = spark.read.option("delimiter", "\t").csv(mount_point_input)
df = df.withColumn("filename", input_file_name())

I tried using input_file_name(), but every row in the DataFrame ends up with the same file name.

Input:

False    2021-06-05T14:45:09     Server       True
True     2021-06-02T21:32:42     Server       True

Output:

+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-05T14:45:09     Server   True   /2021-06-02-general/c32d3f47.csv
+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-02T21:32:42     Server   True   /2021-06-02-general/c32d3f47.csv
+-----+-----------------------+-------+-------+--------------------------------+

Expected output:

+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-05T14:45:09     Server   True   c32d3f47.csv
+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-02T21:32:42     Server   True   c32d3f48.csv
+-----+-----------------------+-------+-------+--------------------------------+

1 Answer
User
#1 · Posted 2024-10-04 07:35:00

You can use os.path.basename inside a UDF:
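
Before wrapping it in a UDF, it may help to see what os.path.basename does on its own. The following is a minimal plain-Python sketch; the helper name file_name_from_path and the URI-style sample path are hypothetical and only serve as illustration.

```python
from os.path import basename
from typing import Optional


def file_name_from_path(path: Optional[str]) -> Optional[str]:
    """Return the last path component, or None for a missing or empty path."""
    if not path:
        return None
    return basename(path)


# Hypothetical sample values; the second one mimics a URI-style path.
samples = [
    "/2021-06-02-general/c32d3f47.csv",
    "file:///data/2021-06-02-general/c32d3f48.csv",
    None,
]
for path in samples:
    print(path, "->", file_name_from_path(path))
```

The same None check appears in the UDF of the session that follows, so rows without a path yield a null instead of raising an error.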

>>> from pyspark.sql.functions import input_file_name,udf
>>> from pyspark.sql.types import StringType
>>> from os.path import basename
>>> 
>>> data = [("/home/user/test/File1.txt",10), 
...         ("/home/user/test/File2.txt",20), 
...         ("/home/user/test/File3.txt",30), 
...         ("/home/user/test/File4.txt",40),
...         ("/2021-06-02-general/c32d3f47.csv",50),
...         ("/2021-06-02-general/c32d3f47.csv",50)
...         ]
>>> 
>>> 
>>> cols = ["file_path","dummy_value"]
>>> testDF = spark.createDataFrame(data=data, schema = cols)
>>> 
>>> testDF.show(truncate=False)
+--------------------------------+-----------+
|file_path                       |dummy_value|
+--------------------------------+-----------+
|/home/user/test/File1.txt       |10         |
|/home/user/test/File2.txt       |20         |
|/home/user/test/File3.txt       |30         |
|/home/user/test/File4.txt       |40         |
|/2021-06-02-general/c32d3f47.csv|50         |
|/2021-06-02-general/c32d3f47.csv|50         |
+--------------------------------+-----------+

>>> 
>>> 
>>> @udf(StringType())
... def return_filename(inp):
...     if inp:
...       return basename(inp)
...     else:
...       return None
... 
>>> testDF = testDF.withColumn("file_name", return_filename('file_path'))
>>> testDF.show(truncate=False)
+--------------------------------+-----------+------------+
|file_path                       |dummy_value|file_name   |
+--------------------------------+-----------+------------+
|/home/user/test/File1.txt       |10         |File1.txt   |
|/home/user/test/File2.txt       |20         |File2.txt   |
|/home/user/test/File3.txt       |30         |File3.txt   |
|/home/user/test/File4.txt       |40         |File4.txt   |
|/2021-06-02-general/c32d3f47.csv|50         |c32d3f47.csv|
|/2021-06-02-general/c32d3f47.csv|50         |c32d3f47.csv|
+--------------------------------+-----------+------------+
