为什么PySpark任务花费了太多时间?

2024-10-04 05:26:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在运行一个Pyspark进程,它可以正常工作。该过程的第一步是将特定的UDF应用于数据帧。这就是功能:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            texto = self.h2t.handle(raw_text)
        except:
            texto = "PARSE HTML ERROR"
        return texto

以下是我如何应用UDF:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs

udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))

它处理大约2900万行和300GB。问题是有些任务需要花费太多的时间来处理。任务的平均时间为:

average times

其他任务已完成,持续时间超过1小时

但有些任务需要花费太多时间处理:

task time

该进程在AWS中运行,具有100个节点的集群中的EMR,每个节点具有32gb的RAM和4个CPU。此外,还启用了火花投机

这些任务的问题在哪里? 这是UDF的问题吗? 这是一个线程问题


Tags: textimportselfraw进程def时间extract
2条回答

我在第一个数据帧中使用了repartitionByRange找到了解决方案。使用正确的id和分区数进行Usign,可以平衡每个分区中的行数

我的直觉是您使用了太多的分区。我会第一次尝试大幅减少他们的数量。你可以在这个主题上找到这个有趣的post

如果分区是平衡的,则按分区平均有29 millions /80k partitions = 362个观测值。我想这还不够。你花了很多时间安排任务,而不是执行任务

如果没有平衡分区(请参见here),情况会变得更糟。这通常会造成瓶颈,这在您的案例中似乎会发生。有几个选项:

  • 您可以coalesce将数据分配到较少的分区。这比使用repartition要好,因为它避免了完全的混乱
  • repartitionByRange如果您希望基于某些列分割数据。您将不会像使用coalescerepartition那样拥有平衡的分区,但如果使用后者,则会非常有用。您需要使用这些分割列的操作

您可以使用spark.sql.shuffle.partitionsspark.default.parallelism更改有关分区的默认值

根据我的经验,这是一个猜测。找到分区的adequat数很难,但值得。请告诉我它是否有帮助,或者您是否仍然遇到瓶颈

相关问题 更多 >