ModuleNotFoundError: No module named 'numpy' in StreamSets Transformer when using PySpark

Posted 2024-10-03 23:24:33


I am using PySpark from StreamSets Control Hub, but I am running into the following error: ModuleNotFoundError: No module named 'numpy'. I am running StreamSets Transformer from a Docker image.
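
For what it's worth, a minimal probe like the one below (a sketch only; it assumes it runs in the same Transformer PySpark processor, where spark is injected by the runtime) would show whether numpy is missing on the driver, on the executors, or both:

# Sketch: check numpy on the driver and on one executor.
# Assumption: spark is provided by the Transformer PySpark processor.
import sys

print("driver interpreter:", sys.executable)

def probe(_):
    try:
        import numpy
        return "executor numpy " + numpy.__version__
    except ImportError as exc:
        return "executor missing numpy: " + str(exc)

print(spark.sparkContext.parallelize([0], 1).map(probe).collect())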

My code:

# Import required libraries
from pyspark.ml.feature import VectorAssembler, StopWordsRemover, Tokenizer, CountVectorizer, IDF
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import FloatType


# Setup variables for convenience and readability 
trainSplit = 0.8
testSplit = 0.2
maxIter = 10
regParam = 0.3
elasticNetParam = 0.8
numberOfCVFolds = 3

# The input dataframe is accessible via inputs[0]
df = inputs[0]

# Split dataset into "train" and "test" sets (42 is the random seed)
(train, test) = df.randomSplit([trainSplit, testSplit], 42)

tokenizer = Tokenizer(inputCol="text", outputCol="tokenized")
stopWordsRemover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="stopWordsRemoved")
countVectorizer = CountVectorizer(inputCol=stopWordsRemover.getOutputCol(), outputCol="countVectorized")
idf = IDF(inputCol=countVectorizer.getOutputCol(), outputCol="inverted")

# MUST for Spark features
assembler = VectorAssembler(inputCols=[idf.getOutputCol()], outputCol="features")

# LogisticRegression Model
lr = LogisticRegression(maxIter=maxIter, regParam=regParam, elasticNetParam=elasticNetParam)

# Setup pipeline -- pay attention to the order -- it matters!
pipeline = Pipeline(stages=[tokenizer, stopWordsRemover, countVectorizer, idf, assembler, lr])

# Setup evaluator -- default is F1 score
classEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Setup hyperparams grid
paramGrid = (ParamGridBuilder()
             .addGrid(lr.elasticNetParam, [0.0])
             .addGrid(countVectorizer.vocabSize, [5000])
             .build())

# Setup cross validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=classEvaluator, numFolds=numberOfCVFolds) 

# Fit model on "train" set
cvModel = cv.fit(train)

# Get the best model based on CrossValidator
model = cvModel.bestModel

# Run inference on "test" set
predictions = model.transform(test)

# Return accuracy as output dataframe
accuracy = classEvaluator.evaluate(predictions)
output = spark.createDataFrame([accuracy], FloatType()).withColumnRenamed("value", "Accuracy")
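
Since pyspark.ml imports numpy at import time (see the traceback below), a fail-fast guard at the top of the script would at least turn the py4j proxy error into a readable message. A sketch:

# Sketch: put before the pyspark.ml imports to fail with a clear message.
try:
    import numpy  # noqa: F401
except ImportError as exc:
    raise RuntimeError(
        "numpy is not installed in the Python environment that "
        "Transformer uses, so pyspark.ml cannot be imported"
    ) from exc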

Error message:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/tmp/1628848435304-0/python_code_runner.py", line 55, in run
    exec(code, context)
  File "<string>", line 2, in <module>
  File "/opt/spark/python/pyspark/ml/__init__.py", line 22, in <module>
    from pyspark.ml.base import Estimator, Model, Transformer, UnaryTransformer
  File "/opt/spark/python/pyspark/ml/base.py", line 24, in <module>
    from pyspark.ml.param.shared import *
  File "/opt/spark/python/pyspark/ml/param/__init__.py", line 26, in <module>
    import numpy as np
ModuleNotFoundError: No module named 'numpy'

    at py4j.Protocol.getReturnValue(Protocol.java:473)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy68.run(Unknown Source)
    at com.streamsets.pipeline.spark.transform.py.PythonTransform.callPython(PythonTransform.scala:148)
    at com.streamsets.pipeline.spark.transform.py.PythonTransform.transform(PythonTransform.scala:138)
    at com.streamsets.datatransformer.api.operator.Transform.lambda$getOrTransform$0(Transform.java:29)
    at com.streamsets.datatransformer.api.operator.Operator.generateDF(Operator.java:103)
    at com.streamsets.datatransformer.api.operator.Operator.lambda$getOrCreate$0(Operator.java:99)
    at java.util.Optional.orElseGet(Unknown Source)
    at com.streamsets.datatransformer.api.operator.Operator.getOrCreate(Operator.java:99)
    at com.streamsets.datatransformer.api.operator.Transform.getOrTransform(Transform.java:27)
    at com.streamsets.datatransformer.api.spark.SparkTransform.getOrTransform(SparkTransform.java:39)
    at com.streamsets.datatransformer.dag.BaseBatchDAGRunner$$anonfun$generateDataRefs$1.apply(BaseBatchDAGRunner.scala:638)
    at com.streamsets.datatransformer.dag.BaseBatchDAGRunner$$anonfun$generateDataRefs$1.apply(BaseBatchDAGRunner.scala:609)
    at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
    at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.generateDataRefs(BaseBatchDAGRunner.scala:609)
    at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.run(BaseBatchDAGRunner.scala:564)
    at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:95)
    at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:55)
    at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.startDataTransformerDagRunner(DataTransformerRunner.java:494)
    at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.start(DataTransformerRunner.java:274)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:145)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
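
The last Python frames show the import failing inside PySpark itself: pyspark/ml/param/__init__.py line 26 executes import numpy as np, so any from pyspark.ml import needs numpy in the Python environment Spark runs with. Presumably the fix is to install numpy into that environment, e.g. by extending the Transformer Docker image with a pip install numpy step (the exact image layout is an assumption on my part). A quick post-install check:

# Sketch: confirm numpy is visible to the interpreter Spark uses.
import importlib.util

spec = importlib.util.find_spec("numpy")
print("numpy found at:", spec.origin if spec else "not installed")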
