PySpark在重新分区的批上应用函数

2024-09-27 07:27:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我在5000亿对上使用{}和{}进行{a1}。它对于我当前的集群设置来说太大了,因此,我想批量运行它

我想对数据进行分区,并在每个分区上迭代运行approxSimilarityJoin,这样我的集群就可以处理它

我目前的职能是:

matched_df = model.stages[-1].approxSimilarityJoin(df1, df2, 1.0, "confidence")

但是我被困在如何组合repartitionforeachPartitionapproxSimilarityJoin

我认为应该是这样的:

df1.repartition(100).foreachPartition(batch : model.stages[-1].approxSimilarityJoin(batch, df2, 1.0, "confidence"))

但是我有错误的语法。foreachPartition的正确语法是什么


Tags: 数据modela1batch语法集群批量df1
1条回答
网友
1楼 · 发布于 2024-09-27 07:27:09

我认为你不能用foreachParition实现这一点。 foreachParition接受一个将在执行器上运行的函数,并将实际数据传递给它,而不是数据帧(这是一个将触发处理的操作,如.collect或.write,而不仅仅是转换定义)。如果您想从这个传入集重新创建一个数据帧,这也不会起作用,因为工作进程本身没有可用的spark上下文。从概念上讲,dataframe不是一个表,而是一个转换的延迟计算定义

但是,您可以做的只是使用Spark分割df1。如果没有可以过滤数据帧的键,则可以使用randomSplit进行过滤,例如:

df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)

此操作的结果是数据帧列表

[DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string]]

您可以使用常规Python进行迭代

dfs = df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)
for df in dfs:
    matched_df = model.stages[-1].approxSimilarityJoin(df, df2, 1.0, "confidence")
    do_something_with(matched_df)

要以这种方式将数据集拆分为100个部分,可以生成权重元组:

df.randomSplit(tuple([0.01 for x in range (100)]), seed=42)

相关问题 更多 >

    热门问题