#!/usr/bin/python
import sys
for line in sys.stdin:
print line.upper()
火花代码:用于管道数据
import org.apaches.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("Pipe")
val sc = new SparkContext(conf)
val distScript = "/path/on/driver/PipeScript.py"
val distScriptName = "PipeScript.py"
sc.addFile(distScript)
val ipData = sc.parallelize(List("asd","xyz","zxcz","sdfsfd","Ssdfd","Sdfsf"))
val opData = ipData.pipe(SparkFiles.get(distScriptName))
opData.foreach(println)
您可以创建自己的自定义UDF,并可以在Spark应用程序代码中使用它。只有在可用的Spark本机函数做不到的情况下才使用自定义UDF。你知道吗
我不知道有什么进展_数据.py它需要什么样的投入,你期望从中得到什么。 如果你想让它对不同的应用程序代码可用。您可以执行以下操作:
您可以在python中创建一个类,并在其中使用一个函数进行处理,然后将其添加到spark应用程序代码中。你知道吗
在pyspark应用程序代码中导入类
创建一个对象来访问类及其函数
现在可以访问类函数来发送和接收参数。你知道吗
你需要在stacktrace中输入一些错误,以便社区能够快速回答你的问题。 为了在Scala代码中运行Python脚本(这就是我的假设),可以通过以下方式实现:
示例:
Python文件:将输入数据转换为大写的代码
火花代码:用于管道数据
相关问题 更多 >
编程相关推荐