Pyspark展平RDD错误:解除的值过多

2024-06-01 10:59:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试在RDD中展平数据。RDD的结构是由4个元组组成的列表,第一个元素是主元素,第二个元素是字典列表,第三和第四个元素分别包含一个包含字典的列表。在

rdd=   [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                  [{'pol_cat_id':'234','pol_dt':'20100220'}],
                  [{'qor_pol_id':'23492','qor_cd':'30'}]),

     ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                  [{'pol_cat_id':'532','pol_dt':'20091020'}],
                  [{'qor_pol_id':'49320','qor_cd':'21'}]) ]

我想将数据展平,以便以格式显示

enter image description here

在Pyspark中我该怎么做呢?在

这是我所尝试的,但这给了我一个错误:太多元组无法解包

^{pr2}$

如果需要的话,我可以发布完整的错误,但为了简洁起见

^{3}$

注意:转换成熊猫不起作用,因为RDD太大了


Tags: 数据id元素列表字典cdcovcat
1条回答
网友
1楼 · 发布于 2024-06-01 10:59:42

IIUC,您可以通过使用列表理解来运行flatMap(),来迭代4项元组(1个字符串+3个列表)的第2项,例如:

from pyspark.sql import Row

myrdd = sc.parallelize(rdd)

myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
#[({'primary_id': 'xxxxx99'},
#  {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
#  {'pol_cat_id': '234', 'pol_dt': '20100220'},
#  {'qor_pol_id': '23492', 'qor_cd': '30'}),
# ......

简短说明:在flatMap函数的列表理解中,除了迭代第二项x[1](作为z,这是一个字典),我还将第一个字符串项x[0]转换成一个只有一个条目的字典:{"primary_id":x[0]},并取第一项x[2]x[3],这两本书都是词典。在

因此,在运行了上面的flatMap函数之后,RDD元素变成了一个由4个字典组成的元组,接下来需要做的就是合并它们。下面是我将4个字典的元组映射到Row对象的示例代码,您可能需要更改如何处理异常和缺失字段的逻辑,以满足您自己的需求。在

^{pr2}$

顺便说一句。如果您想让您的原始函数正常工作,请检查以下5行包含#<

def flatten_map(record): 
  try: 
    #yield(record)    #<  comment this out, no need unprocessed data in output
    # Unpack items 
    id, items, line, pls = record 
    pol_id = line[0]["pol_cat_id"]      #<  from line[0] not pls
    pol_dt = line[0]["pol_dt"]          #<  from line[0] not pls
    qor_id = pls[0]["qor_pol_id"]       #<  from pls[0] not pls
    for item in items: 
      #<  below line removed the ending ", 1", thus no need the last map() function to flatten tuples
      yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
  except Exception as e: 
    pass 

相关问题 更多 >