处理无界数据流时在窗口结束后激发的事件

2024-10-03 21:29:26 发布

您现在位置:Python中文网/ 问答频道 /正文

给定的:无限的数据流,其中一些重复事件位于每个事件附近(以时间为单位);重复事件具有相同的事件时间戳和相同的唯一id
目标:从流中删除重复项

我计划应用一个固定大小的窗口,然后通过一个唯一的键将给定的事件分组,并执行早期开火。因此,我有如下几点:

(
    element
    | "Add timestamp to event" >> ParDo(AddTimestampCustomFn())    # simply return TimistampedValue
    | "Select unique key" >> Map(lambda elem: (elem.id, elem))
    | "Apply window" >> WindowInto(
        FixedWindows(50),
        trigger=AfterCount(1),
        accumulation_mod=AccumulationMode.DISCARDING,
        allowed_lateness=0,
    )
    | "Group events by id" >> GroupByKey()
    | "Print results" >> ParDo(CustomPrintFn())    # simply print the first element from the grouped elements along with timestamp
)

一切似乎都在按预期进行,但我意识到,即使经过一段时间,仍然可以处理属于时间窗口的事件,而该时间窗口应该已经过了

假设我们有以下具有相应时间戳的事件:[('a', 0), ('a', 1), ('a', 40), ('b', 20), ('c', 90), ('d', '140'), ('f', 1)]。我希望我的输出类似:[('a', 49), ('b', 49), ('c': 99), ('d', 149)]。然而,除了上面的输出之外,我还得到了f事件。所以实际输出是[('a', 49), ('b', 49), ('c': 99), ('d', 149), ('f', 49)]。值得注意的是,分组事件的时间戳等于时间窗口的最后一个时间。我不太明白为什么会发生f事件。窗口是固定的,其长度为50秒,并且allowed_lateness设置为0。我还假设水印应该已经通过了。因此,我真的不明白为什么eventf仍然存在

我还尝试执行没有GROUPBY语句的代码。然而,它似乎产生了类似的结果。让我们将以下数据作为输入:[('a', 1), ('b', 90), ('c', 140), ('f', 1)]。然后结果仍然包括事件f:[('a', 49), ('b', 99), ('c', 149), ('f', 49)]。以下是简化代码:

(
    element
    | "Add timestamp to event" >> ParDo(AddTimestampCustomFn())
    | "Apply window" >> WindowInto(FixedWindows(50))
    | "Print results" >> ParDo(CustomPrintFn())
)

我觉得我对固定窗口的理解是错误的,但我不明白为什么


Tags: toeventaddid时间事件elementwindow
1条回答
网友
1楼 · 发布于 2024-10-03 21:29:26

水印行为取决于源;它可能没有按照您认为的方式前进,因此这里的所有数据可能都是准时的,而不是丢弃的

特别是,如果使用“创建”,则水印从负无穷大开始,所有元素都被注入管道,然后水印前进到正无穷大。这可以确保所有数据都不会延迟,而不管您给出的创建顺序如何

如果要显式控制元素注入和水印以进行测试,可以使用TestStream

相关问题 更多 >