
Java Spark 1.6: how do I convert an RDD produced by a Scala jar into a PySpark RDD?

I am trying to put together some POC code that demonstrates how to call a Scala function from PySpark and get a PySpark RDD back.

Here is the code on the Scala side:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PySpark extends Logger {

    def getTestRDD(sc: SparkContext): RDD[Int] = {
        sc.parallelize(List.range(1, 10))
    }

}

And this is what I do to access it from the PySpark side:

>>> foo = sc._jvm.com.clickfox.combinations.lab.PySpark
>>> jrdd = foo.getTestRDD(sc._jsc.sc())
>>> moo = RDD(jrdd, sc._jsc.sc())
>>> type(moo)
<class 'pyspark.rdd.RDD'>

So far so good: what I get back appears to be an instance of PySpark.RDD. The problems start as soon as I try to actually use the RDD:

>>> moo.take(1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 1267, in take
    totalParts = self.getNumPartitions()
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 356, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o25.size. Trace:
py4j.Py4JException: Method size([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

I also tried passing in the PySpark context instead of the Java one, just to see what would happen:

>>> moo = RDD(jrdd, sc)
>>> moo.collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 771, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o21.rdd. Trace:
py4j.Py4JException: Method rdd([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

Still no luck. Is there a way to convert, or at least access, the data in the Java RDD from PySpark?

EDIT: I know I could convert the RDD to an array on the Java side and iterate over the resulting JavaArray object, but I would like to avoid that if possible.
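
For reference, that workaround would look roughly like this on the PySpark side (a sketch only; whether Py4J maps the Array[Int] elements straight to Python ints is an assumption):

# collect the Scala RDD to a JVM array via Py4J, then re-parallelize in Python
jrdd = sc._jvm.com.clickfox.combinations.lab.PySpark.getTestRDD(sc._jsc.sc())
java_array = jrdd.collect()                          # Py4J JavaArray holding the Int values
moo = sc.parallelize([int(x) for x in java_array])   # plain Python ints, copied through the driver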


1 Answer

  1. # Answer 1

    what I get back appears to be an instance of PySpark.RDD.

    Just because it is a valid PySpark RDD does not mean that Python can make sense of its contents. What you are passing around is an RDD of Java objects. For its internal conversions, Spark uses Pyrolite to re-serialize objects between Python and the JVM.

    This is an internal API, but you can use it:

    from pyspark.ml.common import _java2py
    
    rdd = _java2py(
        sc, sc._jvm.com.clickfox.combinations.lab.PySpark.getTestRDD(sc._jsc.sc()))
    

    Note that this approach is fairly limited and supports only basic type conversions.
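
    For the RDD[Int] from the question that should be enough; a minimal usage sketch, reusing the rdd created above (the commented values are what the example data should produce, not verified output):

    print(rdd.count())    # 9 elements: 1 through 9
    print(rdd.take(3))    # e.g. [1, 2, 3]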

    You can also replace the RDD with a DataFrame:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    object PySpark {
      def getTestDataFrame(sqlContext: SQLContext): DataFrame = {
        sqlContext.range(1, 10)
      }
    }
    
    from pyspark.sql.dataframe import DataFrame
    
    DataFrame(
       sc._jvm.com.clickfox.combinations.lab.PySpark.getTestDataFrame(
           sqlContext._ssql_ctx),
       sqlContext)
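
    Once wrapped, this behaves like any other PySpark DataFrame, and you can still get a Python-side RDD out of it if you need one. A short usage sketch (the id column name is what sqlContext.range produces; the commented values are illustrative):

    df = DataFrame(
        sc._jvm.com.clickfox.combinations.lab.PySpark.getTestDataFrame(
            sqlContext._ssql_ctx),
        sqlContext)

    df.show()                                  # single id column with values 1 through 9
    values = df.rdd.map(lambda row: row.id)    # back to a plain PySpark RDD of longs
    print(values.collect())                    # e.g. [1, 2, ..., 9]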