使用pyspark,我正在将一系列不同的电子邮件从配置单元表写入HDFS中的平面文件。 限制条件是我的文件不应包含超过4.5M的行
我写了这段代码:
df = spark.read.table(working_table).select("email").distinct()
final_df = df.withColumn(
"rnk",
(F.row_number().over(Window.orderBy(F.lit(1))) / 4500000).cast("int")
)
final_df.groupBy("rnk").count().show()
+---+-------+
|rnk| count|
+---+-------+
| 0|4499999|
| 1|1858773|
+---+-------+
final_df.repartition("rnk").select("email").write.csv(
working_dir_email, mode="overwrite", compression="none"
)
这几乎可以正常工作,只是我总是得到一个额外的空文件:
hdfs dfs -ls /tmp/emails
Found 4 items
-rw-rw-r-- 3 hadoop hadoop 0 2020-02-05 17:59 /tmp/emails/_SUCCESS
-rw-rw-r-- 3 hadoop hadoop 0 2020-02-05 17:59 /tmp/emails/part-00000-f724cf35-5ed8-4b7a-98c2-fc992dca9ad1-c000.csv
-rw-rw-r-- 3 hadoop hadoop 173936230 2020-02-05 17:59 /tmp/emails/part-00043-f724cf35-5ed8-4b7a-98c2-fc992dca9ad1-c000.csv
-rw-rw-r-- 3 hadoop hadoop 292499935 2020-02-05 17:59 /tmp/emails/part-00191-f724cf35-5ed8-4b7a-98c2-fc992dca9ad1-c000.csv
我可以避免这个空白文件吗?有没有更聪明的方法可以在没有空文件的情况下实现这个结果
通过调用} 。
您可以通过以下方式轻松检查:
final_df.repartition("rnk")
,您没有指定分区数,因此Spark使用默认的分区数(200),请参见^{空文件对应于空分区
实现所需的另一种方法是计算数据帧中的行数,然后将其除以文件四舍五入所需的最大行数。这将为您提供要生成的文件数:
相关问题 更多 >
编程相关推荐