Java Kafka Streams: read from all partitions in each instance of the application

When using a KTable, Kafka Streams does not allow an instance/consumer to read from more than one partition of a given topic when the number of instances equals the number of partitions. I tried to achieve this with a GlobalKTable, but the problem is that the data gets overwritten and aggregations cannot be applied to it either.

Suppose I have a topic named "data_in" with 3 partitions (P1, P2, P3). When I run 3 instances (I1, I2, I3) of the Kafka Streams application, I want each instance to read data from all partitions of "data_in": I1 can read from P1, P2 and P3, I2 can read from P1, P2 and P3, and likewise for I3.

EDIT: keep in mind that producers can publish two similar IDs into two different partitions of "data_in". So the GlobalKTable gets overwritten when two different instances are running.

How can I achieve this? Here is part of my code:

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

1 answer

  1. # Answer 1

    Change the number of partitions of your input topic "data_in" to a single partition, or use a GlobalKTable to get the data from all partitions of the topic, and then you can join a stream against it (a sketch of such a join is at the end of this answer). That way, your application instances no longer have to be in different consumer groups.

    The code would look like this:

    private GlobalKTable<String, theDataList> globalStream() {

      // KStream of records from the data_in topic, using the String and theDataSerde deserializers
      KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

      // By sending to another topic you're forcing a repartition on that topic
      trashStream.to("new_data_in");

      // Read the repartitioned records back as a new stream
      KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

      // Apply an aggregation operation on the repartitioned KStream records using an intermediate representation (KGroupedStream)
      KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

      Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
      materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

      // Aggregate into a KTable and write the result to the agg_data_in topic
      KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
          if (!value.getValideData())
              aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
          else
              aggregate.getList().add(value);
          return aggregate;
      }, materialized)
      .toStream()
      .to("agg_data_in", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

      // Build a GlobalKTable over the aggregated topic so that every instance sees all partitions
      return getBuilder().globalTable("agg_data_in", Consumed.with(Serdes.String(), SerDes.theDataDataListSerde));
    }
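
    A side note, not part of the original answer: in DSL versions that still offer it, the explicit to("new_data_in") / stream("new_data_in") pair above can be collapsed into a single KStream#through call, which writes to the topic and returns a stream reading back from it (in newer releases this was superseded by repartition()). A minimal sketch under that assumption:

      // Equivalent to trashStream.to("new_data_in") followed by getBuilder().stream("new_data_in", ...)
      KStream<String, Data> newTrashStream =
          trashStream.through("new_data_in", Produced.with(Serdes.String(), SerDes.theDataSerde));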
    

    EDIT: I edited the code above to force a repartition through a topic named "new_data_in".
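
    The answer mentions joining a stream against the GlobalKTable but does not show it. As a minimal sketch (assuming the globalStream() method above and a hypothetical Enriched value class for the join result), a KStream-GlobalKTable join could look like this:

      GlobalKTable<String, theDataList> aggregated = globalStream();

      // Look up the globally replicated aggregate for each incoming record, keyed by the record key
      KStream<String, Enriched> enriched = getBuilder()
          .stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde))
          .join(aggregated,
                (key, value) -> key,                                    // map each stream record to the GlobalKTable key
                (value, aggregate) -> new Enriched(value, aggregate));  // combine the record with its aggregate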