有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在没有“changelog主题”的情况下加入KStream和KTable

我加入了KStream和KTable。下面是我的代码

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressTable, 
        (order, address) -> order + " send to " + address)
        .to(ORDER_JOIN_STREAM);

KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();

执行此代码后,将创建一个新主题

$ ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
order-join-application3-address-store-name-changelog
address
order

没有“~~~~~~~~~~~~~~~~~~~~~~日志”主题,我如何执行代码

  • 代理版本:0.11.0.2
  • 流版本:2.7.0

共 (1) 个答案

  1. # 1 楼答案

    我不确定这是否可能,因为

    In the Kafka Streams DSL, an input stream of an aggregation operation can be a KStream or a KTable, but the output stream will always be a KTable.

    Kafka aggregations中提取,它也使用流DSL,如Kafka join。基本上,Kafka join所做的是在他们从here调用“changelog stream”的KTable上执行KStream的查找

    KStream-KTable joins are always non-windowed joins. They allow you to perform table lookups against a KTable (changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be to enrich a stream of user activities (KStream) with the latest user profile information (KTable).