在apachebeam和googledataflow中使用全局变量

2024-09-29 21:30:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我被困了几天。所以我的问题是,我使用ApacheBeam和dataflow runner创建数据管道。我在脚本中使用一个全局变量(一个字典),由某个函数访问。问题是,当我在本地运行它时,估计有200000行数据,它在本地和数据流中都成功了。但是当我在数据流中使用包含6.000.000行的数据集运行它时,字典变成了空的。这是我的密码:

职能:

global pre_compute
pre_compute = {} # {(transnumber,seq):[dordertxt, dorderupref], (transnumber,seq):[dordertxt, dorderupref]}

def compute_all_upref_and_ordertxt(data):
    '''
    Compute all dorder_txt and dorder_upref
    '''
    trans_number = data.get("transaction_number")
    seq = data.get("stdetail_seq")

    # get and remove ordertxt and upref from data
    ordertxt = data.pop("dorder_ordertxt","")
    upref = data.pop("dorder_upref","")

    
    global pre_compute
    if pre_compute.get((trans_number,seq), None) == None:
        pre_compute[(trans_number, seq)] = [ordertxt, upref]

    else:
        if ordertxt:
            pre_compute[(trans_number, seq)][0] = ordertxt
        if upref:
            pre_compute[(trans_number, seq)][1] = upref

    return data # -> data with no upref and ordertxt

def evaluate_and_inject_upref_ordertxt(data):
    # Using json.loads() faster 4-6x than eval()
    data = data.strip("\n")
    data = data.replace("'", '"')
    data = data.replace("None", "null")
    data = json.loads(data) # str to dict
    
    
    trans_number = data.get('transaction_number')
    seq = data.get('stdetail_seq')

    global pre_compute
    ordertxt, upref = pre_compute[(trans_number, seq)]
    data['dorder_ordertxt'] = ordertxt
    data['dorder_upref'] = upref
    return data

管道代码:

left_join_std_dtdo = (join_stddtdo_dict | 'Left Join STD DTable DOrder' >> Join(left_pcol_name=stdbsap_dimm_data, left_pcol=left_join_std_bsap,
                                                                    right_pcol_name=dtdo_data, right_pcol=left_join_dtdo,
                                                                    join_type='left', join_keys=join_keys)
                                            | 'UPDATE PRICE FOR SCCRM01' >> beam.ParDo(update_price_sccrm01())
                                            | 'REMOVE PRICE from DICTIONARY' >> beam.ParDo(remove_dtdo_price())
                                            | 'PreCompute All Upref and ordertxt based on trans_number and seq' >> beam.Map(compute_all_upref_and_ordertxt)
    )

rm_left_std_dtdo = (left_join_std_dtdo | 'CHANGE JOINED STD DTDO INTO STR' >> beam.Map(lambda x: str(x))
                                             | 'DISTINCT STD DTDO' >> beam.Distinct()
                                             | 'EVALUATE AND INJECT AS DICT STD DTDO' >> beam.Map(evaluate_and_inject_upref_ordertxt)
                                             | 'Adjust STD_NET_PRICE WITH DODT_PRICE' >> beam.ParDo(replaceprice())
    ) 

它在本地和数据流中都能完美运行,有200.000行数据。但是当我尝试在数据流中使用6.000.000行数据时,脚本正在执行

ordertxt, upref = pre_compute[(trans_number, seq)]

在数据流中运行时,它总是给我一个键错误,就像字典是空的一样。有什么解决办法吗


Tags: andnumbertransdatagetleftpreseq
2条回答

您可以尝试使用Beam state API。请注意,stateAPI不是为存储大量数据而设计的

另一种选择可能是将数据存储在外部存储系统(例如,GCS)中,以便所有工作人员都可以访问该数据

请注意,如果您试图存储大量数据,这两种解决方案都可能会限制管道的并行化(从而限制性能)。在这种情况下,最好重新设计管道,使其真正可并行化

ApacheBeam基于在分布式基础设施上运行的假设。节点将独立运行,任何状态都必须在工作者之间共享。因此,全局变量不可用。如果您确实需要在员工之间交换信息,您可能需要自己实现。然而,我更愿意建议过度考虑管道

相关问题 更多 >

    热门问题