在pyspark UDF内部使用类方法

2024-10-02 18:25:49 发布

您现在位置:Python中文网/ 问答频道 /正文

你好,数据工程师!在

我试图使用一个名为Astral的类中的方法来编写pyspark udf

以下是自定义项:

def time_from_solar_noon(d, y):
    noon = astral.Astral().solar_noon_utc
    time = noon(d, y)
    return time 

solarNoon = F.udf(lambda d, y: time_from_solar_noon(d,y), TimestampType())

现在,按照我的理解,这个类将被实例化为dataframe中的每一行,从而导致一个非常慢的工作。在

如果我从函数中取出类实例化:

^{pr2}$

我收到以下错误消息:

  [Previous line repeated 326 more times]
    RecursionError: maximum recursion depth exceeded while calling a Python object

所以这是我的问题,我认为应该有可能至少有一个类实例化由执行器/线程,而不是一行一行在我的数据帧,我该怎么做?在

谢谢你的帮助


Tags: 数据实例方法fromreturntimedefpyspark
1条回答
网友
1楼 · 发布于 2024-10-02 18:25:49

与数据库连接一样,通过使用mapPartitions,您只能实例化有限数量的此类实例:

In [1]: from datetime import date
   ...: from astral import Astral
   ...: 
   ...: df = spark.createDataFrame(
   ...:     ((date(2019, 10, 4), 0),
   ...:      (date(2019, 10, 4), 19)),
   ...:     schema=("date", "longitude"))
   ...: 
   ...: 
   ...: def solar_noon(rows):
   ...:     a = Astral()  # initialize the class once per partition
   ...:     return ((a.solar_noon_utc(date=r.date, longitude=r.longitude), *r)
   ...:             for r in rows)  # reuses the same Astral instance for all rows in this partition
   ...: 
   ...: 
   ...: (df.rdd
   ...:  .mapPartitions(solar_noon)
   ...:  .toDF(schema=("solar_noon_utc", *df.columns))
   ...:  .show()
   ...:  )
   ...: 
   ...:  
+         -+     +    -+                                      
|     solar_noon_utc|      date|longitude|
+         -+     +    -+
|2019-10-04 13:48:58|2019-10-04|        0|
|2019-10-04 12:32:58|2019-10-04|       19|
+         -+     +    -+

这是相当有效的,因为函数(solar_noon)被赋予每个工作线程,并且类在每个分区中只初始化一次,这可以容纳许多行。在

相关问题 更多 >