java如何使用卡夫卡在一段时间(窗口)内累计总金额?
下面的代码只是处理第一条到达并正确发布的消息。但之后不再处理任何消息(我在终端中使用kafka console consumer.bat来监控发布到的消息总量(按id)
卡夫卡流:
KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));
totalAmount
.mapValues(v -> Integer.valueOf(v))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(100)))
.aggregate(
() -> new Integer(0),
(key, value, aggregate) -> {
System.out.println("value: "+value);
System.out.println("aggregate: "+aggregate);
return value+aggregate;
},,
Materialized.with(Serdes.String(), Serdes.Integer())
)
.toStream()
.map(((key, aggregate) -> new KeyValue<>(key.key(), aggregate)))
.to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));
测试:
- 我每100ms发布一条消息,始终使用相同的键,主题为“数据消耗”
- 发布到“数据消耗”主题的前四个(k,v)分别是:(1,1)、(1,2)、(1,4)、(1,1)
- 卡夫卡流出版(1,1)到“总金额按id”,但之后没有其他东西
- 系统。出来上面代码中的println()仅打印: 价值:1 合计:0 价值:2 合计:0
猜猜这个问题背后的原因是什么
*我希望第二个聚合等于1(聚合:1)
共 (0) 个答案