Sparksql获取广播超时异常

2024-10-01 09:20:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我有下面的pyspark代码来聚合某个东西的计数

df_total_asin_count_stat = df_product_full_asin.agg(count(col("asin")).alias("totalAsinCount")).withColumn("batchId", lit(batch_name)).cache()
df_delta_asin_count_stat = df_product_delta_asin.agg(count(col("asin")).alias("deltaAsinCount")).withColumn("batchId", lit(batch_name)).cache()

然后我做了一个内部连接,刷新到数据库

df_stat = df_total_asin_count_stat.join(df_delta_asin_count_stat, "batchId", "inner").withColumn("rankType", lit(rank_type))
df_stat.write.jdbc(url=jdbc_url, table='"stats"', mode="append", properties=jdbc_properties)

我的问题是,如果我没有在两个agg函数后面使用cache(),为什么会出现“Could not execute broadcast in 300 secs”异常?但如果我只是在两个数据帧后面附加“cache()”,效果会很好。需要join的表实际上只有一行,广播不应该超过300秒

我知道这里的意思。我搞不懂的是cache()不会起任何作用,因为我只有一个“action”(write.jdbc),为什么spark只在这里添加cache()就给出了不同的结果


Tags: cachedfcountaliascolproductaggstat