有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何使用卡夫卡在一段时间(窗口)内累计总金额?

下面的代码只是处理第一条到达并正确发布的消息。但之后不再处理任何消息(我在终端中使用kafka console consumer.bat来监控发布到的消息总量(按id

卡夫卡流

KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));


totalAmount
  .mapValues(v -> Integer.valueOf(v))
  .groupByKey()
  .windowedBy(TimeWindows.of(Duration.ofMillis(100)))
  .aggregate(
        () -> new Integer(0),
        (key, value, aggregate) -> {
                        System.out.println("value: "+value);
                        System.out.println("aggregate: "+aggregate);
                        return value+aggregate;
                    },,
        Materialized.with(Serdes.String(), Serdes.Integer())
  )
  .toStream()
  .map(((key, aggregate) -> new KeyValue<>(key.key(), aggregate)))
  .to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));

测试

  • 我每100ms发布一条消息,始终使用相同的键,主题为“数据消耗”
  • 发布到“数据消耗”主题的前四个(k,v)分别是:(1,1)、(1,2)、(1,4)、(1,1)
  • 卡夫卡流出版(1,1)到“总金额按id”,但之后没有其他东西
  • 系统。出来上面代码中的println()仅打印: 价值:1 合计:0 价值:2 合计:0

猜猜这个问题背后的原因是什么

*我希望第二个聚合等于1(聚合:1)


共 (0) 个答案