有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java窗口商店未满(Spring Cloud Stream Kafka)

我试图从一堆数据点创建一个窗口化存储,但由于某种原因,流拓扑分支似乎没有被评估

我使用相同的流来填充物化到存储中的KTable,效果很好

我正在使用具有以下配置的Spring Cloud Streams:

spring:
  application.name: stream-test

  kafka.bootstrap-servers: localhost:9092

  cloud.stream:
    # assign group and topic name to binding
    bindings:
      windowedStream:
        destination: myTopic
        group: stream-test-window
    kafka:
      # configure kafka binder
      binder:
        brokers: ${spring.kafka.bootstrap-servers}
        configuration.auto.offset.reset: latest
      # kafka-streams specific binding configuration
      streams.bindings.windowedStream.consumer:
        keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
        valueSerde: kstreamstest.StreamSerdes$DataItemSerde

DataItemSerde只是一个扩展的JSON Serde(也适用于KTable

@Data class DataItem {
    String value;
}

public class StreamSerdes {
    public static final Serde<DataItem> DATA_ITEM_SERDE = new DataItemSerde();
    public static class DataItemSerde extends JsonSerde<DataItem> {}
}

有约束力

interface WindowedTableBinding {
    String WINDOW_STREAM = "windowedStream";

    @Input(WINDOW_STREAM)
    KStream<String, DataItem> stream();
}

我创建了一个流侦听器,如下所示

@Configuration
@EnableBinding(WindowedTableBinding.class)
class StreamToWindowed {
    String storeName = "wvs";

    @Bean
    String windowedStoreName() {
        return storeName;
    }

    @StreamListener(WindowedTableBinding.WINDOW_STREAM)
    public void windowStream(@Input(WindowedTableBinding.WINDOW_STREAM) KStream<String, DataItem> stream) {
        stream.peek((k, v)  -> System.out.printf("%s: %s%n", k, v))
                .groupByKey()
                .windowedBy(TimeWindows.of(5_000))
                .reduce((d1, d2) -> d2,
                    Materialized
                        .<String, DataItem, WindowStore<Bytes, byte[]>>as("wvs")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(StreamSerdes.DATA_ITEM_SERDE));
    }
}

但是,当我查询商店时

Set<String> getWindowedKeys() {
    ReadOnlyWindowStore<String, DataItem> queryableStore = queryService
            .getQueryableStore(windowedStoreName, QueryableStoreTypes.windowStore());
    Set<String> result = new HashSet<>();
    if (queryableStore != null) { // store is not null though
        try (KeyValueIterator<Windowed<String>, DataItem> values = queryableStore.all()) {
            values.forEachRemaining(kvs -> result.add(kvs.key.key()));
        }
    }
    return result;
}

该集合总是空的(当然,在我发送数据之后)。System.out.print语句也不会被触发,因此我猜根本不会对分支进行计算

同样,我为相同的值并行地建立了一个KTable,并且得到了很好的填充。我可以删除它,但窗口版本仍然不起作用

我确实看到了this example,但我看到的唯一区别是它将数据写回输出流,我不想这样做。而且,如果我添加它也没有帮助

我也试过了

@KafkaStreamsStateStore(name="wvs", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=5_000)
public void windowStream(@Input(WindowedTableBinding.WINDOW_STREAM) KStream<String, DataItem> stream) {}

但这没什么区别

我需要做什么来填充窗口数据存储


共 (1) 个答案

  1. # 1 楼答案

    与Spring一样,这是一个配置问题

    我需要为这两个绑定分别提供application-id

    spring.cloud.stream.kafka.streams.bindings:
        tableStream.consumer:
            application-id: table-generator
        windowedStream.consumer:
            application-id: windows-generator