Spark SQL Row_number()按排序Des分区

2024-09-29 01:19:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经在Spark using Window中成功地创建了一个row_number()partitionBy,但是希望通过降序而不是默认的升序来排序。这是我的工作代码:

from pyspark import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window

data_cooccur.select("driver", "also_item", "unit_count", 
    F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count")).alias("rowNum")).show()

这给了我一个结果:

 +------+---------+----------+------+
 |driver|also_item|unit_count|rowNum|
 +------+---------+----------+------+
 |   s10|      s11|         1|     1|
 |   s10|      s13|         1|     2|
 |   s10|      s17|         1|     3|

在这里,我将desc()添加到降序:

data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count").desc()).alias("rowNum")).show()

并获取此错误:

AttributeError: 'WindowSpec' object has no attribute 'desc'

我在这里做错什么了?


Tags: fromimportsqldrivercountunitwindowitem
3条回答

或者可以使用Spark SQL中的SQL代码:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .appName('Test')\
    .getOrCreate()

spark.sql("""
    select driver
        ,also_item
        ,unit_count
        ,ROW_NUMBER() OVER (PARTITION BY driver ORDER BY unit_count DESC) AS rowNum
    from data_cooccur
""").show()

desc应应用于非窗口定义的列。可以对列使用以下任一方法:

from pyspark.sql.functions import col  

F.rowNumber().over(Window.partitionBy("driver").orderBy(col("unit_count").desc())

或独立功能:

from pyspark.sql.functions import desc

F.rowNumber().over(Window.partitionBy("driver").orderBy(desc("unit_count"))

更新实际上,我试着对此进行了更多的研究,但似乎不起作用。(实际上它抛出了一个错误)。它不起作用的原因是我在Databricks中调用了display()中的代码(调用display()之后的代码永远不会运行)。似乎数据帧上的orderBy()window上的orderBy()实际上并不相同。我会把这个答案保留下来只是为了否定的确认

从PySpark 2.4开始(可能更早),只需在orderBy调用中添加关键字ascending=False就可以了。

例如

personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)))

以及

personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())))

似乎给了我同样的行为。

相关问题 更多 >