我一直试图过滤掉开头带有Id的标题行,并将id number
作为列添加到正在处理的文件名中。下面是要处理的示例文件
文件1:
#sample first line
#Id: abcdef
col1,col2,col3
1,2,3
2,3,3
4,5,6
文件2:
^{pr2}$当我试图构造数据帧并打印结果时,我可以使用下面的代码片段将文件名添加为列。在
par_df = spark.read.schema(schema) \
.option("header", "true") \
.format("com.databricks.spark.csv") \
.option("mode", "DROPMALFORMED")\
.csv("s3a://" + bucket "/"+prefix+"/").withColumn("FileName", func.input_file_name())
这将过滤掉标题信息,下面是打印结果的片段。在
parsed_diff_df = par_df.select(
par_df['col1'],
par_df['col2'])
parsed_diff_df.registerTempTable("parsed_diff_df_table")
results = sqlContext.sql("select col1, col2, FileName from "
"parsed_diff_df_table").collect()
这是我得到的结果,无法追加Id列,因为它已经被过滤掉了。在
1,2,3,File1
2,3,3,File1
4,5,6,File1
5,1,3,File2
2,5,8,File2
8,0,4,File2
预期结果如下。在
1,2,3,abcdef,File1
2,3,3,abcdef,File1
4,5,6,abcdef,File1
5,1,3,ghjklo,File2
2,5,8,ghjklo,File2
8,0,4,ghjklo,File2
我也试过这个,但没有运气。在
rdd = sc.textFile("s3a://" + bucket + "/"+prefix+"/").flatMap(lambda line: line.split("\n")).filter(lambda line: '#' in line)
results = rdd.collect()
for row in results:
print row
您可以将每个文件的
FileName
映射到它的id
:在 让我们编写一个函数来提取id值:
让我们将文件作为RDD读取:
^{pr2}$现在数据帧:
现在可以将它与第一个数据帧连接起来
不使用csv加载程序,请执行以下步骤来实现此目的:
我是java开发人员,不太熟悉Python,类似的东西可能会对您有所帮助:
等效Java:
^{pr2}$相关问题 更多 >
编程相关推荐