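Java Kafka producer sends messages twice
I am using Kafka 2.4.0 and am seeing strange behavior in my producer: a message is written to my topic twice. I have enabled debug logging and can see that only one message was sent, to offset 11, but when I connect to Kafka with the consumer command-line tool I see the same message at both offset 11 and offset 12! I am using the async producer with all default settings: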
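_kafkaProducer.send(record, new org.apache.kafka.clients.producer.Callback() {
    @Override
    public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata, Exception e) {
        if (e != null) {
            // Send failed: keep the exception for the waiting thread.
            exception.set(e);
        } else {
            // Send acknowledged by the broker.
            countKafkaSentMessages.getAndIncrement();
        }
        // Release the waiting thread only after the outcome has been recorded.
        countDownLatch.countDown();
    }
});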
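The snippet does not show how countDownLatch, exception and countKafkaSentMessages are declared. Below is a minimal stand-alone sketch of that bookkeeping, assuming they are a CountDownLatch, an AtomicReference<Exception> and an AtomicInteger. It uses no Kafka classes: fakeAsyncSend, Result and sendAndWait are hypothetical names made up for illustration, with fakeAsyncSend standing in for the asynchronous send by invoking the completion handler once on another thread. The sketch counts the latch down last so that the waiting thread always sees the recorded outcome.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Stand-alone sketch: no Kafka classes are used here.
class SendBookkeepingSketch {

    // Outcome of one simulated send.
    static final class Result {
        final int sentCount;
        final Exception error;

        Result(int sentCount, Exception error) {
            this.sentCount = sentCount;
            this.error = error;
        }
    }

    // Hypothetical stand-in for an asynchronous send: runs the completion
    // handler exactly once on a separate thread. A null failure means success.
    static void fakeAsyncSend(Exception failure, Consumer<Exception> onCompletion) {
        Thread ioThread = new Thread(() -> onCompletion.accept(failure), "fake-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Sends one simulated message and blocks until its completion handler ran.
    static Result sendAndWait(Exception failure) {
        // Assumed declarations for the fields used in the callback.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference<Exception> exception = new AtomicReference<>();
        AtomicInteger countKafkaSentMessages = new AtomicInteger();

        fakeAsyncSend(failure, e -> {
            if (e != null) {
                exception.set(e);
            } else {
                countKafkaSentMessages.getAndIncrement();
            }
            // Count down last so the waiter observes the recorded outcome.
            countDownLatch.countDown();
        });

        try {
            if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("completion handler did not run in time");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return new Result(countKafkaSentMessages.get(), exception.get());
    }

    public static void main(String[] args) {
        Result ok = sendAndWait(null);
        System.out.println("sent=" + ok.sentCount + " error=" + ok.error);

        Result failed = sendAndWait(new RuntimeException("simulated failure"));
        System.out.println("sent=" + failed.sentCount + " error=" + failed.error.getMessage());
    }
}
```

With this bookkeeping the counter goes up by one per successful completion, so it can be compared against the number of records the consumer tool shows.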
Here is the log I get:
2021-06-24 15:43:51,181 [Thread-55] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [10.244.66.31:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2021-06-24 15:43:51,182 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Starting the Kafka producer
2021-06-24 15:43:51,205 [Thread-55] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Kafka producer started
2021-06-24 15:43:51,205 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Starting Kafka producer I/O thread.
2021-06-24 15:43:51,413 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Requesting metadata update for topic my_topic.
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Attempting to append record ProducerRecord(topic=my_topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = MessageType, value = [84, 114, 100, 67, 97, 112, 116, 82, 112, 116])], isReadOnly = true), key=A, AAA, timestamp=null) with callback MyCallback to topic my_topic partition 0
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Retrying append due to new batch creation for topic my_topic partition 0. The old partition was 0
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer clientId=producer-2] Allocating a new 16384 byte message buffer for topic my_topic partition 0
2021-06-24 15:43:51,427 [Thread-55] TRACE org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Waking up the sender since topic my_topic partition 0 is either full or getting a new batch
2021-06-24 15:43:51,430 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Nodes with data ready to send: [kafka:9092 (id: 1001 rack: null)]
2021-06-24 15:43:51,430 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Sent produce request to 1001: (type=ProduceRequest, acks=1, timeout=30000, partitionRecords=({my_topic-0=MemoryRecords(size=1041, buffer=java.nio.HeapByteBuffer[pos=0 lim=1041 cap=1041])}), transactionalId=''
2021-06-24 15:43:51,433 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Received produce response from node 1001 with correlation id 4
2021-06-24 15:43:51,433 [kafka-producer-network-thread | producer-2] TRACE org.apache.kafka.clients.producer.internals.ProducerBatch - Successfully produced messages to my_topic-0 with base offset 11.
2021-06-24 15:43:51,465 [Thread-55] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2021-06-24 15:43:51,465 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
2021-06-24 15:43:51,468 [kafka-producer-network-thread | producer-2] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-2] Shutdown of Kafka producer I/O thread has completed.
2021-06-24 15:43:51,469 [Thread-55] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-2] Kafka producer has been closed
Any ideas?