<p>由于我对<em>pyspark</em>的了解非常有限,我将向您展示如何使用它
以纯粹的方式完成你的任务</p>
<p>其思想是通过<em>聚合或删除</em>对<em>df2</em>进行分组,并应用一个函数
给每组</p>
<p>此函数必须有两个变体:<em>单行</em>和<em>多行</em></p>
<p><em>单行</em>案例归结为只返回原始组
(一行)列顺序改变,数量重复为
<em>totalOrderQuantity</em>并插入到正确的位置</p>
<p><em>多行</em>情况涉及从<em>df1</em>连接相应行
和当前组的“重新格式化”内容</p>
<p>为此,请定义以下重新格式化函数:</p>
<pre><code>def reformat(grp):
grpSiz = grp.index.size
if grpSiz == 1: # Single-row case
grp2 = grp[['aggregatedOrderId', 'ETA', 'Quantity']]
grp2.insert(1, 'totalOrderQuantity', grp2.Quantity)
return grp2
# Multi-row case
grp1 = (grp.aggregatedOrderId + [ f'.{n}' for n in range(
1, grpSiz + 1)]).to_frame().assign(totalOrderQuantity='')\
.join(grp[['ETA', 'Quantity']])
return pd.concat([df1[df1.aggregatedOrderId == grp.iloc[0,0]], grp1])
</code></pre>
<p>然后从<em>df2</em>将其应用于各组:</p>
<pre><code>df2.groupby('aggregatedOrderId', sort=False).apply(reformat).reset_index(drop=True)
</code></pre>
<p>最后一步(<em>reset_index</em>)是清除创建的多索引所必需的
通过<em>groupby</em>创建一个默认索引</p>
<p>对于您的示例数据,结果是:</p>
<pre><code> aggregatedOrderId totalOrderQuantity ETA Quantity
0 xyz 20
1 xyz.1 08/01 10
2 xyz.2 08/25 10
3 abc 10 07/25 10
</code></pre>
<p>我认为,我的代码比另一个版本中的<em>pyspark</em>方式要短
解决方案,因此可能值得只使用<em>Pandas</em>而不是<em>pyspark</em></p>