如何基于使用Pyspark的条件从另一个表更新表的Spark DataFrame列值

2024-09-28 01:30:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我想比较pyspark中的2个数据帧

下面是我的测试用例数据集(来自谷歌)

所以我有两个df

  1. 基本测向
  2. 二次测向

baseDF

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3500,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Vom,5000,mex,IT,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

secDF

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,ITA,2/11/2019
22,Tom,2500,usa,HRA,2/11/2019
33,Kom,3000,uk,ITA,2/11/2019
44,Nom,4600,can,HRA,2/11/2019
55,Vom,8000,mex,ITA,2/11/2019
77,XYZ,5000,mex,ITA,2/11/2019

我必须用两个键(No和Name)比较secDF和baseDF,如果这些字段匹配(我只需要secDF中匹配的记录),那么我必须用secDF中的值更新baseDF的salary和Dept字段

预期产量

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,ITA,2/11/2019
22,Tom,2500,usa,HRA,2/11/2019
33,Kom,3000,uk,ITA,2/11/2019
44,Nom,4600,can,HRA,2/11/2019
55,Vom,8000,mex,ITA,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

使用pyspark,我可以使用subtract()查找表2中不存在的表1的值,然后使用这两个表中的所有值,或者应该使用withcolumn覆盖满足条件的值

有人能提出一个好的方法吗

更新--- 我必须用两个键(No和Name)比较secDF和baseDF,如果这些字段匹配(我只需要secDF中匹配的记录),那么我必须用secDF中的值更新baseDF的salary和Dept字段


Tags: nonamedateaddresssamitjoinind
1条回答
网友
1楼 · 发布于 2024-09-28 01:30:21

您可以执行左连接并合并生成的Sal列,secdf优先于basedf:

import pyspark.sql.functions as F

result = basedf.alias('basedf').join(
    secdf.alias('secdf'),
    ['No', 'Name'],
    'left'
).select(
    [F.coalesce('secdf.Sal', 'basedf.Sal').alias('Sal')
     if c == 'Sal'
     else F.coalesce('secdf.Dept', 'basedf.Dept').alias('Dept')
     if c == 'Dept'
     else f'basedf.{c}'
     for c in basedf.columns]
)

result.show()
+ -+  +  +   -+  +    -+
| No|Name| Sal|Address|Dept|Join_Date|
+ -+  +  +   -+  +    -+
| 11| Sam|1000|    ind| ITA|2/11/2019|
| 22| Tom|2500|    usa| HRA|2/11/2019|
| 33| Kom|3000|     uk| ITA|2/11/2019|
| 44| Nom|4600|    can| HRA|2/11/2019|
| 55| Vom|8000|    mex| ITA|2/11/2019|
| 66| XYZ|5000|    mex|  IT|2/11/2019|
+ -+  +  +   -+  +    -+

相关问题 更多 >

    热门问题