
Java Kafka producer sending messages twice

I am using Kafka 2.4.0 and seeing strange behavior in my producer: messages are sent to my topic twice. I enabled debug logging and can see only one message being sent, at offset 11, but when I connect to Kafka with the console consumer tool I can see the same message at both offset 11 and offset 12!!! I am using an async producer with all default settings:

_kafkaProducer.send(record, new org.apache.kafka.clients.producer.Callback() {
    @Override
    public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata, Exception e) {
        // Release the latch whether the send succeeded or failed.
        countDownLatch.countDown();
        if (e != null) {
            exception.set(e);
        } else {
            countKafkaSentMessages.getAndIncrement();
        }
    }
});

Here is the log I get:

2021-06-24 15:43:51,181 [Thread-55] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [10.244.66.31:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-06-24 15:43:51,182 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Starting the Kafka producer
2021-06-24 15:43:51,205 [Thread-55] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Kafka producer started
2021-06-24 15:43:51,205 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Starting Kafka producer I/O thread.
2021-06-24 15:43:51,413 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Requesting metadata update for topic my_topic.
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Attempting to append record ProducerRecord(topic=my_topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = MessageType, value = [84, 114, 100, 67, 97, 112, 116, 82, 112, 116])], isReadOnly = true), key=A, AAA, timestamp=null) with callback MyCallback to topic my_topic partition 0
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Retrying append due to new batch creation for topic my_topic partition 0. The old partition was 0
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer clientId=producer-2] Allocating a new 16384 byte message buffer for topic my_topic partition 0
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Waking up the sender since topic my_topic partition 0 is either full or getting a new batch
2021-06-24 15:43:51,430 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Nodes with data ready to send: [kafka:9092 (id: 1001 rack: null)]
2021-06-24 15:43:51,430 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Sent produce request to 1001: (type=ProduceRequest, acks=1, timeout=30000, partitionRecords=({my_topic-0=MemoryRecords(size=1041, buffer=java.nio.HeapByteBuffer[pos=0 lim=1041 cap=1041])}), transactionalId=''
2021-06-24 15:43:51,433 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Received produce response from node 1001 with correlation id 4
2021-06-24 15:43:51,433 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.ProducerBatch - Successfully produced messages to my_topic-0 with base offset 11.
2021-06-24 15:43:51,465 [Thread-55] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2021-06-24 15:43:51,465 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
2021-06-24 15:43:51,468 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Shutdown of Kafka producer I/O thread has completed.
2021-06-24 15:43:51,469 [Thread-55] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Kafka producer has been closed
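One thing I noticed in the config above: `enable.idempotence = false` together with `acks = 1` and effectively infinite `retries`, so if a produce response were lost the client would resend the batch and the broker would append it again. Could that explain the duplicate? For reference, this is a sketch of the producer properties I am considering instead, with idempotence enabled (the broker address is copied from the log above; the rest reflects my understanding of what idempotence requires, so please correct me if it is wrong):

```java
import java.util.Properties;

public class IdempotentProducerConfig {

    // Builds producer properties with idempotence enabled.
    // The bootstrap address is the one from the log; serializers
    // match the ProducerConfig dump above.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.244.66.31:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // With idempotence on, a retried batch keeps its producer id and
        // sequence number, so the broker can deduplicate a resent batch.
        props.put("enable.idempotence", "true");
        // Idempotence requires acks=all; the acks=1 shown in the log
        // only waits for the leader, so a lost ack can trigger a resend.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println(props.getProperty("enable.idempotence"));
        System.out.println(props.getProperty("acks"));
    }
}
```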

Any ideas?


0 Answers