Spark 1.6: how do I convert an RDD generated by a Scala jar into a PySpark RDD?
I am trying to put together some POC code that demonstrates calling a Scala function from PySpark and getting back a PySpark RDD.
Here is the code on the Scala side:
object PySpark extends Logger {
  def getTestRDD(sc: SparkContext): RDD[Int] = {
    sc.parallelize(List.range(1, 10))
  }
}
And this is what I do on the PySpark side to access it:
>>> foo = sc._jvm.com.clickfox.combinations.lab.PySpark
>>> jrdd = foo.getTestRDD(sc._jsc.sc())
>>> moo = RDD(jrdd, sc._jsc.sc())
>>> type(moo)
<class 'pyspark.rdd.RDD'>
So far so good – it looks like I got back an instance of a PySpark RDD. The problems start as soon as I try to use it:
>>> moo.take(1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 1267, in take
totalParts = self.getNumPartitions()
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 356, in getNumPartitions
return self._jrdd.partitions().size()
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o25.size. Trace:
py4j.Py4JException: Method size([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
I also tried passing in the PySpark context instead of the Java one, to see what would happen:
>>> moo = RDD(jrdd, sc)
>>> moo.collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 771, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o21.rdd. Trace:
py4j.Py4JException: Method rdd([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Still no luck. Is there a way to convert the Java RDD, or at least access the data in it, from PySpark?
EDIT I know I could convert the RDD to an array on the Java side and iterate over the resulting JavaArray object, but I would like to avoid that if possible.
# Answer 1
Just because it is a valid PySpark RDD does not mean that Python can understand its contents. What you are passing around is an RDD of Java objects. For its internal conversions, Spark uses Pyrolite to re-serialize objects between Python and the JVM.
This is an internal API, but it can be used here.
Note that this approach is fairly limited and supports only basic type conversions.
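A sketch of that approach, using the internal `_java2py` helper (located in `pyspark.mllib.common` in Spark 1.6; being internal, it may move or change between versions):

```python
# Assumes the Scala object com.clickfox.combinations.lab.PySpark from the
# question is on the driver classpath and `sc` is a live SparkContext.
from pyspark.mllib.common import _java2py  # internal helper, Spark 1.6

foo = sc._jvm.com.clickfox.combinations.lab.PySpark
java_rdd = foo.getTestRDD(sc._jsc.sc())

# _java2py routes the Java objects through Pyrolite so that the Python
# side can unpickle them into ordinary Python values.
rdd = _java2py(sc, java_rdd)
rdd.take(1)
```

Because the helper leans on Pyrolite's pickling, it works for primitives and simple collections but not for arbitrary custom classes.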
Alternatively, you can replace the RDD with a DataFrame, which already has first-class support for crossing the JVM/Python boundary.
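A sketch of the DataFrame variant, assuming a hypothetical getTestDataFrame method is added on the Scala side:

```scala
// Scala side: return a DataFrame instead of an RDD (hypothetical method)
import org.apache.spark.sql.{DataFrame, SQLContext}

object PySpark {
  def getTestDataFrame(sqlContext: SQLContext): DataFrame = {
    sqlContext.range(1, 10)
  }
}
```

```python
# Python side: wrap the returned Java DataFrame in a PySpark DataFrame,
# passing the underlying Java SQLContext (sqlContext._ssql_ctx in 1.6)
from pyspark.sql.dataframe import DataFrame

jdf = sc._jvm.com.clickfox.combinations.lab.PySpark.getTestDataFrame(
    sqlContext._ssql_ctx)
df = DataFrame(jdf, sqlContext)
df.show()
```

Since a DataFrame's rows travel between the JVM and Python via Spark's own serialization, this avoids the Java-object RDD problem entirely.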