在PySp中使用toDF()函数将RDD转换为Dataframe时的奇怪行为

2024-09-21 11:36:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我是新来的火花。当我使用toDF()函数将RDD转换为dataframe时,它似乎计算了我以前编写的所有转换函数,如map()。我想知道PySpark中的toDF()是转换还是操作。在

我创建了一个简单的RDD,并使用一个简单的函数输出它的值,仅用于测试,并在map()之后使用toDF()。结果似乎部分运行了map中的函数。当我显示dataframe的结果时,toDF()就像转换一样,再次输出结果。在

>>> a = sc.parallelize([(1,),(2,),(3,)])
>>> def f(x):
...     print(x[0])
...     return (x[0] + 1, )
...
>>> b = a.map(f).toDF(["id"])
2
1
>>> b = a.map(f).toDF(["id"]).show()
2
1
1
2
3
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+

有人能告诉我为什么PySpark中的toDF()函数既像动作又像转换?谢谢。在

PS:在Scala中,toDF在我的例子中类似于转换。在


Tags: 函数idmapdataframereturndefshowpyspark
1条回答
网友
1楼 · 发布于 2024-09-21 11:36:57

这并不奇怪。由于您没有提供模式,Spark必须根据数据进行推断。如果RDD是一个输入,它将调用^{},然后调用{a2},如果{}丢失,will evaluate up to 100 row

first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")


if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")

现在剩下的唯一的谜题是为什么它不能精确计算一个记录。毕竟在您的例子中,first不是空的,也不包含None。在

这是因为first是通过take实现的,并且不能保证计算的项目的确切数量。如果第一个分区没有产生所需数量的项,它将迭代地增加要扫描的分区数。有关详细信息,请查看the implementation。在

如果要避免这种情况,则应使用createDataFrame,并以DDL字符串的形式提供模式:

^{pr2}$

或等效的StructType。在

在Scala中没有类似的行为,因为它在toDF中没有使用模式推理。它要么从Encoder(使用Scala反射获取)检索相应的模式,要么根本不允许转换。最相似的行为是对CSV or JSON这样的输入源进行推理:

spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))

相关问题 更多 >

    热门问题