如何使用pyspark为1000个文件在csv文件中附加一个头值作为额外的列

2024-10-02 02:26:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直试图过滤掉开头带有Id的标题行,并将id number作为列添加到正在处理的文件名中。下面是要处理的示例文件

文件1:

#sample first line
#Id: abcdef
col1,col2,col3
1,2,3
2,3,3
4,5,6

文件2:

^{pr2}$

当我试图构造数据帧并打印结果时,我可以使用下面的代码片段将文件名添加为列。在

par_df = spark.read.schema(schema) \
                    .option("header", "true") \
                    .format("com.databricks.spark.csv") \
                    .option("mode", "DROPMALFORMED")\
                    .csv("s3a://" + bucket "/"+prefix+"/").withColumn("FileName", func.input_file_name())

这将过滤掉标题信息,下面是打印结果的片段。在

parsed_diff_df = par_df.select(
    par_df['col1'],
    par_df['col2'])    
parsed_diff_df.registerTempTable("parsed_diff_df_table")
results = sqlContext.sql("select col1, col2, FileName from "                        
                             "parsed_diff_df_table").collect()

这是我得到的结果,无法追加Id列,因为它已经被过滤掉了。在

1,2,3,File1
2,3,3,File1
4,5,6,File1
5,1,3,File2
2,5,8,File2
8,0,4,File2

预期结果如下。在

1,2,3,abcdef,File1
2,3,3,abcdef,File1
4,5,6,abcdef,File1
5,1,3,ghjklo,File2
2,5,8,ghjklo,File2
8,0,4,ghjklo,File2

我也试过这个,但没有运气。在

   rdd = sc.textFile("s3a://" + bucket + "/"+prefix+"/").flatMap(lambda line: line.split("\n")).filter(lambda line: '#' in line)

   results = rdd.collect()
   for row in results:
       print row

Tags: 文件id标题dflinediffparsedfile1
2条回答

您可以将每个文件的FileName映射到它的id

在 让我们编写一个函数来提取id值:

import re
def extract_id(l):
    return re.search('#Id: ([a-z]+)\\n', line).group(1)

让我们将文件作为RDD读取:

^{pr2}$

现在数据帧:

file_id_df = spark.createDataFrame(file_id, ["FileName", "id"])

现在可以将它与第一个数据帧连接起来

par_df.join(file_id_df, "FileName", "inner")

不使用csv加载程序,请执行以下步骤来实现此目的:

  • 使用将数据加载到配对rddsparkContext.wholeTextFiles. 在
  • 应用flatMapValues函数
    1. 使用新行“\n”拆分每条记录
    2. 从第一行获取id>;使用“:”拆分第一行,然后将拆分的第二部分作为id
    3. 跳过第二行,因为架构是预定义的。在
    4. 第3行到最后一行追加id
  • 将包含文件名和拆分值的映射函数skip key应用于各个列>;使用“,”拆分。在
  • 使用“col1,col2,col3”将RDD转换为数据集

我是java开发人员,不太熟悉Python,类似的东西可能会对您有所帮助:

pairRdd=sc.wholeTextFiles('<path>')

#it exactly wont work, make required changes:
def appendId( record ):
   splits = record.splitlines()
   id=splits[0].split(':')[1].strip()
   print(id)
   output=[]
   for s in xrange(2,len(splits)):
       output.append(splits[s]+','+id)
   return output
objRDD=pairRdd.flatMapValues(appendId)
.map(lambda key,val:val.split(','))
.map(lambda p:Row(col1=int(p[0]), col2=int(p[1])........FileName=p[3]))
dataframe=spark.createDataFrame(objRdd)
.....

等效Java:

^{pr2}$

相关问题 更多 >

    热门问题