如何根据另一个数据帧行的值用联接填充一个数据帧行单元格?

2024-10-06 08:01:28 发布

您现在位置:Python中文网/ 问答频道 /正文

对不起,如果我的问题不是很清楚,我不是那么好,当谈到查询。我想用一个模式来解释我想要达到的目标会容易得多。你知道吗

#loading dataframes with csv files
cores = spark.read.format("csv").option("header","true").load(coreFile)
children = spark.read.format("csv").option("header","true").load(childFile)

#gets all attribute types (entire columns values)
childTypes = children.select("AttributeType").distinct().collect()

#creates new column for each child type
redimDF = cores
for childType in childTypes : 
    redimDF = redimDF.withColumn(childType['AttributeType'], lit(0))

我在databricks集群中有两个数据帧

第一个“redimDF”:

+---+-----+-----+-------+-----+--+-----+-----+-------+------+-------+
|PId|SCode|PCode|LOYALTY|OFFER|VF|VENUE|GROUP|MISSION|REGION|GENERIC|
+---+-----+-----+-------+-----+--+-----+-----+-------+------+-------+
|663|  770|   30|      0|    0| 0|    0|    0|      0|     0|      0|
|527|  786|   32|      0|    0| 0|    0|    0|      0|     0|      0|
+---+-----+-----+-------+-----+--+-----+-----+-------+------+-------+

第二个“孩子们”:

+---+--------------+-------+ 
|PId| AttributeType|  Value| 
+---+--------------+-------+ 
|663|        REGION|      6| 
|663|       LOYALTY|      0| 
|663|         OFFER|   0000| 
|663|       MISSION|      D| 
|663|            VF|     77|
|663|         VENUE|  20744|  
|527|        REGION|      4| 
|527|       LOYALTY|      0| 
+---+--------------+-------+ 

我希望结果是这样的:

+---+-----+-----+-------+-----+--+-----+-----+-------+------+-------+
|PId|SCode|PCode|LOYALTY|OFFER|VF|VENUE|GROUP|MISSION|REGION|GENERIC|
+---+-----+-----+-------+-----+--+-----+-----+-------+------+-------+
|663|  770|   30|      0| 0000|77|20744|    0|      D|     6|      0|
|527|  786|   32|      0|    0| 0|    0|    0|      0|     4|      0|
+---+-----+-----+-------+-----+--+-----+-----+-------+------+-------+

有没有办法使用pyspark查询来实现这一点?你知道吗

提前谢谢


Tags: csvformatreadpidcoresregionsparkoption
1条回答
网友
1楼 · 发布于 2024-10-06 08:01:28

使用pivot有一种方法:

  1. 创建所需的数据帧

    import pyspark.sql.functions as F 
    redim = [(663,770, 30, 0, 0, 0), (527,786, 32, 0 ,0 ,0)]
    redimDF = sqlContext.createDataFrame(redim, ["PId","SCode","PCode","LOYALTY","OFFER","VF"])
    redimDF.show()
    + -+  -+  -+   -+  -+ -+
    |PId|SCode|PCode|LOYALTY|OFFER| VF|
    + -+  -+  -+   -+  -+ -+
    |663|  770|   30|      0|    0|  0|
    |527|  786|   32|      0|    0|  0|
    + -+  -+  -+   -+  -+ -+
    
    children = [(663,"LOYALTY",40),(663,"OFFER", 20),(527,"LOYALTY",40),(527,"VF", 20)]
    childrenDF = sqlContext.createDataFrame(children, ["PId","AttributeType","Value"])
    childrenDF .show()
    + -+      -+  -+
    |PId|AttributeType|Value|
    + -+      -+  -+
    |663|      LOYALTY|   40|
    |663|        OFFER|   20|
    |527|      LOYALTY|   40|
    |527|           VF|   20|
    + -+      -+  -+
    
  2. 透视childrenDF,如果redimDF的attributeType不是全部都在childrenDF中,则添加并设置为0。你知道吗

    childrenDF = childrenDF.groupBy("PId").pivot("AttributeType").agg(F.sum(F.col("Value")))
    for col in redimDF.columns:
        if col not in childrenDF.columns:
            childrenDF = childrenDF.withColumn(col, F.lit(0))
    
  3. 按与redimDF和union相同的顺序选择列

    childrenDF = childrenDF.select(redimDF.columns)
    df = redimDF.union(childrenDF)
    
  4. groupby和sum得到结果df

    df = df.groupBy("PId").agg(F.sum("SCode").alias("SCode"), 
    F.sum("PCode").alias("PCode"), F.sum("LOYALTY").alias("LOYALTY"), 
    F.sum("OFFER").alias("OFFER"), F.sum("VF").alias("VF"))
    df.show()
    
    + -+  -+  -+   -+  -+ -+
    |PId|SCode|PCode|LOYALTY|OFFER| VF|
    + -+  -+  -+   -+  -+ -+
    |663|  770|   30|     40|   20|  0|
    |527|  786|   32|     40|    0| 20|
    + -+  -+  -+   -+  -+ -+
    

相关问题 更多 >