我被困了几天。所以我的问题是,我使用ApacheBeam和dataflow runner创建数据管道。我在脚本中使用一个全局变量(一个字典),由某个函数访问。问题是,当我在本地运行它时,估计有200000行数据,它在本地和数据流中都成功了。但是当我在数据流中使用包含6.000.000行的数据集运行它时,字典变成了空的。这是我的密码:
职能:
global pre_compute
pre_compute = {} # {(transnumber,seq):[dordertxt, dorderupref], (transnumber,seq):[dordertxt, dorderupref]}
def compute_all_upref_and_ordertxt(data):
'''
Compute all dorder_txt and dorder_upref
'''
trans_number = data.get("transaction_number")
seq = data.get("stdetail_seq")
# get and remove ordertxt and upref from data
ordertxt = data.pop("dorder_ordertxt","")
upref = data.pop("dorder_upref","")
global pre_compute
if pre_compute.get((trans_number,seq), None) == None:
pre_compute[(trans_number, seq)] = [ordertxt, upref]
else:
if ordertxt:
pre_compute[(trans_number, seq)][0] = ordertxt
if upref:
pre_compute[(trans_number, seq)][1] = upref
return data # -> data with no upref and ordertxt
def evaluate_and_inject_upref_ordertxt(data):
# Using json.loads() faster 4-6x than eval()
data = data.strip("\n")
data = data.replace("'", '"')
data = data.replace("None", "null")
data = json.loads(data) # str to dict
trans_number = data.get('transaction_number')
seq = data.get('stdetail_seq')
global pre_compute
ordertxt, upref = pre_compute[(trans_number, seq)]
data['dorder_ordertxt'] = ordertxt
data['dorder_upref'] = upref
return data
管道代码:
left_join_std_dtdo = (join_stddtdo_dict | 'Left Join STD DTable DOrder' >> Join(left_pcol_name=stdbsap_dimm_data, left_pcol=left_join_std_bsap,
right_pcol_name=dtdo_data, right_pcol=left_join_dtdo,
join_type='left', join_keys=join_keys)
| 'UPDATE PRICE FOR SCCRM01' >> beam.ParDo(update_price_sccrm01())
| 'REMOVE PRICE from DICTIONARY' >> beam.ParDo(remove_dtdo_price())
| 'PreCompute All Upref and ordertxt based on trans_number and seq' >> beam.Map(compute_all_upref_and_ordertxt)
)
rm_left_std_dtdo = (left_join_std_dtdo | 'CHANGE JOINED STD DTDO INTO STR' >> beam.Map(lambda x: str(x))
| 'DISTINCT STD DTDO' >> beam.Distinct()
| 'EVALUATE AND INJECT AS DICT STD DTDO' >> beam.Map(evaluate_and_inject_upref_ordertxt)
| 'Adjust STD_NET_PRICE WITH DODT_PRICE' >> beam.ParDo(replaceprice())
)
它在本地和数据流中都能完美运行,有200.000行数据。但是当我尝试在数据流中使用6.000.000行数据时,脚本正在执行
ordertxt, upref = pre_compute[(trans_number, seq)]
在数据流中运行时,它总是给我一个键错误,就像字典是空的一样。有什么解决办法吗
您可以尝试使用Beam state API。请注意,stateAPI不是为存储大量数据而设计的
另一种选择可能是将数据存储在外部存储系统(例如,GCS)中,以便所有工作人员都可以访问该数据
请注意,如果您试图存储大量数据,这两种解决方案都可能会限制管道的并行化(从而限制性能)。在这种情况下,最好重新设计管道,使其真正可并行化
ApacheBeam基于在分布式基础设施上运行的假设。节点将独立运行,任何状态都必须在工作者之间共享。因此,全局变量不可用。如果您确实需要在员工之间交换信息,您可能需要自己实现。然而,我更愿意建议过度考虑管道
相关问题 更多 >
编程相关推荐