如何使用Apache Beam将1个文本文件的内容拆分为不同的PCollection

2024-10-01 13:35:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我刚刚开始使用Python学习ApacheBeam,并解决了这个问题一段时间,希望能从任何擅长ApacheBeam的人那里得到一些帮助

这是我的问题陈述:

我有一个如下所示的文本文件:

BEGIN=burger
blue
lettuce
mayonise 
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish

我可以知道如何使用apache beam将burger和fish拆分为不同的PCollection,以便对这两个PCollection执行不同的操作吗

这里我附加了Python中的代码片段

import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText

class SplitRow(beam.DoFn):
  def process(self,element):
    return element.splitlines()


def ExtractBurger(element):
    if element == "BEGIN=burger":
        return element

p = beam.Pipeline()
squares = (
    p 
#     | "Read From Text" >> ReadFromText("gs://abc.txt")
    | "Create dummy text file" >> Create([
          'BEGIN=burger',
          'blue',
          'lettuce',
          'mayonise',
          'END=burger',
          'BEGIN=fish',
          'green',
          'strawberry',
          'ketchup',
          'END=fish',
      ])
    | "Decode and split lines" >> ParDo(SplitRow())
    | "Extract out Burger" >> Filter(ExtractBurger)
    | Map(print)
)
p.run()

我的输出是

BEGIN=burger

我能够提取出包含“BEGIN=burger”的行,但我真正想要的是将“BEGIN=burger”到“END=burger”之间的所有数据提取到1 PCollection中,并将“BEGIN=fish”到“END=fish”提取到另一PCollection中,我不确定是否可以这样做,因为我觉得Apache Beam只能执行行操作,我如何编写这样的逻辑

  1. 如果找到,开始=汉堡
  2. 继续循环下一行,直到找到END=burger
  3. 把整个部分写进一个PCollection

如果有人能提供一些见解,我将不胜感激!谢谢大家!


Tags: importapachecreategreenblueelementendbeam
1条回答
网友
1楼 · 发布于 2024-10-01 13:35:36

梁并行处理单元。因此,不能保证它将按原始顺序逐行处理

要实现这一点,必须使用状态(https://beam.apache.org/blog/stateful-processing/)来记录当前处理是否在开始和结束之间。您必须确保梁及其流道(无论您选择哪个流道)的平行度为1,以便它不会并行处理元素。 但这违背了使用光束的目的

  • 如果无法更改文件:只需编写Python脚本即可

  • 如果可以更改生成文件的行为:可以为“开始”和“结束”之间的每一行提供uuid。而且您的文件甚至不需要按原始顺序包含行。 例如:

       'burger=blue',
       'burger=lettuce',
       'burger=mayonise',
       'fish=green',
       'fish=strawberry',
       'fish=ketchup',
       'burger=pickle',
       'fish=chips',
    

然后,您可以并行处理所有行,将它们解析为{key}={value},然后按键分组到一个PCollection中,该PCollection包含用于进一步转换的所有内容

相关问题 更多 >