pySpark映射多变量

2024-06-28 19:21:06 发布

您现在位置:Python中文网/ 问答频道 /正文

下面的代码将我的引用df的值和列名映射到我的实际数据集,查找精确匹配,如果找到精确匹配,则返回OutputValue。但是,我试图添加一条规则,即当PrimaryValue = DEFAULT返回OutputValue

我试图解决这个问题的解决方案是创建一个具有空值的新数据帧——因为下面的代码没有提供匹配项。因此,下一步将以空值为目标,其对应的PrimaryValue = DEFAULT将空值替换为OutputValue

  #create a map based on columns from reference_df
  map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
  map_value = final_reference.OutputValue

  #dataframe of concatinated mappings to get the corresponding OutputValues from reference table
  d = final_reference.agg(collect_set(array(concat_ws('\0','PrimaryName','PrimaryValue'), 'OutputValue')).alias('m')).first().m
  #display(d)

  #iterate through mapped values 
  mappings = create_map([lit(i) for i in chain.from_iterable(d)])

  #dataframe with corresponding matched OutputValues
  dataset = datasetM.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in matched_List.items()]) 
  display(dataset)

Tags: 数据代码fromdefaultmapdfwscreate
1条回答
网友
1楼 · 发布于 2024-06-28 19:21:06

根据评论中的讨论,我认为您只需要从现有映射添加一个默认映射,然后使用coalease()函数来查找第一个非空值,如下所示:

from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce

# skip some old code

d    
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])

# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>

# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}

# use coalesce to find the first non-null values from mappings, mappings_defaul etc
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[ 
  coalesce(
    mappings[concat_ws('\0', lit(c), col(c))],
    mappings_default[c],
    lit("Not Specified at Source" if c in available_list else "Lookup not found")
  ).alias(c_name) for c,c_name in matchedAttributeName_List.items()])

一些解释

(1)d是从引用中检索到的列表列表,我们使用列表理解[ lit(j) for i in d for j in i ]将其展平到列表,并将展平的列表应用到create_map函数:

(2)mappings_default与上述类似,但添加了一个if条件作为过滤器,并仅保留以\x00DEFAULT结尾的PrimaryLookupAttributeValue(它是内部列表i[0]的第一项)的条目,然后使用split将PrimaryLookupAttributeValue(基本上是\x00DEFAULT)从map_键中去掉

相关问题 更多 >