我正在运行一个Pyspark进程,它可以正常工作。该过程的第一步是将特定的UDF应用于数据帧。这就是功能:
import html2text
class Udfs(object):
def __init__(self):
self.h2t = html2text.HTML2Text()
self.h2t.ignore_links = True
self.h2t.ignore_images = True
def extract_text(self, raw_text):
try:
texto = self.h2t.handle(raw_text)
except:
texto = "PARSE HTML ERROR"
return texto
以下是我如何应用UDF:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs
udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))
它处理大约2900万行和300GB。问题是有些任务需要花费太多的时间来处理。任务的平均时间为:
其他任务已完成,持续时间超过1小时
但有些任务需要花费太多时间处理:
该进程在AWS中运行,具有100个节点的集群中的EMR,每个节点具有32gb的RAM和4个CPU。此外,还启用了火花投机
这些任务的问题在哪里? 这是UDF的问题吗? 这是一个线程问题
我在第一个数据帧中使用了repartitionByRange找到了解决方案。使用正确的id和分区数进行Usign,可以平衡每个分区中的行数
我的直觉是您使用了太多的分区。我会第一次尝试大幅减少他们的数量。你可以在这个主题上找到这个有趣的post
如果分区是平衡的,则按分区平均有
29 millions /80k partitions = 362
个观测值。我想这还不够。你花了很多时间安排任务,而不是执行任务如果没有平衡分区(请参见here),情况会变得更糟。这通常会造成瓶颈,这在您的案例中似乎会发生。有几个选项:
coalesce
将数据分配到较少的分区。这比使用repartition
要好,因为它避免了完全的混乱repartitionByRange
如果您希望基于某些列分割数据。您将不会像使用coalesce
或repartition
那样拥有平衡的分区,但如果使用后者,则会非常有用。您需要使用这些分割列的操作您可以使用
spark.sql.shuffle.partitions
和spark.default.parallelism
更改有关分区的默认值根据我的经验,这是一个猜测。找到分区的adequat数很难,但值得。请告诉我它是否有帮助,或者您是否仍然遇到瓶颈
相关问题 更多 >
编程相关推荐