需要RDD的实例,但返回了类“pyspark.RDD.PipelinedRDD”

2024-09-24 00:27:58 发布

您现在位置:Python中文网/ 问答频道 /正文

嗨,我在笔记本里有这段代码,正在搜索python spark的代码:

 mydataNoSQL.createOrReplaceTempView("mytable")
 spark.sql("SELECT * from mytable")
 return mydataNoSQL

def getsameData(df,spark):
result = spark.sql("select * from mytable where temeperature is not null")
return result.rdd.sample(False, 0.1).map(lambda row : (row.temperature))

我需要一个实例RDD,但我正在获取一个类'pyspark.RDD.PipelinedRDD'

任何帮助都会好起来的。


Tags: 代码fromsqlreturndefmytable笔记本result
1条回答
网友
1楼 · 发布于 2024-09-24 00:27:58

pyspark.rdd.PipelinedRDDRDD的子类,它必须在RDD中定义所有API。即PipelinedRDD只是RDD的特殊类型,它是在RDD上运行映射函数时创建的。

例如,请看下面的代码片段。

>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD

所以你应该把你的pyspark.rdd.PipelinedRDD当作代码中的RDD

由于Python是动态类型语言,因此没有完全的casting支持。要强制将pyspark.rdd.PipelinedRDD转换为普通的RDD,可以在RDD上收集并将其并行化

>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>

如果RDD的数据很大,则在RDD上运行collect可能会导致MemoryError

相关问题 更多 >