有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java窗口触发器不会返回最新的结果

我试图测量具有如下所示窗口操作的Flink应用程序的延迟:

SingleOutputStreamOperator<String> branch = stream
                .getSideOutput(outputTag2)
                .keyBy(MetricObject::getRootAssetId)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
                .aggregate(new CountDistinctAggregate(), new CountDistinctProcess())
                .name("windowed-count-distinct")
                .uid("windowed-count-distinct")
                .map((value)->String.valueOf(value.getTimestamp().toEpochMilli()))
                .name("send-timestamp");

我正在考虑事件时间,为了提取时间戳,我使用以下水印策略:

                        .<SingleRecord>forBoundedOutOfOrderness(Duration.ofSeconds(15))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp().toEpochMilli()))

聚合函数将特定对象保存为累加器,该累加器还包含提取的时间戳;这些时间戳写在卡夫卡主题中。问题是返回的时间戳如下所示:

1639651859988
1639651890163
1639651904900
1639651919728
1639651919728
1639651949973
1639651965085
1639651979870

返回的时间戳并不像我预期的那样等距,第四个和第五个时间戳是相等的,但它们以15秒的间隔返回,这是不可能的,因为应用程序记录的输入是每秒(10秒)连续生成的。在其他测试中,我也得到了如下更糟糕的情况:

1639651979870
1639651992771
1639651992771
1639651992771
1639651992771
1639652189791
1639652205001
1639652219876

奇怪的是,当我使用一个没有触发器的简单翻滚窗口时:

 .window(TumblingEventTimeWindows.of(Time.seconds(15)))

返回的时间戳的间隔与预期相同:

1639652429766
1639652444930
1639652459900
1639652474609
1639652489746
1639652504862
1639652519734
1639652534847

我真的不明白问题出在哪里,似乎聚合函数中的累加器没有正确升级


共 (1) 个答案

  1. # 1 楼答案

    我认为您可以检查Flink streaming的输入数据,以验证结果是否符合您的预期

    对于第一个流,聚合操作在60秒窗口的数据集上运行4次(每15秒一次)。不确定聚合中的逻辑。例如,假设我们有一个3秒的窗口,并且每1秒触发一次。运算符将获取窗口中的max元素。还假设每1秒生成一次输入。如果输入是1,3,2。。。,然后我们会看到输出像1,3,3。。。从Flink开始,第一个窗口的窗格中有[1,3,2],对于每个获得max element的触发器,结果将是1,3,3

    对于第二个流作业,每个窗口都有一个触发时间,例如,以上面的输入为例,如果窗口持续1秒,我们将得到1、3、2、