有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何使用带回调的send()方法返回元数据?

我想对Kafka Producer send()使用如下回调方法:

RecordMetadata recordmetadata = kafkaProducer.send(new ProducerRecord<>(topic,
                null, timestamp, key, message), this::onCompletion);

private RecordMetadata onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
        return metadata;
    } else {
        return null;
    }
}

但是我有onCompletion方法返回的元数据或null将由send()方法返回(因为调用send()的方法依赖于它)

如何使send()返回元数据或null与回调方法结合使用


共 (1) 个答案

  1. # 1 楼答案

    文档指出,发送是异步的,一旦记录存储在等待发送的记录的缓冲区中,该方法将立即返回。为了让它按您的意愿工作,您必须为它实现自己的解决方案,例如使用全局标志:

    private RecordMetadata recordMetadata;
    private boolean onCompletetionExecuted = false;
    
    kafkaProducer.send(new ProducerRecord<>(topic,
                    null, timestamp, key, auditorMessage), this::onCompletion);
    while (!onCompletetionExecuted) {
        // waiting, would be good to have a fixed timeout
    }
    // after this point the value of recordMetadata is the one returned by onCompletion
    
    private void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            recordMetadata = metadata;
        } else {
            recordMetadata = null;
        }
        onCompletetionExecuted = true;
    }
    

    这一点都不优雅,但它可以胜任