I'm writing my own Transformers for my PySpark project, but I've run into a problem:
if I define the Transformer in the same module/notebook where I use it, everything works fine. For example:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol,
                                     HasOutputCols, Param)
from pyspark.sql import (SparkSession, types, functions as funcs)

spark = SparkSession.builder.appName('my_session').getOrCreate()

# My Custom Transformer 1:
class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # I need a little dataframe here to perform some tasks:
        df = spark.createDataFrame(
            [
                {'col1': 1, 'col2': 'A'}, {'col1': 2, 'col2': 'B'}
            ],
            schema=types.StructType([
                types.StructField('col1', types.IntegerType(), True),
                types.StructField('col2', types.StringType(), True),
            ])
        )
        # Lots of other things happen here... the little dataframe above
        # is joined with the 'to be transformed' dataset and some columns
        # are calculated.
        return final_dataset

df = MyTransformerOne().transform(input_df)
# This works Ok
I have 7 Transformers like this, so I want to keep them in a separate module (let's call it my_transformers.py), and I thought: "OK, I need a SparkSession object to make this work... so let's pass it in through the __init__ method." But it doesn't work:
"""
my_transformers.py
"""
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol,
HasOutputCols, Param)
from pyspark.sql import (types, functions as funcs)
class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, spark=None, inputCol='my_input', output_col='my_output'):
super(MyTransformerOne, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol='my_input', outputCol='my_output'):
kwargs = self._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
# Let's use the instance attribute to create the dataframe
df = self.spark.createDataframe(...)
# ... same as above
Then, in my other module/notebook:
import my_transformers
# ... Create a spark session, load the data, etcetera
df = my_transformers.MyTransformerOne().transform(input_df)
This fails with:

AttributeError: 'MyTransformerOne' object has no attribute 'spark'

I'm lost here. So, my question is: how can I pass a SparkSession object to a custom Transformer? Can you point me in the right direction?
It turned out to be easier than I thought!
I found this answer: I can just call SparkSession.builder.getOrCreate() inside the class. Once the my_transformers module is imported, I only need to add that line to whichever method needs the Spark session. So the complete code looks like this:
I'll leave this post here, and I'll mark my question as a duplicate.