Pandas到PySpark:将元组列表的列转换为每个元组项的单独列

2024-10-05 22:43:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要转换一个数据帧,其中一列包含一个元组列表,每个元组中的每个项都必须是单独的列。在

以下是熊猫的一个例子和解决方案:

import pandas as pd

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame.from_dict(df_dict)
print(df)  # intial structure

           a    d
    1   stuff   [(1, 2), (3, 4)]
    2   stuff2  [(1, 2), (3, 4)]

# first transformation, let's separate each list item into a new row
row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
print(row_breakdown)

            a        
    stuff   0    (1, 2)
            1    (3, 4)
    stuff2  0    (1, 2)
            1    (3, 4)
    dtype: object

row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
print(row_breakdown)

    a   0
    0   stuff   (1, 2)
    1   stuff   (3, 4)
    2   stuff2  (1, 2)
    3   stuff2  (3, 4)

# second transformation, let's get each tuple item into a separate column
row_breakdown.columns = ["a", "d"]
row_breakdown = row_breakdown["d"].apply(pd.Series)
row_breakdown.columns = ["value_1", "value_2"]
print(row_breakdown)

        value_1 value_2
    0   1   2
    1   3   4
    2   1   2
    3   3   4

这就是熊猫的解决方案。我需要能够做同样的事情,但是使用PySpark(2.3)。我已经开始着手研究,但很快就卡住了:

^{pr2}$

显然,Spark不支持索引。有什么建议都可以。在


Tags: columnsdfvalue解决方案dictrowpd元组
2条回答

这可能会:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)


exploded = ddf.withColumn('d', F.explode("d"))
exploded.show()

结果:

^{pr2}$

我觉得使用SQL来实现这一点比较舒服:

exploded.createOrReplaceTempView("exploded")
spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()

重要提示:之所以使用_1_2访问器,是因为spark将元组解析为一个结构,并给了它默认键。如果在实际实现中,数据帧包含array<int>,则应该使用[0]语法。在

最终结果是:

+   +   -+   -+
|     a|value_1|value_2|
+   +   -+   -+
| stuff|      1|      2|
| stuff|      3|      4|
|stuff2|      1|      2|
|stuff2|      3|      4|
+   +   -+   -+

更新

如果从具有以下架构的数据帧开始:

ddf.printSchema()
#root
# |  a: string (nullable = true)
# |  d: array (nullable = true)
# |    |  element: struct (containsNull = true)
# |    |    |  _1: long (nullable = true)
# |    |    |  _2: long (nullable = true)

必须使用pyspark.sql.functions.explode将数组分解为列,但之后可以使用*选择器将结构转换为列:

^{pr2}$

要重命名列,可以使用列表理解和str.replace

from pyspark.sql.functions import col

row_breakdown = row_breakdown.select(
    *[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
)
row_breakdown.show()
#+   +   +   +
#|     a|value1|value2|
#+   +   +   +
#| stuff|     1|     2|
#| stuff|     3|     4|
#|stuff2|     1|     2|
#|stuff2|     3|     4|
#+   +   +   +

原始答案

如果你从字典开始,你根本不需要为此使用pandas。在

相反,您可以直接从字典创建数据帧。关键是transform your dictionary into the appropriate format,然后使用它来构建Spark数据帧。在

在您的示例中,似乎根本没有使用a键下的值。在

正如Imentioned in my comment,您可以使用以下代码实现所述的输出:

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

from itertools import chain
row_breakdown = spark.createDataFrame(
    chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
)
row_breakdown.show()
#+   +   +
#|value1|value2|
#+   +   +
#|     1|     2|
#|     3|     4|
#|     1|     2|
#|     3|     4|
#+   +   +

如果您想要一个类似索引的列,可以通过简单地使用enumerate来实现,如下面的示例所示。在这里,我还按键对值进行排序,因为这似乎是您的意图。在

data = (
    (i,) + v for i, v in enumerate(
        chain.from_iterable(
            v for k, v in sorted(df_dict["d"].items(), key=lambda (key, val): key)
        )
    )
)
columns = ["index", "value1", "value2"]
row_breakdown = spark.createDataFrame(data, columns)
row_breakdown.show()
#+  -+   +   +
#|index|value1|value2|
#+  -+   +   +
#|    0|     1|     2|
#|    1|     3|     4|
#|    2|     1|     2|
#|    3|     3|     4|
#+  -+   +   +

正如您在这里看到的,我们可以将一个生成器表达式传递给spark.createDataFrame,而且这个解决方案不需要我们提前知道元组的长度。在

相关问题 更多 >