PySpark: merge a row into its parent row when it is a single child, or add the child rows separately

Published 2024-10-05 17:23:19


I'm very new to Python and to processing data at scale, so any help would be greatly appreciated.

I have two DataFrames in PySpark:

dF1 : total order schedule

aggregatedOrderId | totalOrderQuantity | ETA  | Quantity
 xyz              | 20                 |      |
 abc              | 10                 |      |

dF2: detailed order

aggregatedOrderId | Quantity           | ETA 
 xyz              | 10                 | 08/01
 xyz              | 10                 | 08/25
 abc              | 10                 | 07/25

output should look like:

aggregatedOrderId | totalOrderQuantity | ETA   | Quantity
 xyz              | 20                 |       |
 xyz.1            |                    |  08/01| 10
 xyz.2            |                    |  08/25| 10
 abc              |  10                |  07/25| 10

When there is a single child record it has to become a single row in the final df, and when there are multiple children they go into separate rows as {orderId.[1-9]}. The output of this has to be JSON, so I tried something like this:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Number the child rows within each parent order
window = Window.partitionBy('aggregatedOrderId').orderBy('aggregatedOrderId')
resdf2 = df2.withColumn('row_id', F.row_number().over(window))

# Keep the parent id in 'agg' for the join and build the 'xyz.1'-style child ids
resdf2 = (resdf2
          .withColumn('agg', F.col('aggregatedOrderId'))
          .withColumn('newRow', F.concat(F.col('aggregatedOrderId'), F.lit('.'), F.col('row_id')))
          .drop('aggregatedOrderId')
          .withColumnRenamed('newRow', 'aggregatedOrderId'))

df1 = df1.withColumn('agg', F.col('aggregatedOrderId'))

# Collect each parent's child rows into an array column 'orderSplits'
finaldf = df1.join(
    resdf2.groupBy("agg")
          .agg(F.collect_list(F.struct(*resdf2.columns)).alias("orderSplits")),
    ["agg"], 'left').drop('agg')

The final df looks like the one below, where orderSplits is an array of rows. I write the final DF out as a JSON file for an API to consume:

aggregatedOrderId | totalOrderQuantity | ETA  | Quantity | orderSplits

I'm stuck on how to merge the child into the parent when there is only a single row and keep it as one row, assign child IDs when there are multiple rows, and then turn the result into JSON using orderSplits.
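
For the JSON step itself, a minimal sketch of two common options (reusing finaldf from the attempt above; the output path is only an example):

# Option 1: let Spark write the result as JSON files (one JSON object per line).
finaldf.coalesce(1).write.mode("overwrite").json("/tmp/order_schedule_json")

# Option 2: for small results, collect the rows as JSON strings on the driver
# and hand them to the API directly.
json_rows = finaldf.toJSON().collect()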

Thanks to anyone who can help me with this.


Tags: data, col, agg, quantity, pyspark, row, eta, df1
3 Answers

You can add the order splits as:

from pyspark.sql import functions as f

df4 = final_df.withColumn('orderSplits', f.array(f.struct(*final_df.columns)))
df4.printSchema()

root
 |-- aggregatedOrderId: string (nullable = true)
 |-- totalOrderQuantity: string (nullable = true)
 |-- ETA: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- orderSplits: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- aggregatedOrderId: string (nullable = true)
 |    |    |-- totalOrderQuantity: string (nullable = true)
 |    |    |-- ETA: string (nullable = true)
 |    |    |-- Quantity: string (nullable = true)


df4.show()

+-----------------+------------------+-----+--------+---------------------+
|aggregatedOrderId|totalOrderQuantity|ETA  |Quantity|orderSplits          |
+-----------------+------------------+-----+--------+---------------------+
|abc              |10                |     |10      |[[abc, 10, , 10]]    |
|xyz              |20                |     |        |[[xyz, 20, , ]]      |
|xyz.1            |null              |08/01|10      |[[xyz.1,, 08/01, 10]]|
|xyz.2            |null              |08/25|10      |[[xyz.2,, 08/25, 10]]|
+-----------------+------------------+-----+--------+---------------------+
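
If the end goal is a JSON payload, one quick way to see what a row of df4 serializes to is the standard toJSON() method (a sketch; output not shown):

# Each element returned by toJSON() is a JSON string that includes the
# nested orderSplits array of structs.
print(df4.toJSON().first())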

Hope this helps.

Since my knowledge of PySpark is very limited, I will show you how to accomplish your task in pure Pandas.

The idea is to group df2 by aggregatedOrderId and apply a function to each group.

This function must handle two cases: a single row and multiple rows.

The single-row case boils down to returning the original (one-row) group with the column order changed and Quantity duplicated as totalOrderQuantity, inserted at the right position.

The multi-row case involves concatenating the corresponding row from df1 with the "reformatted" content of the current group.

To do this, define the following reformatting function:

import pandas as pd

def reformat(grp):
    grpSiz = grp.index.size
    if grpSiz == 1:    # Single-row case: keep the row, duplicating Quantity as totalOrderQuantity
        grp2 = grp[['aggregatedOrderId', 'ETA', 'Quantity']]
        grp2.insert(1, 'totalOrderQuantity', grp2.Quantity)
        return grp2
    # Multi-row case: build "xyz.1"-style ids, then prepend the parent row from df1
    grp1 = (grp.aggregatedOrderId + [f'.{n}' for n in range(
        1, grpSiz + 1)]).to_frame().assign(totalOrderQuantity='')\
        .join(grp[['ETA', 'Quantity']])
    return pd.concat([df1[df1.aggregatedOrderId == grp.iloc[0, 0]], grp1])

Then apply it to the groups of df2:

df2.groupby('aggregatedOrderId', sort=False).apply(reformat).reset_index(drop=True)

The last step (reset_index) is needed to replace the MultiIndex created by groupby with a default index.

For your sample data, the result is:

  aggregatedOrderId totalOrderQuantity    ETA Quantity
0               xyz                 20                
1             xyz.1                     08/01       10
2             xyz.2                     08/25       10
3               abc                 10  07/25       10

I think my code is shorter than the PySpark approach in the other solution, so it may be worth using just Pandas instead of PySpark.
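
If the data starts out in Spark DataFrames, a minimal sketch of bridging to this Pandas solution and back (assuming a SparkSession named spark, Spark frames named spark_df1/spark_df2 for illustration, and data small enough to collect to the driver):

# reformat() above reads a Pandas df1, so the Pandas copies take the names df1/df2 here.
df1 = spark_df1.toPandas()
df2 = spark_df2.toPandas()

result = df2.groupby('aggregatedOrderId', sort=False).apply(reformat).reset_index(drop=True)

# Back to Spark, e.g. to write the result out as JSON with Spark's writer.
result_sdf = spark.createDataFrame(result)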

Take a look at this. I computed the single-child and multi-child records separately and merged them with a union.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window


    spark = SparkSession.builder \
        .appName('so')\
        .getOrCreate()

    sc= spark.sparkContext

    df1 = sc.parallelize([
        ("xyz", 20, '',''), ("abc", 10, '','')
    ]).toDF(["aggregatedOrderId","totalOrderQuantity", "ETA", "quantity"])

    # df1.show()
    df1_r = df1.withColumnRenamed("aggregatedOrderId", "aggregatedOrderId_par")

    w_p = Window().partitionBy("aggregatedOrderId_par").orderBy("aggregatedOrderId_par")

    w1 = Window().partitionBy("aggregatedOrderId")

    w_c = Window().partitionBy("aggregatedOrderId").orderBy("aggregatedOrderId")

    df1_1 = df1_r.withColumn("r_no", F.row_number().over(w_p))

    # df1_1.show()


    df2 = sc.parallelize([
        ("xyz", 10, '08/01'), ("xyz", 10, '08/05'),("abc", 10, '07/25')
    ]).toDF(["aggregatedOrderId","Quantity", "ETA"])

    df2_c = df2.withColumnRenamed("ETA", "ETA_c").withColumnRenamed("Quantity", "Quantity_c")\
            .withColumn("r_no", F.row_number().over(w_c)).withColumn("order_count", F.max("r_no").over(w1))


    # Parents that have exactly one child record
    df2_2_c_single = df2_c.filter(F.col("order_count")==1)

    # df2_2_c_single.show()

    cond = (df1_1.aggregatedOrderId_par==df2_2_c_single.aggregatedOrderId)
    df_single = df2_2_c_single.join(df1_1,cond , how ='left')

    df_single_final = df_single.select("aggregatedOrderId",  "totalOrderQuantity", F.col("ETA_c").alias("ETA"), F.col("Quantity_c").alias("quantity"))

    df_single_final.show()

    # +-----------------+------------------+-----+--------+
    # |aggregatedOrderId|totalOrderQuantity|  ETA|quantity|
    # +-----------------+------------------+-----+--------+
    # |              abc|                10|07/25|      10|
    # +-----------------+------------------+-----+--------+

    # Parents that have multiple child records: build the suffixed child rows
    df2_2_gre_1 = df2_c.where(F.col("order_count")>1)\
        .withColumn("aggregatedOrderId", F.concat(F.col("aggregatedOrderId"), F.lit('.'), F.col("r_no")))\
        .withColumn("totalOrderQuantity", F.lit(''))\
        .select("aggregatedOrderId", "totalOrderQuantity", F.col("ETA_c").alias("ETA"), F.col("Quantity_c").alias("quantity"))

    single_record = df2_c.where(F.col("order_count")==1).select("aggregatedOrderId").collect()
    single_record_final = [r['aggregatedOrderId'] for r in single_record]

    df1_without_single_record = df1_1.filter(~F.col("aggregatedOrderId_par").isin(single_record_final))\
    .select(F.col("aggregatedOrderId_par").alias("aggregatedOrderId"), "totalOrderQuantity", "ETA", "quantity")

    df_multi_union = df1_without_single_record.union(df2_2_gre_1)
    df_multi_union.show()

    df_final = df_multi_union.union(df_single_final)

    # +-----------------+------------------+-----+--------+
    # |aggregatedOrderId|totalOrderQuantity|  ETA|quantity|
    # +-----------------+------------------+-----+--------+
    # |              xyz|                20|     |        |
    # |            xyz.1|                  |08/01|      10|
    # |            xyz.2|                  |08/05|      10|
    # +-----------------+------------------+-----+--------+

    df_final.show()

    # +-----------------+------------------+-----+--------+
    # |aggregatedOrderId|totalOrderQuantity|  ETA|quantity|
    # +-----------------+------------------+-----+--------+
    # |              xyz|                20|     |        |
    # |            xyz.1|                  |08/01|      10|
    # |            xyz.2|                  |08/05|      10|
    # |              abc|                10|07/25|      10|
    # +-----------------+------------------+-----+--------+
