我刚刚开始使用Python学习ApacheBeam,并解决了这个问题一段时间,希望能从任何擅长ApacheBeam的人那里得到一些帮助
这是我的问题陈述:
我有一个如下所示的文本文件:
BEGIN=burger
blue
lettuce
mayonise
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish
我可以知道如何使用apache beam将burger和fish拆分为不同的PCollection,以便对这两个PCollection执行不同的操作吗
这里我附加了Python中的代码片段
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText
class SplitRow(beam.DoFn):
def process(self,element):
return element.splitlines()
def ExtractBurger(element):
if element == "BEGIN=burger":
return element
p = beam.Pipeline()
squares = (
p
# | "Read From Text" >> ReadFromText("gs://abc.txt")
| "Create dummy text file" >> Create([
'BEGIN=burger',
'blue',
'lettuce',
'mayonise',
'END=burger',
'BEGIN=fish',
'green',
'strawberry',
'ketchup',
'END=fish',
])
| "Decode and split lines" >> ParDo(SplitRow())
| "Extract out Burger" >> Filter(ExtractBurger)
| Map(print)
)
p.run()
我的输出是
BEGIN=burger
我能够提取出包含“BEGIN=burger”的行,但我真正想要的是将“BEGIN=burger”到“END=burger”之间的所有数据提取到1 PCollection中,并将“BEGIN=fish”到“END=fish”提取到另一PCollection中,我不确定是否可以这样做,因为我觉得Apache Beam只能执行行操作,我如何编写这样的逻辑
如果有人能提供一些见解,我将不胜感激!谢谢大家!
梁并行处理单元。因此,不能保证它将按原始顺序逐行处理
要实现这一点,必须使用状态(https://beam.apache.org/blog/stateful-processing/)来记录当前处理是否在开始和结束之间。您必须确保梁及其流道(无论您选择哪个流道)的平行度为1,以便它不会并行处理元素。 但这违背了使用光束的目的
如果无法更改文件:只需编写Python脚本即可
如果可以更改生成文件的行为:可以为“开始”和“结束”之间的每一行提供uuid。而且您的文件甚至不需要按原始顺序包含行。 例如:
然后,您可以并行处理所有行,将它们解析为
{key}={value}
,然后按键分组到一个PCollection中,该PCollection包含用于进一步转换的所有内容相关问题 更多 >
编程相关推荐