有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Apache Kafka order根据消息的值对消息进行窗口化

我试图找到一种方法,在一个主题分区内重新排序消息,并将排序后的消息发送到一个新主题

我有卡夫卡出版社,它发送以下格式的字符串消息: {system_timestamp}-{event_name}?{parameters}

例如:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

此外,我们还为每条消息添加了一些消息密钥,以便将它们发送到相应的分区

我想做的是根据消息的{system timestamp}部分在1分钟内重新排序事件,因为我们的发布者不保证消息将按照{system timestamp}值发送

例如,我们可以先向主题发送一条具有更大的{system timestamp}值的消息

我调查了Kafka Stream API,发现了一些关于消息窗口和聚合的示例:

Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

 KStreamBuilder builder = new KStreamBuilder();
 KStream<String, String> stream = builder.stream("events");
 KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.

    /* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
                () -> "",  // initial value
                (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
                TimeWindows.of(1000), // intervals in milliseconds
                Serdes.String(), // serde for aggregated value
                "test-store"
        );*/

但我接下来该如何处理这个分组流呢?我没有看到任何“sort()(e1,e2)->;e1。与可用的(e2)方法相比,windows也可以应用于聚合()减少()计数(),但我认为我不需要任何消息数据操作

如何在1分钟的时间窗口中重新排序邮件,并将其发送到其他主题


共 (1) 个答案

  1. # 1 楼答案

    以下是一个提纲:

    创建一个处理器实现:

    • 在process()方法中,对于每条消息:

      • 从消息值读取时间戳
      • 将(时间戳、消息密钥)对作为密钥,消息值作为值插入KeyValueStore。注意:这也提供了重复数据消除。您需要提供一个自定义Serde来序列化密钥,以便时间戳优先于字节,以便范围查询首先按时间戳排序
    • 在标点()方法中:

      • 使用从0到时间戳-60'000(=1分钟)的范围读取存储
      • 使用上下文按顺序发送获取的消息。forward()并将其从存储中删除

    这种方法的问题是,如果没有新的MSG到达以提前“流时间”,则不会触发标点符号()。如果这在您的情况下是一种风险,您可以创建一个外部计划程序,定期向每个(!)发送“勾选”消息你的主题的分区,你的处理器应该忽略它,但它们会导致在没有“真正的”MSG的情况下触发标点符号。 KIP-138将通过添加对系统时间标点的明确支持来解决这一限制: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics