pyspark中未定义的函数UDF?

2024-10-01 13:39:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个在Dataframe内部调用的UDF,但是我得到了未定义的UDF。在

global ac
ac = sc.accumulator(0)

def incrementAC():
  ac.add(1)
  return str(ac.value)

df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"])

df.withColumn("lang_and_rank", expr("concat(language,'blah')")).show()

+--------+----+-------------+
|language|rank|lang_and_rank|
+--------+----+-------------+
|    Java|  90|     Javablah|
|   Scala|  95|    Scalablah|
|   Spark|  92|    Sparkblah|
+--------+----+-------------+

myudf = udf(incrementAC,StringType())
df.withColumn("lang_and_rank", expr("concat(language,myudf())")).show()

.utils.AnalysisException: u'undefined function myudf;'

Tags: anddflangjavalanguageacsparksc
2条回答

必须注册要与expr一起使用的函数:

spark.udf.register("incrementAC", incrementAC)

另外,从转换中使用的accumualtors也不可靠。在

希望这有帮助!在

from pyspark.sql.functions import udf, expr, concat, col
from pyspark.sql.types import StringType

ac = sc.accumulator(0)

def incrementAC():
  ac.add(1)
  return str(ac)

#sample data
df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"])

方法1:

^{pr2}$

方法2:

#another solution if you want to use 'expr' (as rightly pointed out by @user9132725)
sqlContext.udf.register("myudf", incrementAC, StringType())
df = df.withColumn("lang_and_rank", expr("concat(language, myudf())"))
df.show()

输出为:

+    +  +      -+
|language|rank|lang_and_rank|
+    +  +      -+
|    Java|  90|        Java1|
|   Scala|  95|       Scala1|
|   Spark|  92|       Spark2|
+    +  +      -+

相关问题 更多 >