pyspark:从pyspark调用自定义java函数。我需要Java网关吗?

2024-09-27 07:29:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我写了以下内容MyPythonGateway.java这样我就可以从Python调用我的自定义java类:

public class MyPythonGateway {

    public String findMyNum(String input) {
        return MyUtiltity.parse(input).getMyNum(); 
    }

    public static void main(String[] args) {
        GatewayServer server = new GatewayServer(new MyPythonGateway());
        server.start();
    }
}

下面是我如何在Python代码中使用它:

^{pr2}$

现在我想使用PySpark中的MyPythonGateway.findMyNum()函数,而不仅仅是一个独立的python脚本。我做了以下事情:

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
print(myNum)

但是,我得到了以下错误:

... line 43, in main:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
  File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.

我错过了什么?在使用pyspark时,我不知道是否应该运行MyPythonGateway的单独JavaApplication来启动网关服务器。请指教。谢谢!在


下面正是我需要的:

input.map(f)

def f(row):
   // call MyUtility.java 
   // x = MyUtility.parse(row).getMyNum()
   // return x

最好的办法是什么?谢谢!在


Tags: newinputstringreturnserverparsemainjava
3条回答

首先,您看到的所有错误通常意味着您尝试使用的类不可访问。所以很可能是一个CLASSPATH问题。在

关于总体思路,有两个重要问题:

  • 您不能在操作或转换中访问SparkContext,因此使用PySpark网关将无法工作(有关详细信息,请参见How to use Java/Scala function from an action or a transformation?))。如果您想从workers使用Py4J,那么必须在每个worker机器上启动一个单独的网关。在
  • 你真的不想用这种方式在Python和JVM之间传递数据。Py4J不是为数据密集型任务而设计的。在

在开始调用方法之前-

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")

您必须导入mypythongatewayjava类,如下所示

^{pr2}$

指定包含myPackage.MyPythonGateway在spark submit中使用jars选项

例如,如果input.map(f)将输入作为RDD,这可能会起作用,因为您无法访问执行器中RDD映射函数的JVM变量(附加到spark上下文中)(据我所知,pyspark中没有@transient lazy val的等价物)。在

def pythonGatewayIterator(iterator):
    results = []
    jvm = py4j.java_gateway.JavaGateway().jvm
    mygw = jvm.myPackage.MyPythonGateway()
    for value in iterator:
        results.append(mygw.findMyNum(value))
    return results


inputs.mapPartitions(pythonGatewayIterator)

相关问题 更多 >

    热门问题