将平局设置为高的SAS排名转换为PySpark

2024-09-28 17:16:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在PySpark中复制以下SAS代码:

PROC RANK DATA = aud_baskets OUT = aud_baskets_ranks GROUPS=10 TIES=HIGH;
BY customer_id;
VAR expenditure;
RANKS basket_rank;
RUN;

这样做的目的是将所有支出按每个客户id块进行排序。数据如下所示:

+-----------+--------------+-----------+
|customer_id|transaction_id|expenditure|
+-----------+--------------+-----------+
|          A|             1|         34|
|          A|             2|         90|
|          B|             1|         89|
|          A|             3|          6|
|          B|             2|          8|
|          B|             3|          7|
|          C|             1|         96|
|          C|             2|          9|
+-----------+--------------+-----------+

在PySpark中,我尝试了以下方法:

spendWindow = Window.partitionBy('customer_id').orderBy(col('expenditure').asc())
aud_baskets = (aud_baskets_ranks.withColumn('basket_rank', ntile(10).over(spendWindow)))

问题是PySpark不允许用户改变处理领带的方式,就像SAS那样(据我所知)。我需要在PySpark中设置此行为,以便在每次出现一个边缘情况时将值上移到下一层,而不是将它们放到下面的级别

或者有没有一种方法可以定制这种方法


Tags: 方法代码idcustomerprocpysparksasrank
2条回答

使用密集等级,它将给出相同等级的情况下,领带和下一个等级将不会被跳过 ntile函数将每个分区中的记录组拆分为n个部分。你的情况是哪一个是10

from pyspark.sql.functions import dense_rank

spendWindow = Window.partitionBy('customer_id').orderBy(col('expenditure').asc())
aud_baskets = aud_baskets_ranks.withColumn('basket_rank',dense_rank.over(spendWindow))

请尝试以下代码。它是由一个称为链轮的自动化工具生成的。它应该注意领带

df = (aud_baskets)
for (colToRank,rankedName) in zip(['expenditure'],['basket_rank']): 
    wA = Window.orderBy(asc(colToRank))
    df_w_rank = (df.withColumn('raw_rank', rank().over(wA)))
    ties = df_w_rank.groupBy('raw_rank').count().filter("""count > 1""")
    df_w_rank = (df_w_rank.join(ties,['raw_rank'],'left').withColumn(rankedName,expr("""case when count is not null
                                                                                     then (raw_rank + count - 1) else
                                                                                     raw_rank end""")))
    rankedNameGroup = rankedName
    n = df_w_rank.count()
    df_with_rank_groups = (df_w_rank.withColumn(rankedNameGroup,expr("""FLOOR({rankedName}
                                                                     *{k}/({n}+1))""".format(k=10, n=n,
                                                                     rankedName=rankedName))))
    df = df_with_rank_groups
aud_baskets_ranks = df_with_rank_groups.drop('raw_rank', 'count')

相关问题 更多 >