I am running jpype code inside pyspark, but it fails when I use JFloat. The example below runs successfully:
from pyspark import SparkConf, SparkContext
from jpype import getDefaultJVMPath, startJVM, shutdownJVM, JClass, JFloat

conf = SparkConf().setMaster("local[*]").setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.parallelize([(1.0, float(i)) for i in range(10)])

# start a JVM in the driver and call the Java class once
jvm_path = getDefaultJVMPath()
startJVM(jvm_path, "-ea", "-Djava.class.path=spark-1.0-SNAPSHOT.jar", convertStrings=False)
print(JClass("com.littlely.AddNum")().add(JFloat(100), JFloat(200)))
shutdownJVM()

def inner_map(p):
    # start a JVM inside each executor process
    startJVM(jvm_path, "-ea", "-Djava.class.path=spark-1.0-SNAPSHOT.jar", convertStrings=False)
    for r in p:
        # JClass("com.littlely.AddNum").main([str(r[0]), str(r[1])])
        print(JClass("com.littlely.AddNum")().add(r[0], r[1]))
    shutdownJVM()

rdd.foreachPartition(inner_map)
But when I replace print(JClass("com.littlely.AddNum")().add(r[0], r[1])) with print(JClass("com.littlely.AddNum")().add(JFloat(r[0]), JFloat(r[1]))), it fails with this error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
TypeError: __new__() takes 3 positional arguments but 4 were given
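For clarity, the failing variant of inner_map differs from the working one only in the add call (a minimal sketch of the change):

def inner_map(p):
    startJVM(jvm_path, "-ea", "-Djava.class.path=spark-1.0-SNAPSHOT.jar", convertStrings=False)
    for r in p:
        # only this line changed: the two floats are wrapped in JFloat before the call
        print(JClass("com.littlely.AddNum")().add(JFloat(r[0]), JFloat(r[1])))
    shutdownJVM()

rdd.foreachPartition(inner_map)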
What is going wrong here?
No answers yet.