有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何使用标点器从状态存储中删除旧记录?(卡夫卡)

我使用streamsBuilder.table("myTopic")为主题创建了一个Ktable,我将其具体化为一个状态存储,以便使用交互式查询

每小时,我都要从这个状态存储(以及相关的变更日志主题)中删除其值在过去一小时内未更新的记录

我相信使用punctuator可以实现这一点,但到目前为止我只使用了DSL,因此不确定如何继续。如果有人能给我举个例子,我将不胜感激

谢谢

杰克


共 (1) 个答案

  1. # 1 楼答案

    可以将处理器API与DSL混合匹配,但不能处理KTable。您需要转换为KStream。或者,您可以使用与状态存储交互的处理器创建新拓扑

    您需要将该状态存储在某个位置—如何确定记录是否超过一小时。一个选项是为状态存储中的每条记录添加时间戳

    在处理器的init方法中,可以调用schedule(标点符号)来迭代状态存储中的记录并删除旧记录:

    context.schedule(Duration.ofMillis(everyHourInMillis), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        myStateStore.all().forEachRemaining(keyValue -> {
            if (Instant.ofEpochMilli(valueInStateStore).compareTo(olderThanAnHour) < 0) {
                myStateStore.delete(keyValue.key);
            }
        });
    });