使用自定义python文件的转换在Spark中有效吗?

2024-10-06 12:59:01 发布

您现在位置:Python中文网/ 问答频道 /正文

目前,我正在使用自定义映射器和还原器处理配置单元中的数据,如下所示:

select TRANSFORM(hostname,impressionId) using 'python process_data.py' as a,b from impressions

但是当我尝试在sparksql中应用相同的逻辑时,我得到了SparkSqlParser错误。 我想恢复process_data.py中的逻辑。有什么办法吗?你知道吗


Tags: 数据frompydataastransform逻辑process
2条回答

您可以创建自己的自定义UDF,并可以在Spark应用程序代码中使用它。只有在可用的Spark本机函数做不到的情况下才使用自定义UDF。你知道吗

我不知道有什么进展_数据.py它需要什么样的投入,你期望从中得到什么。 如果你想让它对不同的应用程序代码可用。您可以执行以下操作:

您可以在python中创建一个类,并在其中使用一个函数进行处理,然后将其添加到spark应用程序代码中。你知道吗

class MyClass:
    def __init__(self, args):
    …
    def MyFunction(self):

spark.sparkContext.addPyFile('/py file location/somecode.py')

在pyspark应用程序代码中导入类

from somecode import MyClass

创建一个对象来访问类及其函数

myobject = MyClass()

现在可以访问类函数来发送和接收参数。你知道吗

你需要在stacktrace中输入一些错误,以便社区能够快速回答你的问题。 为了在Scala代码中运行Python脚本(这就是我的假设),可以通过以下方式实现:

示例

Python文件:将输入数据转换为大写的代码

#!/usr/bin/python
import sys
for line in sys.stdin:
    print line.upper()

火花代码:用于管道数据

import org.apaches.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("Pipe")
val sc = new SparkContext(conf)
val distScript = "/path/on/driver/PipeScript.py"
val distScriptName = "PipeScript.py"
sc.addFile(distScript)
val ipData = sc.parallelize(List("asd","xyz","zxcz","sdfsfd","Ssdfd","Sdfsf"))
val opData = ipData.pipe(SparkFiles.get(distScriptName))
opData.foreach(println) 

相关问题 更多 >