pyspark中的Groupby数据帧和过滤器

2024-10-02 06:25:56 发布

您现在位置:Python中文网/ 问答频道 /正文

下面是我输入的spark数据帧,有人可以帮助我获得所需的数据帧或至少是方法

^{tb1}$

我期望的输出数据帧:

^{tb2}$

OAOS-STP>;OAOS NONSP>;手册按优先顺序排列。 提前谢谢


Tags: 数据方法gt手册sparkstptb2tb1
2条回答

您可以使用字典和user-defined function将每行的状态链接到表示状态顺序的整数,然后在unique-id上执行^{},聚合^{}状态顺序,最后从该状态顺序检索状态,如下所示:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType


status_dict = {'OAOS-STP': 1, 'OAOS-nonSTP': 2, 'manual': 3}


@F.udf(returnType=IntegerType())
def get_status_order(status):
    return status_dict.get(status)


data = input_df.withColumn('status_order', F.struct(get_status_order('status'), F.col('status'))) \
    .groupBy('unique-id') \
    .agg(F.min('status_order').alias('status')) \
    .withColumn('status', F.col('status.status')) \
    .orderBy('unique-id')  # Optional

您可以使用窗口函数,通过对status的值进行排序,为每个唯一键选择最高值

PS:我用scala编写代码

import spark.implicits._
import org.apache.spark.sql.functions._
val df = Seq(
  (1, "OAOS-STP"),
  (1, "OAOS-nonSTP"),
  (1, "manual"),
  (2, "OAOS-nonSTP"),
  (2, "manual"),
  (3, "OAOS-STP"),
  (3, "OAOS-nonSTP"),
  (4, "OAOS-STP"),
  (4, "manual")
).toDF("unique-id", "status")

import org.apache.spark.sql.expressions.Window
val df2 = df.withColumn("lower_status", lower($"status"))
val windowSpec  = Window.partitionBy("unique-id").orderBy("status")

val df3 = df2
.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1)
.drop("rank")
.drop("lower_status")

df3.show(false)的输出将是

output

相关问题 更多 >

    热门问题