在Apache Beam上按顺序执行数据库写入和读取任务

2024-09-24 04:27:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用beam-nuggets库从apachebeam管道向Postgres数据库读写数据。我想按顺序执行以下两项任务:

  1. processing_info表中插入新行
  2. 查询新创建的processing_info表记录的主键。然后,主键查询的值将作为边输入传递给另一个DoFn(其中它用于填充相关表的外键列)

目前,我正在执行Beam管道之前创建processing_info记录,但我希望创建新记录作为管道执行的一部分(在googledataflow上运行管道时,它使事情变得更简单)。理想情况下,代码应如下所示:

with beam.Pipeline(options=pipeline_options) as p:
    # Executes first
    proc_id_result = (p | 'Create Proc Info Record' >> beam.Create([{'pipeline_name': 'cleansed_data_pipeline'}])
                          | 'Make Processing Id' >> relational_db.Write(
                                source_config=source_config,
                                table_config=proc_table_config))
    # Executes second
    proc_id_record = p | relational_db.ReadFromDB(
            source_config=source_config,
            table_name='processing_info',
            query='SELECT pi.id FROM processing_info pi WHERE processing_date_time = '
                  '   (SELECT MAX(pi1.processing_date_time) from processing_info pi1 '
                  f'      where pi1.pipeline_name = \'cleansed_data_pipeline\')'
        )
    ...
    # This code executes later, and is automatically deferred until the side input is available
    | 'Add \'processing_info_id\'' >>
                (beam.ParDo(AddKeyValuePairToDict(), 'processing_info_id', AsSingleton(proc_id_record)))
    ...

我可能能够破解一些东西(例如,一个未使用的端输入)来延迟查询,直到插入操作完成,但我想知道是否有一种更惯用的方法(我对Beam不熟悉)

谢谢


Tags: nameinfoidconfigsource管道pipeline记录
1条回答
网友
1楼 · 发布于 2024-09-24 04:27:01

您的想法是正确的:您可以使用未使用的侧输入来实现这一点。您可以这样做(在Beam中用于ReadFromBigQuery

class PassThrough(beam.DoFn):
  def process(self, element):
    yield element

output = input | beam.ParDo(PassThrough()).with_outputs(
    'cleanup_signal', main='main')
main_output = output['main']
cleanup_signal = output['cleanup_signal']

single_element = (
    input.pipeline
    | beam.Create([None])
    | beam.Map(lambda x, nothing: x, beam.pvalue.AsSingleton(cleanup_signal)))

single_element | relational_db.ReadFromDB(...)

现在,问题是如何利用你的ReadFromDB转换来实现这一点,我想它不会接受这样的输入。有什么方法可以实现这一转换吗

相关问题 更多 >