Error when using a Scala object from PySpark

Posted 2024-05-20 21:00:00


I plan to use a Scala object from PySpark. Here is the Scala code:

package za.co.absa.cobrix.spark.cobol.utils

import org.apache.spark.sql.{Column, DataFrame}
import scala.annotation.tailrec
import scala.collection.mutable

object SparkUtils {

  def flattenSchema(df: DataFrame, useShortFieldNames: Boolean = false): DataFrame = {
    val fields = new mutable.ListBuffer[Column]()
    val stringFields = new mutable.ListBuffer[String]()
    val usedNames = new mutable.HashSet[String]()
    // ... rest of the method omitted; see the GitHub link below
  }
}

GitHub link: https://github.com/AbsaOSS/cobrix/blob/f95efdcd5f802b903404162313f5663bf5731a83/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala

I have only copied the first few lines of the flattenSchema() method here.

The Spark code in Scala:

import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
val dfFlattened = SparkUtils.flattenSchema(df)

After adding the jar via spark-submit, I tried to call the same flattenSchema() method from PySpark:

    dfflatten = DataFrame(sparkContext._jvm.za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema(df._jdf), sqlContext)

But I am getting this error message:

df = sparkCont._jvm.za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema(df._jdf)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p3544.1321029/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p3544.1321029/lib/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p3544.1321029/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema. Trace:
py4j.Py4JException: Method flattenSchema([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:276)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Please help.


1 Answer

Answered 2024-05-20 21:00:00

You need to declare the method in Scala without the default argument so that the Python side can actually find it. Something like this:

package za.co.absa.cobrix.spark.cobol.utils

import org.apache.spark.sql.{Column, DataFrame}
import scala.annotation.tailrec
import scala.collection.mutable

object SparkUtils {
  def flattenSchema(df: DataFrame, useShortFieldNames: Boolean): DataFrame = {
    val fields = new mutable.ListBuffer[Column]()
    val stringFields = new mutable.ListBuffer[String]()
    val usedNames = new mutable.HashSet[String]()
    // ... rest of the method body unchanged from the original
  }
}
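
With the default removed, the PySpark call from the question then has to supply both arguments explicitly. A minimal sketch, reusing the sparkContext and sqlContext names from the question:

from pyspark.sql import DataFrame

# Pass the boolean explicitly: the JVM method takes exactly two
# parameters, and Py4J matches methods by the arguments actually passed.
jdf = sparkContext._jvm.za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema(df._jdf, False)
dfFlattened = DataFrame(jdf, sqlContext)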

Important note: also avoid method overloading (and default arguments or other tricks that effectively compile down to overloaded methods)... these are hard to translate to, and to use from, the Python side.

Note: to overcome the missing default value, just pass the value explicitly from the Python side and you are done; in this case, that means adding a single boolean, as in the call shown above. You can also recreate the default on the Python side, which is safer and more convenient (especially when you have many call sites); a sketch of such a wrapper follows.
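
A sketch of that wrapper, assuming the cobrix jar is on the driver classpath; the function name flatten_schema and the use of the DataFrame internals (_jdf, _sc, sql_ctx) are illustrative, not a public API:

from pyspark.sql import DataFrame

def flatten_schema(df, use_short_field_names=False):
    # The default lives on the Python side; both arguments are always
    # forwarded through Py4J, so the two-parameter JVM method matches.
    jvm_utils = df._sc._jvm.za.co.absa.cobrix.spark.cobol.utils.SparkUtils
    jdf = jvm_utils.flattenSchema(df._jdf, use_short_field_names)
    return DataFrame(jdf, df.sql_ctx)

# Call sites can now rely on the default, just like in Scala:
# df_flattened = flatten_schema(df)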
