如何将N行分配到X组中,并在PySpark中指定值D?

2024-06-28 15:34:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我想做什么:

PySpark中,我试图将N行分配到X个大小相同的组中,并为这些组指定一个特定的值D

  • 每行由属性A、B、C(参考、项目、位置)组成 所有A都是唯一的,但不是B&;C
  • X是上游声明的常数
  • D是从D1=今天+1到Dx=今天+X的日期
  • 组合{B;C}相等的行不应在 不同的组(不应拆分同一位置的相同项目 并获得相同的日期)

我拥有的(df1):

df1 = spark.createDataFrame([ ('1234','banana','Paris'),
                            ('1235','orange','Berlin'),
                            ('1236','orange','Paris'),
                            ('1237','banana','Berlin'),
                            ('1238','orange','Paris'),
                            ('1239','banana','Berlin'),
                       ], ["A","B","C"])

+----+------+------+
|   A|     B|     C|
+----+------+------+
|1234|banana| Paris|
|1235|orange|Berlin|
|1236|orange| Paris|
|1237|banana|Berlin|
|1238|orange| Paris|
|1239|banana|Berlin|
+----+------+------+

我想要什么(df2):

例如,当X=3时:

    +----+------+------+-----+
    |   A|     B|     C|    D|
    +----+------+------+-----+
    |1234|banana| Paris|date1|
    |1235|orange|Berlin|date1|
    |1236|orange| Paris|date2|
    |1237|banana|Berlin|date3|
    |1238|orange| Paris|date2|
    |1239|banana|Berlin|date3|
    +----+------+------+-----+

例如,当X=4时:

    +----+------+------+-----+
    |   A|     B|     C|    D|
    +----+------+------+-----+
    |1234|banana| Paris|date1|
    |1235|orange|Berlin|date4|
    |1236|orange| Paris|date2|
    |1237|banana|Berlin|date3|
    |1238|orange| Paris|date2|
    |1239|banana|Berlin|date3|
    +----+------+------+-----+

               
                   

例如,当X=5时:

    +----+------+------+-----+
    |   A|     B|     C|    D|
    +----+------+------+-----+
    |1234|banana| Paris|date1|
    |1235|orange|Berlin|date4|
    |1236|orange| Paris|date2|
    |1237|banana|Berlin|date3|
    |1238|orange| Paris|date2|
    |1239|banana|Berlin|date3|
    +----+------+------+-----+

               

注:{B,C}元素的排序可以是随机的


到目前为止我所尝试的:

下面的代码平均分配元素,但不能满足不拆分类似{B;C}组合的条件

>>> w=Window.orderBy('B','C')
>>> df2 = df1.withColumn("id",(F.row_number().over(w))%3)
>>> df2.show()
+----+------+------+---+
|   A|     B|     C| id|
+----+------+------+---+
|1237|banana|Berlin|  1|
|1239|banana|Berlin|  2|
|1234|banana| Paris|  0|
|1235|orange|Berlin|  1|
|1236|orange| Paris|  2|
|1238|orange| Paris|  0|
+----+------+------+---+

                   
                   


Tags: 项目id元素属性pysparkbananadf1df2
2条回答

有人向我提出了如下备选答案:


利用collect_listexplode

df1 = spark.createDataFrame([ ('1234','banana','Paris'),
                            ('1235','orange','Berlin'),
                            ('1236','orange','Paris'),
                            ('1237','banana','Berlin'),
                            ('1238','orange','Paris'),
                            ('1239','banana','Berlin'),
                       ], ["A","B","C"])

from pyspark.sql import Window as W, functions as F

df = df1.groupBy("B", "C").agg(F.collect_list("A").alias("A"))\
        .withColumn("id", F.rand())\
        .withColumn("id", F.row_number().over(W.partitionBy().orderBy("id")) % 3)\
        .withColumn("A", F.explode("A"))\
df.show()

+   +   +  + -+
|     B|     C|   A| id|
+   +   +  + -+
|banana|Berlin|1237|  1|
|banana|Berlin|1239|  1|
|orange|Berlin|1235|  2|
|orange| Paris|1236|  0|
|orange| Paris|1238|  0|
|banana| Paris|1234|  1|
+   +   +  + -+

结果与PySpark Helper提供的答案基本相同

dense_rank代替row_number。如果你修改3,你不能保证得到相同大小的组,但它将接近取决于你的数据洗牌。如果需要尽可能精确,您可以使用类似floor(dense_rank_col / max(dense_rank_col) * 3)的方法拆分它

相关问题 更多 >