有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java无法发布来自KafkaProducer的消息

我无法发送卡夫卡制作人的消息。我的配置不起作用,看起来是这样的:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
properties.setProperty("acks", "1");
properties.setProperty("retries", "3");
properties.setProperty("linger.ms", "1");

Producer<String, String> producer =
    new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord =
    new ProducerRecord<String, String>("second_topic", "3", "messagtest");
Future<RecordMetadata> s = producer.send(producerRecord);

producer.flush();
producer.close();

下面是我执行s.get()后的错误

 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for second_topic-0: 30021 ms has passed since batch creation plus linger time
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at ai.sys.producer.Test.main(Test.java:33)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for second_topic-0: 30021 ms has passed since batch creation plus linger time

共 (1) 个答案

  1. # 1 楼答案

    默认情况下,在Kafka Producer中启用了16K字节大小的批处理。然而,在您的代码中,您只发送了一条可能不满足批量大小的记录

    因此,为了让代码正常工作,请尝试将以下属性“batch.size”添加到Kafka Producer属性的值“0”中

    properties.setProperty("batch.size", "0");
    

    这将禁用批处理机制,并允许制作人将记录写入卡夫卡代理

    注意:在实时情况下,禁用批处理会增加对代理的写入请求数量,并降低生产者和服务器的I/O吞吐量和性能