回答此问题可获得 20 贡献值,回答如果被采纳可获得 50 分。
<p>我一直试图过滤掉开头带有<strong>Id</strong>的标题行,并将<code>id number</code>作为列添加到正在处理的<strong>文件名</strong>中。下面是要处理的示例文件</p>
<p><strong>文件1:</strong></p>
<pre><code>#sample first line
#Id: abcdef
col1,col2,col3
1,2,3
2,3,3
4,5,6
</code></pre>
<p><strong>文件2:</strong></p>
^{pr2}$
<p>当我试图构造数据帧并打印结果时,我可以使用下面的代码片段将文件名添加为列。在</p>
<pre><code>par_df = spark.read.schema(schema) \
.option("header", "true") \
.format("com.databricks.spark.csv") \
.option("mode", "DROPMALFORMED")\
.csv("s3a://" + bucket "/"+prefix+"/").withColumn("FileName", func.input_file_name())
</code></pre>
<p>这将过滤掉标题信息,下面是打印结果的片段。在</p>
<pre><code>parsed_diff_df = par_df.select(
par_df['col1'],
par_df['col2'])
parsed_diff_df.registerTempTable("parsed_diff_df_table")
results = sqlContext.sql("select col1, col2, FileName from "
"parsed_diff_df_table").collect()
</code></pre>
<p>这是我得到的结果,无法追加Id列,因为它已经被过滤掉了。在</p>
<pre><code>1,2,3,File1
2,3,3,File1
4,5,6,File1
5,1,3,File2
2,5,8,File2
8,0,4,File2
</code></pre>
<p>预期结果如下。在</p>
<pre><code>1,2,3,abcdef,File1
2,3,3,abcdef,File1
4,5,6,abcdef,File1
5,1,3,ghjklo,File2
2,5,8,ghjklo,File2
8,0,4,ghjklo,File2
</code></pre>
<p>我也试过这个,但没有运气。在</p>
<pre><code> rdd = sc.textFile("s3a://" + bucket + "/"+prefix+"/").flatMap(lambda line: line.split("\n")).filter(lambda line: '#' in line)
results = rdd.collect()
for row in results:
print row
</code></pre>