pyspark如何在分层随机抽样中使用(df.sampleByKey())选择每个层的确切记录数

2024-09-29 19:22:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个spark数据帧(我正在使用pyspark)“订单”。它有以下几列

['id', 'orderdate', 'customerid', 'status']

我正在尝试使用关键列作为“状态”进行分层随机抽样。我的目标如下

>> create a new dataframe with exactly 5 random records per status

所以我选择的方法是使用.sampleBy('strata_key',{fraction_dict})。但我所面临的挑战是为每个状态选择精确的分数值,这样每次我都应该得到每个状态正好5个随机记录。我遵循了下面的方法

1.为每个状态的总计数创建了一个字典,如下所示

#Total count of records for each order 'status' in 'ORDERS' dataframe is as below

d=dict([(x['status'],x['count']) for x in orders.groupBy("status").count().collect()])
print(d)

输出:

{'PENDING_PAYMENT': 15030, 'COMPLETE': 22899, 'ON_HOLD': 3798, 'PAYMENT_REVIEW': 729, 'PROCESSING': 8275, 'CLOSED': 7556, 'SUSPECTED_FRAUD': 1558, 
'PENDING': 7610, 'CANCELED': 1428}

2.创建了一个函数,该函数生成获取精确N条记录所需的分数值

#Exact number of records needed per status
N=5

#function calculates fraction

def fraction_calc(count_dict,N)
    d_mod={}
    for i in d:
        d_mod[i]=(N/d[i])
    return d_mod

#creating dictionary of fractions using above function
fraction=fraction_calc(d,5)
print(fraction)

输出:

{'PENDING_PAYMENT': 0.00033266799733865603, 'COMPLETE': 0.000218350146294598, 'ON_HOLD': 0.0013164823591363876, 'PAYMENT_REVIEW': 0.006858710562414266, 'PROCESSING': 0.0006042296072507553, 'CLOSED': 0.0006617257808364214, 'SUSPECTED_FRAUD': 0.003209242618741977, 'PENDING': 0.000657030223390276, 'CANCELED': 0.0035014005602240898}

3.创建使用startified采样API进行采样的最终数据帧。sampleBy()

#creating final sampled dataframe
df_sample=orders.sampleBy("status",fraction)

但我仍然没有得到每个状态的确切5条记录

#Checking count per status of resultant sample dataframe
df_sample.groupBy("status").count().show()
+---------------+-----+
|         status|count|
+---------------+-----+
|PENDING_PAYMENT|    3|
|       COMPLETE|    6|
|        ON_HOLD|    7|
| PAYMENT_REVIEW|    4|
|     PROCESSING|    6|
|         CLOSED|    6|
|SUSPECTED_FRAUD|    7|
|        PENDING|    9|
|       CANCELED|    5|
+---------------+-----+

我应该在这里做些什么来实现我的目标


Tags: ofindataframefor状态statuscount记录
1条回答
网友
1楼 · 发布于 2024-09-29 19:22:52

找到一份工作

from pyspark.sql.window import Window
from pyspark.sql.functions import rand,row_number

一,。使用rand()内置函数生成随机数的“key”列,然后按“key”顺序为“order_status”列上创建的分区窗口的每个元素分配一个行号。代码如下

df_sample=df.withColumn("key",rand()).\
withColumn("rnk", row_number().\
over(Window.partitionBy("status").\
orderBy("key"))).\
where("rnk<=5").drop("key","rnk")

二,。现在,我得到了每个状态的5条随机记录。此输出将在每次spark会话中更改

#Checking count per status of resultant sample dataframe
df_sample.groupBy("status").count().show()
+       -+  -+
|   status      |count|
+       -+  -+
|PENDING_PAYMENT|    5|
|       COMPLETE|    5|
|        ON_HOLD|    5|
| PAYMENT_REVIEW|    5|
|     PROCESSING|    5|
|         CLOSED|    5|
|SUSPECTED_FRAUD|    5|
|        PENDING|    5|
|       CANCELED|    5|
+       -+  -+

相关问题 更多 >

    热门问题