java窗口触发器不会返回最新的结果
我试图测量具有如下所示窗口操作的Flink应用程序的延迟:
SingleOutputStreamOperator<String> branch = stream
.getSideOutput(outputTag2)
.keyBy(MetricObject::getRootAssetId)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
.aggregate(new CountDistinctAggregate(), new CountDistinctProcess())
.name("windowed-count-distinct")
.uid("windowed-count-distinct")
.map((value)->String.valueOf(value.getTimestamp().toEpochMilli()))
.name("send-timestamp");
我正在考虑事件时间,为了提取时间戳,我使用以下水印策略:
.<SingleRecord>forBoundedOutOfOrderness(Duration.ofSeconds(15))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp().toEpochMilli()))
聚合函数将特定对象保存为累加器,该累加器还包含提取的时间戳;这些时间戳写在卡夫卡主题中。问题是返回的时间戳如下所示:
1639651859988
1639651890163
1639651904900
1639651919728
1639651919728
1639651949973
1639651965085
1639651979870
返回的时间戳并不像我预期的那样等距,第四个和第五个时间戳是相等的,但它们以15秒的间隔返回,这是不可能的,因为应用程序记录的输入是每秒(10秒)连续生成的。在其他测试中,我也得到了如下更糟糕的情况:
1639651979870
1639651992771
1639651992771
1639651992771
1639651992771
1639652189791
1639652205001
1639652219876
奇怪的是,当我使用一个没有触发器的简单翻滚窗口时:
.window(TumblingEventTimeWindows.of(Time.seconds(15)))
返回的时间戳的间隔与预期相同:
1639652429766
1639652444930
1639652459900
1639652474609
1639652489746
1639652504862
1639652519734
1639652534847
我真的不明白问题出在哪里,似乎聚合函数中的累加器没有正确升级
# 1 楼答案
我认为您可以检查Flink streaming的输入数据,以验证结果是否符合您的预期
对于第一个流,聚合操作在60秒窗口的数据集上运行4次(每15秒一次)。不确定聚合中的逻辑。例如,假设我们有一个3秒的窗口,并且每1秒触发一次。运算符将获取窗口中的max元素。还假设每1秒生成一次输入。如果输入是1,3,2。。。,然后我们会看到输出像1,3,3。。。从Flink开始,第一个窗口的窗格中有[1,3,2],对于每个获得max element的触发器,结果将是1,3,3
对于第二个流作业,每个窗口都有一个触发时间,例如,以上面的输入为例,如果窗口持续1秒,我们将得到1、3、2、