对python或大规模处理数据非常陌生。我是新手,任何帮助都将不胜感激
我在pyspark中有两个数据帧
dF1 : total order schedule
aggregatedOrderId | totalOrderQuantity | ETA | Quantity
xyz | 20 | |
abc | 10 | |
dF2: detailed order
aggregatedOrderId | Quantity | ETA
xyz | 10 | 08/01
xyz | 10 | 08/25
abc | 10 | 07/25
output should look like:
aggregatedOrderId | totalOrderQuantity | ETA | Quantity
xyz | 20 | |
xyz.1 | | 08/01| 10
xyz.2 | | 08/25| 10
abc | 10 | 07/25| 10
when there is single child record it has to become a single row in final df and when there are multiple child they go into separate rows with {orderId.[1-9]}. the out of this has to be in a json so I tried something like this.
from pyspark.sql import Window
window = Window.partitionBy('aggregatedOrderId').orderBy('aggregatedOrderId')
resdf2 = df2.withColumn('row_id', F.row_number().over(window))
resdf2 = resdf2.withColumn('agg', F.col('aggregatedOrderId')).withColumn('newRow' ,F.concat(F.col('aggregatedOrderId'), F.lit('.'), F.col('row_id'))).drop('aggregatedOrderId').withColumnRenamed('newColumn', 'aggregatedOrderId')
df1 = df1.withColumn('agg', F.col('aggregatedOrderId'))
finaldf = df1.join(
resdf2 .groupBy("agg")
.agg(F.collect_list(F.struct(*resdf2 .columns)).alias("orderSplits"))
, ["agg"], 'left').drop('agg')
最后的df类似于,其中orderspits是一个行数组。我将最终的DF作为Json文件编写,供API使用
aggregatedOrderId | totalOrderQuantity | ETA | Quantity | orderSplits
我对如何在只有一行的情况下进行合并感到非常惊讶,并将其保留下来,为多行提供子ID,然后使用orderSplits将其转换为json
感谢任何在这方面帮助我的人
您可以将订单拆分添加为
希望能有帮助
由于我对pyspark的了解非常有限,我将向您展示如何使用它 以纯粹的方式完成你的任务
其思想是通过聚合或删除对df2进行分组,并应用一个函数 给每组
此函数必须有两个变体:单行和多行
单行案例归结为只返回原始组 (一行)列顺序改变,数量重复为 totalOrderQuantity并插入到正确的位置
多行情况涉及从df1连接相应行 和当前组的“重新格式化”内容
为此,请定义以下重新格式化函数:
然后从df2将其应用于各组:
最后一步(reset_index)是清除创建的多索引所必需的 通过groupby创建一个默认索引
对于您的示例数据,结果是:
我认为,我的代码比另一个版本中的pyspark方式要短 解决方案,因此可能值得只使用Pandas而不是pyspark
看看这个。我分别计算了单子记录和多子记录,并使用union合并它们
相关问题 更多 >
编程相关推荐