在Apache Beam pipelin中对一个PCollection同时应用多个PTransforms

2024-10-06 16:22:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试创建一个beam管道,以便在一个PCollection上同时应用多个ParDo变换,并在一个列表中收集和打印所有结果。到目前为止,我经历了一个循序渐进的过程,就像第一个帕多然后第二个帕多。 下面是我为我的问题准备的一个例子:

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions())

class Tr1(beam.DoFn):
  def process(self, number):
    number = number + 1
    yield number

class Tr2(beam.DoFn):
  def process(self, number):
    number = number + 2
    yield number

def pipeline_test():

  numbers =  p | "Create" >> beam.Create([1])
  tr1 = numbers  | "Tr1" >> beam.ParDo(Tr1())
  tr2 = numbers  | "Tr2" >> beam.ParDo(Tr2())

  tr1 | "Print1" >> beam.Map(print)
  tr2 | "Print2" >> beam.Map(print) 

def main(argv):
  del argv

  pipeline_test()

  result = p.run()
  result.wait_until_finish()
if __name__ == '__main__':
  app.run(main)

Tags: importnumberpipelinemainapachedefclassoptions
1条回答
网友
1楼 · 发布于 2024-10-06 16:22:17

转换和元素的调度由用于运行管道的运行程序管理。你知道吗

运行程序通常尝试优化图形,并可能按顺序或并行运行某些任务。你知道吗

在您的例子中,Tr1和Tr2都是无状态的,并且应用于相同的输入。在这种情况下,runner通常在同一台机器上为相同的元素依次运行它们。 注意,runner仍将并行运行不同的元素。你知道吗

应该是这样的。你知道吗

螺纹1 标高1->;Tr1 ->;Tr2型

螺纹2 标高1->;Tr1 ->;Tr2型

我不建议依赖管道不同部分的预期并行性,因为它取决于流道。你知道吗

相关问题 更多 >