我正在尝试创建一个beam管道,以便在一个PCollection上同时应用多个ParDo变换,并在一个列表中收集和打印所有结果。到目前为止,我经历了一个循序渐进的过程,就像第一个帕多然后第二个帕多。 下面是我为我的问题准备的一个例子:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
class Tr1(beam.DoFn):
def process(self, number):
number = number + 1
yield number
class Tr2(beam.DoFn):
def process(self, number):
number = number + 2
yield number
def pipeline_test():
numbers = p | "Create" >> beam.Create([1])
tr1 = numbers | "Tr1" >> beam.ParDo(Tr1())
tr2 = numbers | "Tr2" >> beam.ParDo(Tr2())
tr1 | "Print1" >> beam.Map(print)
tr2 | "Print2" >> beam.Map(print)
def main(argv):
del argv
pipeline_test()
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
app.run(main)
转换和元素的调度由用于运行管道的运行程序管理。你知道吗
运行程序通常尝试优化图形,并可能按顺序或并行运行某些任务。你知道吗
在您的例子中,Tr1和Tr2都是无状态的,并且应用于相同的输入。在这种情况下,runner通常在同一台机器上为相同的元素依次运行它们。 注意,runner仍将并行运行不同的元素。你知道吗
应该是这样的。你知道吗
螺纹1 标高1->;Tr1 ->;Tr2型
螺纹2 标高1->;Tr1 ->;Tr2型
我不建议依赖管道不同部分的预期并行性,因为它取决于流道。你知道吗
相关问题 更多 >
编程相关推荐