Passing a SparkSession to a custom transformer at instantiation

Published 2024-09-30 02:30:20


I am writing my own transformers for my PySpark project, but I have run into a problem:

If I write the transformers in the same module/notebook where I use them, everything works fine; for example:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol, 
    HasOutputCols, Param)
from pyspark.sql import (SparkSession, types, functions as funcs)

spark = SparkSession.builder.appName('my_session').getOrCreate()

# My Custom Transformer 1:
class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # I need a little dataframe here to perform some tasks:
        df = spark.createDataFrame(
            [
                {'col1': 1, 'col2': 'A'}, {'col1': 2, 'col2': 'B'}
            ],
            schema = types.StructType([
                types.StructField('col1', types.IntegerType(), True),
                types.StructField('col2', types.StringType(), True),
            ])
        )
        pass # Lots of other things happen here... the little dataframe above
             # is joined with the 'to be transformed' dataset and some columns
             # are calculated.
        return final_dataset

df = MyTransformerOne().fit(input_df).transform(input_df)
# This works Ok
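A Spark-free sketch (plain Python, hypothetical names) of why this notebook version works: `_transform` refers to the module-level name `spark`, which Python resolves in the module's globals at call time, so no attribute on the instance is needed.

```python
# `session` stands in for the SparkSession created at module scope.
session = object()

class NotebookStyleTransformer:
    def transform(self):
        # Resolved via the module's globals at call time, not via self,
        # which is why the single-module version never needs self.spark.
        return session
```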

I have seven transformers like this, so I wanted to store them in a separate module (let's call it my_transformers.py), and I thought: "OK, I need a SparkSession object to make this work... so let's put it in the __init__ method." But it doesn't work:

"""
my_transformers.py
"""

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol, 
    HasOutputCols, Param)
from pyspark.sql import (types, functions as funcs)

class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, spark=None, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # Let's use the instance attribute to create the dataframe
        df = self.spark.createDataFrame(...)
        # ... same as above

Then, in my other module/notebook:

import my_transformers

# ... Create a spark session, load the data, etcetera
df = my_transformers.MyTransformerOne().fit(input_df).transform(input_df)

This fails:

AttributeError: 'MyTransformerOne' object has no attribute 'spark'
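A Spark-free sketch of what goes wrong, using hypothetical class names: `@keyword_only` only records the keyword arguments into `_input_kwargs`, and nothing in `__init__` ever executes `self.spark = ...`, so the later attribute lookup fails. Assigning the session to a plain instance attribute (outside the Params machinery, since a SparkSession is not a `Param`) is one way to make the lookup succeed:

```python
class BrokenTransformer:
    def __init__(self, spark=None):
        # Mirrors the code above: the argument is captured into a kwargs
        # dict but never assigned to the instance.
        self._input_kwargs = {'spark': spark}

    def transform(self):
        return self.spark  # AttributeError: attribute was never set


class FixedTransformer:
    def __init__(self, spark=None):
        self.spark = spark  # explicit assignment makes the lookup succeed

    def transform(self):
        return self.spark
```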

I am lost here. So, my questions are:

  1. Can I pass a SparkSession object to a custom transformer?
  2. How can I do it? I really need to create those dataframes inside the transformer class (creating them outside it makes no sense, since they are not used for any other task).

Can you point me in the right direction?


1 Answer

It turned out to be easier than I thought!

I found this answer: I can call SparkSession.builder.getOrCreate() inside the class. Once the my_transformers module has been imported, whenever I need the Spark session I just add that line to the method that uses it.

So the full code looks like this:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol, 
    HasOutputCols, Param)
from pyspark.sql import (SparkSession, types, functions as funcs)

# My Custom Transformer 1:
class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # HERE! I get the active SparkSession.
        spark = SparkSession.builder.getOrCreate()

        # I need a little dataframe here to perform some tasks:
        df = spark.createDataFrame(
            [
                {'col1': 1, 'col2': 'A'}, {'col1': 2, 'col2': 'B'}
            ],
            schema = types.StructType([
                types.StructField('col1', types.IntegerType(), True),
                types.StructField('col2', types.StringType(), True),
            ])
        )
        pass # Lots of other things happen here... the little dataframe above
             # is joined with the 'to be transformed' dataset and some columns
             # are calculated.
        return final_dataset

df = MyTransformerOne().fit(input_df).transform(input_df)
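Why this works: `SparkSession.builder.getOrCreate()` is documented to return the existing active session when one is running, rather than creating a new one, so the call inside `_transform` picks up the session the notebook already created. A Spark-free sketch of that singleton behaviour, with hypothetical names:

```python
class FakeSession:
    """Stands in for SparkSession; tracks a single active instance."""
    _active = None

class FakeBuilder:
    def getOrCreate(self):
        # Return the existing session if there is one, else create it --
        # the same contract SparkSession.builder.getOrCreate() documents.
        if FakeSession._active is None:
            FakeSession._active = FakeSession()
        return FakeSession._active

builder = FakeBuilder()
```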

I will leave this post here and mark my question as a duplicate.
