PySpark将两个或多个数据帧与条件组合

2024-10-05 14:32:09 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有多个spark dataframesdf1,df2,df3,模式如下

--- X (float)
--- Y (float)
--- id (String)

现在我想把它们合并在一起

  • 如果df1.X == df2.X and df1.Y == df2.Y then concat(df1.id,df2.id)
  • 将其作为单行放入结果df
  • 否则将这两个列作为不同的行放入结果df

有没有一种方法可以在pyspark中使用联接或lambda来实现这一点


Tags: and方法iddfstring模式floatspark
1条回答
网友
1楼 · 发布于 2024-10-05 14:32:09

试试这个-

加载测试数据

 val df1 = spark.range(4).withColumn("x", row_number().over(Window.orderBy("id")) * lit(1f))
    df1.show(false)
    /**
      * + -+ -+
      * |id |x  |
      * + -+ -+
      * |0  |1.0|
      * |1  |2.0|
      * |2  |3.0|
      * |3  |4.0|
      * + -+ -+
      */
    val df2 = spark.range(2).withColumn("x", row_number().over(Window.orderBy("id")) * lit(1f))
    df2.show(false)
    /**
      * + -+ -+
      * |id |x  |
      * + -+ -+
      * |0  |1.0|
      * |1  |2.0|
      * + -+ -+
      */

合并常见记录和不常见记录

    val inner = df1.join(df2, Seq("x"))
      .select(
        $"x", concat(df1("id"), df2("id")).as("id")
      )
    val commonPlusUncommon =
      df1.join(df2, Seq("x"), "leftanti")
        .unionByName(
          df2.join(df1, Seq("x"), "leftanti")
        ).unionByName(inner)
    commonPlusUncommon.show(false)

    /**
      * + -+ -+
      * |x  |id |
      * + -+ -+
      * |3.0|2  |
      * |4.0|3  |
      * |1.0|00 |
      * |2.0|11 |
      * + -+ -+
      */

也可以使用完全外部联接

 df1.join(df2, Seq("x"), "full")
      .select(
        $"x",
        concat(coalesce(df1("id"), lit("")), coalesce(df2("id"), lit(""))).as("id")
      )
      .show(false)

    /**
      * + -+ -+
      * |x  |id |
      * + -+ -+
      * |1.0|00 |
      * |2.0|11 |
      * |3.0|2  |
      * |4.0|3  |
      * + -+ -+
      */

相关问题 更多 >