写入的文件多于分区号

2024-09-21 03:14:24 发布

您现在位置:Python中文网/ 问答频道 /正文

使用pyspark,我正在将一系列不同的电子邮件从配置单元表写入HDFS中的平面文件。 限制条件是我的文件不应包含超过4.5M的行

我写了这段代码:

df = spark.read.table(working_table).select("email").distinct()

final_df = df.withColumn(
    "rnk", 
    (F.row_number().over(Window.orderBy(F.lit(1))) / 4500000).cast("int")
)

final_df.groupBy("rnk").count().show()

+---+-------+                                                                   
|rnk|  count|
+---+-------+
|  0|4499999|
|  1|1858773|
+---+-------+


final_df.repartition("rnk").select("email").write.csv(
    working_dir_email, mode="overwrite", compression="none"
)

这几乎可以正常工作,只是我总是得到一个额外的空文件:

hdfs dfs -ls /tmp/emails
Found 4 items
-rw-rw-r--   3 hadoop hadoop          0 2020-02-05 17:59 /tmp/emails/_SUCCESS
-rw-rw-r--   3 hadoop hadoop          0 2020-02-05 17:59 /tmp/emails/part-00000-f724cf35-5ed8-4b7a-98c2-fc992dca9ad1-c000.csv
-rw-rw-r--   3 hadoop hadoop  173936230 2020-02-05 17:59 /tmp/emails/part-00043-f724cf35-5ed8-4b7a-98c2-fc992dca9ad1-c000.csv
-rw-rw-r--   3 hadoop hadoop  292499935 2020-02-05 17:59 /tmp/emails/part-00191-f724cf35-5ed8-4b7a-98c2-fc992dca9ad1-c000.csv

我可以避免这个空白文件吗?有没有更聪明的方法可以在没有空文件的情况下实现这个结果


Tags: 文件csvhadoopdfemailtabletmpfinal
1条回答
网友
1楼 · 发布于 2024-09-21 03:14:24

通过调用final_df.repartition("rnk"),您没有指定分区数,因此Spark使用默认的分区数(200),请参见^{}。 您可以通过以下方式轻松检查:

final_df = final_df.repartition("rnk")
print(final_df.rdd.getNumPartitions())

空文件对应于空分区

实现所需的另一种方法是计算数据帧中的行数,然后将其除以文件四舍五入所需的最大行数。这将为您提供要生成的文件数:

nb_lines_split = 4500000
nb_lines = df.count()

nb_files = math.ceil(nb_lines / nb_lines_split)

df.repartition(nb_files).write.csv(path, mode="overwrite")

相关问题 更多 >

    热门问题