我有一张很大的桌子,在那里我的星火工作一直在崩溃。我想重新分配它。我有两个变量(id
,time
),其中我需要确保具有给定id
的所有行将被分配给同一个worker。但我有数亿个独特的ID我希望pyspark均匀地分布数据,但对于给定的ID,所有行都应该位于一个辅助进程上。我可以简单地执行以下操作:
df.repartition("id")
这个documentation似乎表明了这一点。但我想知道spark现在是否会将作业划分为数亿个子集,并且一次只向每个工作者发送一个子集(即一个id
的数据)。这当然是非常低效的
我使用的是Spark 2.4.0-cdh6.2.1
让我们使用
explain
来查看spark在调用repartition
时做了什么:Exchange hashpartitioning(id#0L, 200)
意味着将数据洗牌到200个分区中。行结束时所在的分区是通过执行id.hashCode() % 200
来确定的。如果您的数据没有偏差,那么分布应该相当均匀。200是spark.sql.shuffle.partitions
的默认值,它决定了洗牌后生成的分区数。要将该值更改为400,您可以更改配置的值be dospark.conf.set("spark.sql.shuffle.partitions", 400)
或dorepartition(400, "id")
。事实上,如果你有很多数据,200可能不够相关问题 更多 >
编程相关推荐