<p>Take a look at this. I compute the single-child and the multi-child records separately and then merge them with a union:</p>
<pre><code>from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName('so') \
    .getOrCreate()
sc = spark.sparkContext
# parent (aggregated) order records
df1 = sc.parallelize([
    ("xyz", 20, '', ''), ("abc", 10, '', '')
]).toDF(["aggregatedOrderId", "totalOrderQuantity", "ETA", "quantity"])
# df1.show()
df1_r = df1.withColumnRenamed("aggregatedOrderId", "aggregatedOrderId_par")

w_p = Window().partitionBy("aggregatedOrderId_par").orderBy("aggregatedOrderId_par")
w1 = Window().partitionBy("aggregatedOrderId")
w_c = Window().partitionBy("aggregatedOrderId").orderBy("aggregatedOrderId")

df1_1 = df1_r.withColumn("r_no", F.row_number().over(w_p))
# df1_1.show()

# child order records
df2 = sc.parallelize([
    ("xyz", 10, '08/01'), ("xyz", 10, '08/05'), ("abc", 10, '07/25')
]).toDF(["aggregatedOrderId", "Quantity", "ETA"])

# number the children within each parent and derive the per-parent child count
df2_c = df2.withColumnRenamed("ETA", "ETA_c").withColumnRenamed("Quantity", "Quantity_c") \
    .withColumn("r_no", F.row_number().over(w_c)) \
    .withColumn("order_count", F.max("r_no").over(w1))

# case 1: parents with exactly one child - flatten parent and child into a single row
df2_2_c_single = df2_c.filter(F.col("order_count") == 1)
# df2_2_c_single.show()
cond = (df1_1.aggregatedOrderId_par == df2_2_c_single.aggregatedOrderId)
df_single = df2_2_c_single.join(df1_1, cond, how='left')
df_single_final = df_single.select("aggregatedOrderId", "totalOrderQuantity",
                                   F.col("ETA_c").alias("ETA"),
                                   F.col("Quantity_c").alias("quantity"))
df_single_final.show()
# +-----------------+------------------+-----+--------+
# |aggregatedOrderId|totalOrderQuantity|  ETA|quantity|
# +-----------------+------------------+-----+--------+
# |              abc|                10|07/25|      10|
# +-----------------+------------------+-----+--------+

# case 2: parents with many children - one row per child, id suffixed with its row number
df2_2_gre_1 = df2_c.where(F.col("order_count") > 1) \
    .withColumn("aggregatedOrderId", F.concat(F.col("aggregatedOrderId"), F.lit('.'), F.col("r_no"))) \
    .withColumn("totalOrderQuantity", F.lit('')) \
    .select("aggregatedOrderId", "totalOrderQuantity", F.col("ETA_c").alias("ETA"), F.col("Quantity_c").alias("quantity"))

# drop the parents already covered by the single-child case, keep the remaining parent rows
single_record = df2_c.where(F.col("order_count") == 1).select("aggregatedOrderId").collect()
single_record_final = [r['aggregatedOrderId'] for r in single_record]
df1_without_single_record = df1_1.filter(~F.col("aggregatedOrderId_par").isin(single_record_final)) \
    .select(F.col("aggregatedOrderId_par").alias("aggregatedOrderId"), "totalOrderQuantity", "ETA", "quantity")

df_multi_union = df1_without_single_record.union(df2_2_gre_1)
df_multi_union.show()
# +-----------------+------------------+-----+--------+
# |aggregatedOrderId|totalOrderQuantity|  ETA|quantity|
# +-----------------+------------------+-----+--------+
# |              xyz|                20|     |        |
# |            xyz.1|                  |08/01|      10|
# |            xyz.2|                  |08/05|      10|
# +-----------------+------------------+-----+--------+

# final result: union of the multi-child and single-child paths
df_final = df_multi_union.union(df_single_final)
df_final.show()
# +-----------------+------------------+-----+--------+
# |aggregatedOrderId|totalOrderQuantity|  ETA|quantity|
# +-----------------+------------------+-----+--------+
# |              xyz|                20|     |        |
# |            xyz.1|                  |08/01|      10|
# |            xyz.2|                  |08/05|      10|
# |              abc|                10|07/25|      10|
# +-----------------+------------------+-----+--------+
</code></pre>
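<p>How it works: <code>order_count</code> is the number of children per <code>aggregatedOrderId</code> (the max of <code>row_number</code> within the partition). Single-child parents are flattened into one row via the left join; multi-child parents keep a parent row plus one suffixed row (<code>xyz.1</code>, <code>xyz.2</code>, ...) per child, and the two paths are unioned back together. As a minimal sketch (assuming the same <code>df2</code>, <code>w_c</code> and <code>w1</code> definitions as above), the same count could also be taken directly with a window count instead of <code>max(row_number)</code>:</p>
<pre><code># sketch only: equivalent per-parent child count via a window count
# (reuses df2, w_c and w1 from the answer above; df2_c_alt is a hypothetical name)
df2_c_alt = df2.withColumnRenamed("ETA", "ETA_c").withColumnRenamed("Quantity", "Quantity_c") \
    .withColumn("r_no", F.row_number().over(w_c)) \
    .withColumn("order_count", F.count(F.lit(1)).over(w1))
</code></pre>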