我写了以下内容MyPythonGateway.java这样我就可以从Python调用我的自定义java类:
public class MyPythonGateway {
public String findMyNum(String input) {
return MyUtiltity.parse(input).getMyNum();
}
public static void main(String[] args) {
GatewayServer server = new GatewayServer(new MyPythonGateway());
server.start();
}
}
下面是我如何在Python代码中使用它:
^{pr2}$现在我想使用PySpark中的MyPythonGateway.findMyNum()
函数,而不仅仅是一个独立的python脚本。我做了以下事情:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
print(myNum)
但是,我得到了以下错误:
... line 43, in main:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.
我错过了什么?在使用pyspark时,我不知道是否应该运行MyPythonGateway的单独JavaApplication来启动网关服务器。请指教。谢谢!在
下面正是我需要的:
input.map(f)
def f(row):
// call MyUtility.java
// x = MyUtility.parse(row).getMyNum()
// return x
最好的办法是什么?谢谢!在
首先,您看到的所有错误通常意味着您尝试使用的类不可访问。所以很可能是一个
CLASSPATH
问题。在关于总体思路,有两个重要问题:
SparkContext
,因此使用PySpark网关将无法工作(有关详细信息,请参见How to use Java/Scala function from an action or a transformation?))。如果您想从workers使用Py4J,那么必须在每个worker机器上启动一个单独的网关。在在开始调用方法之前-
您必须导入mypythongatewayjava类,如下所示
^{pr2}$指定包含myPackage.MyPythonGateway在spark submit中使用jars选项
例如,如果
input.map(f)
将输入作为RDD,这可能会起作用,因为您无法访问执行器中RDD映射函数的JVM变量(附加到spark上下文中)(据我所知,pyspark中没有@transient lazy val
的等价物)。在相关问题 更多 >
编程相关推荐