我正在使用beam-nuggets
库从apachebeam管道向Postgres数据库读写数据。我想按顺序执行以下两项任务:
processing_info
表中插入新行李>processing_info
表记录的主键。然后,主键查询的值将作为边输入传递给另一个DoFn
(其中它用于填充相关表的外键列)李>目前,我正在执行Beam管道之前创建processing_info
记录,但我希望创建新记录作为管道执行的一部分(在googledataflow上运行管道时,它使事情变得更简单)。理想情况下,代码应如下所示:
with beam.Pipeline(options=pipeline_options) as p:
# Executes first
proc_id_result = (p | 'Create Proc Info Record' >> beam.Create([{'pipeline_name': 'cleansed_data_pipeline'}])
| 'Make Processing Id' >> relational_db.Write(
source_config=source_config,
table_config=proc_table_config))
# Executes second
proc_id_record = p | relational_db.ReadFromDB(
source_config=source_config,
table_name='processing_info',
query='SELECT pi.id FROM processing_info pi WHERE processing_date_time = '
' (SELECT MAX(pi1.processing_date_time) from processing_info pi1 '
f' where pi1.pipeline_name = \'cleansed_data_pipeline\')'
)
...
# This code executes later, and is automatically deferred until the side input is available
| 'Add \'processing_info_id\'' >>
(beam.ParDo(AddKeyValuePairToDict(), 'processing_info_id', AsSingleton(proc_id_record)))
...
我可能能够破解一些东西(例如,一个未使用的端输入)来延迟查询,直到插入操作完成,但我想知道是否有一种更惯用的方法(我对Beam不熟悉)
谢谢
您的想法是正确的:您可以使用未使用的侧输入来实现这一点。您可以这样做(在Beam中用于ReadFromBigQuery
现在,问题是如何利用你的
ReadFromDB
转换来实现这一点,我想它不会接受这样的输入。有什么方法可以实现这一转换吗相关问题 更多 >
编程相关推荐