我在 StreamSets Control Hub 中使用 PySpark
但是我遇到了以下问题:ModuleNotFoundError: No module named 'numpy'
我正在使用 Docker 镜像运行 StreamSets Transformer
我的代码:
# StreamSets Transformer PySpark processor: trains a text-classification
# pipeline (tokenize -> stop-word removal -> TF-IDF -> logistic regression),
# cross-validates it, and emits the held-out accuracy as a one-row dataframe.
# NOTE(review): assumes the incoming dataframe has a "text" column and, per
# the evaluator's defaults, a "label" column -- confirm against upstream stages.
from pyspark.ml.feature import VectorAssembler, StopWordsRemover, Tokenizer, CountVectorizer, IDF
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import FloatType

# Split fractions and model hyper-parameters, named for readability.
TRAIN_FRACTION = 0.8
TEST_FRACTION = 0.2
LR_MAX_ITER = 10
LR_REG_PARAM = 0.3
LR_ELASTIC_NET = 0.8
CV_FOLDS = 3

# StreamSets exposes the upstream dataframe as inputs[0].
source_df = inputs[0]

# Deterministic train/test split (seed fixed at 42 for reproducibility).
train_df, test_df = source_df.randomSplit([TRAIN_FRACTION, TEST_FRACTION], 42)

# Feature-engineering stages; each stage reads its predecessor's output column.
tokenizer = Tokenizer(inputCol="text", outputCol="tokenized")
stop_words = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="stopWordsRemoved")
count_vec = CountVectorizer(inputCol=stop_words.getOutputCol(), outputCol="countVectorized")
idf = IDF(inputCol=count_vec.getOutputCol(), outputCol="inverted")

# Spark ML estimators require a single assembled "features" vector column.
assembler = VectorAssembler(inputCols=[idf.getOutputCol()], outputCol="features")

# Classifier to be tuned by cross-validation.
lr = LogisticRegression(maxIter=LR_MAX_ITER, regParam=LR_REG_PARAM, elasticNetParam=LR_ELASTIC_NET)

# Stage order matters: each transformer feeds the next one's input column.
pipeline = Pipeline(stages=[tokenizer, stop_words, count_vec, idf, assembler, lr])

# Accuracy-based evaluator (the class's default metric would be F1).
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Hyper-parameter grid searched by the cross-validator.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0.0])
    .addGrid(count_vec.vocabSize, [5000])
    .build()
)

# Cross-validate the whole pipeline on the training set and keep the winner.
cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=CV_FOLDS,
)
best_model = cross_validator.fit(train_df).bestModel

# Score the held-out test set and hand the accuracy back to StreamSets
# as a one-row dataframe bound to the well-known `output` name.
predictions = best_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
output = spark.createDataFrame([accuracy], FloatType()).withColumnRenamed("value", "Accuracy")
错误消息:
An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/tmp/1628848435304-0/python_code_runner.py", line 55, in run
exec(code, context)
File "<string>", line 2, in <module>
File "/opt/spark/python/pyspark/ml/__init__.py", line 22, in <module>
from pyspark.ml.base import Estimator, Model, Transformer, UnaryTransformer
File "/opt/spark/python/pyspark/ml/base.py", line 24, in <module>
from pyspark.ml.param.shared import *
File "/opt/spark/python/pyspark/ml/param/__init__.py", line 26, in <module>
    import numpy as np
ModuleNotFoundError: No module named 'numpy'
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/tmp/1628848435304-0/python_code_runner.py", line 55, in run
exec(code, context)
File "<string>", line 2, in <module>
File "/opt/spark/python/pyspark/ml/__init__.py", line 22, in <module>
from pyspark.ml.base import Estimator, Model, Transformer, UnaryTransformer
File "/opt/spark/python/pyspark/ml/base.py", line 24, in <module>
from pyspark.ml.param.shared import *
File "/opt/spark/python/pyspark/ml/param/__init__.py", line 26, in <module>
import numpy as np
ModuleNotFoundError: No module named 'numpy'
at py4j.Protocol.getReturnValue(Protocol.java:473)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy68.run(Unknown Source)
at com.streamsets.pipeline.spark.transform.py.PythonTransform.callPython(PythonTransform.scala:148)
at com.streamsets.pipeline.spark.transform.py.PythonTransform.transform(PythonTransform.scala:138)
at com.streamsets.datatransformer.api.operator.Transform.lambda$getOrTransform$0(Transform.java:29)
at com.streamsets.datatransformer.api.operator.Operator.generateDF(Operator.java:103)
at com.streamsets.datatransformer.api.operator.Operator.lambda$getOrCreate$0(Operator.java:99)
at java.util.Optional.orElseGet(Unknown Source)
at com.streamsets.datatransformer.api.operator.Operator.getOrCreate(Operator.java:99)
at com.streamsets.datatransformer.api.operator.Transform.getOrTransform(Transform.java:27)
at com.streamsets.datatransformer.api.spark.SparkTransform.getOrTransform(SparkTransform.java:39)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner$$anonfun$generateDataRefs$1.apply(BaseBatchDAGRunner.scala:638)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner$$anonfun$generateDataRefs$1.apply(BaseBatchDAGRunner.scala:609)
at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.generateDataRefs(BaseBatchDAGRunner.scala:609)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.run(BaseBatchDAGRunner.scala:564)
at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:95)
at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:55)
at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.startDataTransformerDagRunner(DataTransformerRunner.java:494)
at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.start(DataTransformerRunner.java:274)
at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:145)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
目前没有回答
相关问题 更多 >
编程相关推荐