java如何使用标点器从状态存储中删除旧记录?(卡夫卡)
我使用streamsBuilder.table("myTopic")
为主题创建了一个Ktable
,我将其具体化为一个状态存储,以便使用交互式查询
每小时,我都要从这个状态存储(以及相关的变更日志主题)中删除其值在过去一小时内未更新的记录
我相信使用punctuator可以实现这一点,但到目前为止我只使用了DSL,因此不确定如何继续。如果有人能给我举个例子,我将不胜感激
谢谢
杰克
你可以在下面搜索框中键入要查询的问题!
我使用streamsBuilder.table("myTopic")
为主题创建了一个Ktable
,我将其具体化为一个状态存储,以便使用交互式查询
每小时,我都要从这个状态存储(以及相关的变更日志主题)中删除其值在过去一小时内未更新的记录
我相信使用punctuator可以实现这一点,但到目前为止我只使用了DSL,因此不确定如何继续。如果有人能给我举个例子,我将不胜感激
谢谢
杰克
# 1 楼答案
可以将处理器API与DSL混合匹配,但不能处理KTable。您需要转换为KStream。或者,您可以使用与状态存储交互的处理器创建新拓扑
您需要将该状态存储在某个位置—如何确定记录是否超过一小时。一个选项是为状态存储中的每条记录添加时间戳
在处理器的init方法中,可以调用schedule(标点符号)来迭代状态存储中的记录并删除旧记录: