2024-10-02 06:25:56 发布
网友
下面是我输入的spark数据帧,有人可以帮助我获得所需的数据帧或至少是方法
我期望的输出数据帧:
OAOS-STP>;OAOS NONSP>;手册按优先顺序排列。 提前谢谢
您可以使用字典和user-defined function将每行的状态链接到表示状态顺序的整数,然后在unique-id上执行^{},聚合^{}状态顺序,最后从该状态顺序检索状态,如下所示:
unique-id
from pyspark.sql import functions as F from pyspark.sql.types import IntegerType status_dict = {'OAOS-STP': 1, 'OAOS-nonSTP': 2, 'manual': 3} @F.udf(returnType=IntegerType()) def get_status_order(status): return status_dict.get(status) data = input_df.withColumn('status_order', F.struct(get_status_order('status'), F.col('status'))) \ .groupBy('unique-id') \ .agg(F.min('status_order').alias('status')) \ .withColumn('status', F.col('status.status')) \ .orderBy('unique-id') # Optional
您可以使用窗口函数,通过对status的值进行排序,为每个唯一键选择最高值
PS:我用scala编写代码
import spark.implicits._ import org.apache.spark.sql.functions._ val df = Seq( (1, "OAOS-STP"), (1, "OAOS-nonSTP"), (1, "manual"), (2, "OAOS-nonSTP"), (2, "manual"), (3, "OAOS-STP"), (3, "OAOS-nonSTP"), (4, "OAOS-STP"), (4, "manual") ).toDF("unique-id", "status") import org.apache.spark.sql.expressions.Window val df2 = df.withColumn("lower_status", lower($"status")) val windowSpec = Window.partitionBy("unique-id").orderBy("status") val df3 = df2 .withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1) .drop("rank") .drop("lower_status")
df3.show(false)的输出将是
df3.show(false)
您可以使用字典和user-defined function将每行的状态链接到表示状态顺序的整数,然后在} ,聚合^{} 状态顺序,最后从该状态顺序检索状态,如下所示:
unique-id
上执行^{您可以使用窗口函数,通过对status的值进行排序,为每个唯一键选择最高值
PS:我用scala编写代码
df3.show(false)
的输出将是相关问题 更多 >
编程相关推荐