给定的:无限的数据流,其中一些重复事件位于每个事件附近(以时间为单位);重复事件具有相同的事件时间戳和相同的唯一id
目标:从流中删除重复项
我计划应用一个固定大小的窗口,然后通过一个唯一的键将给定的事件分组,并执行早期开火。因此,我有如下几点:
(
element
| "Add timestamp to event" >> ParDo(AddTimestampCustomFn()) # simply return TimistampedValue
| "Select unique key" >> Map(lambda elem: (elem.id, elem))
| "Apply window" >> WindowInto(
FixedWindows(50),
trigger=AfterCount(1),
accumulation_mod=AccumulationMode.DISCARDING,
allowed_lateness=0,
)
| "Group events by id" >> GroupByKey()
| "Print results" >> ParDo(CustomPrintFn()) # simply print the first element from the grouped elements along with timestamp
)
一切似乎都在按预期进行,但我意识到,即使经过一段时间,仍然可以处理属于时间窗口的事件,而该时间窗口应该已经过了
假设我们有以下具有相应时间戳的事件:[('a', 0), ('a', 1), ('a', 40), ('b', 20), ('c', 90), ('d', '140'), ('f', 1)]
。我希望我的输出类似:[('a', 49), ('b', 49), ('c': 99), ('d', 149)]
。然而,除了上面的输出之外,我还得到了f
事件。所以实际输出是[('a', 49), ('b', 49), ('c': 99), ('d', 149), ('f', 49)]
。值得注意的是,分组事件的时间戳等于时间窗口的最后一个时间。我不太明白为什么会发生f
事件。窗口是固定的,其长度为50秒,并且allowed_lateness
设置为0。我还假设水印应该已经通过了。因此,我真的不明白为什么eventf
仍然存在
我还尝试执行没有GROUPBY语句的代码。然而,它似乎产生了类似的结果。让我们将以下数据作为输入:[('a', 1), ('b', 90), ('c', 140), ('f', 1)]
。然后结果仍然包括事件f
:[('a', 49), ('b', 99), ('c', 149), ('f', 49)]
。以下是简化代码:
(
element
| "Add timestamp to event" >> ParDo(AddTimestampCustomFn())
| "Apply window" >> WindowInto(FixedWindows(50))
| "Print results" >> ParDo(CustomPrintFn())
)
我觉得我对固定窗口的理解是错误的,但我不明白为什么
水印行为取决于源;它可能没有按照您认为的方式前进,因此这里的所有数据可能都是准时的,而不是丢弃的
特别是,如果使用“创建”,则水印从负无穷大开始,所有元素都被注入管道,然后水印前进到正无穷大。这可以确保所有数据都不会延迟,而不管您给出的创建顺序如何
如果要显式控制元素注入和水印以进行测试,可以使用TestStream
相关问题 更多 >
编程相关推荐