如果使用不同的线程,JavaSpring云数据流可轮询消费者dlq和errorChannel将不起作用
为了使用带有Kafka binder的Spring Cloud Stream 3.1.1管理长时间运行的任务,我们需要使用可轮询的使用者在单独的线程中手动管理消耗,以便Kafka不会触发重新平衡。为此,我们定义了一个新的注释来管理可轮询消费者。这种方法的问题在于,工作需要在单独的线程中进行管理。抛出的任何异常最终都不会在errorChannel
和DLQ中结束
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableConsumer) && args(dataCapsule,..)")
public void handleMessage(ProceedingJoinPoint joinPoint,
PollableConsumer pollableConsumer, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableConsumerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
我们可以创建一个不同的输出通道来在异常情况下发布消息,但它感觉我们正在尝试实现一些可能不必要的东西
更新1
我们添加了以下bean:
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"http://localhost:9092");
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topicErr() {
return TopicBuilder.name("ERR").partitions(1).replicas(1).build();
}
@Bean
public SeekToCurrentErrorHandler eh(KafkaOperations<String, byte[]> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
template,
(cr, e) -> new TopicPartition("ERR", 1)),
new FixedBackOff(0L, 1L));
}
并且enable-dlq
未在spring.cloud.stream.kafka.bindings.channel-name.consumer
中设置
但是我们仍然看不到任何针对ERR主题的消息。
即使是主线程抛出的任何异常
如果enable-dlq
设置为true,则主线程上的异常将发布到默认dlq主题中,并且正如预期的那样,子线程上的异常将被忽略
更新2
加里的例子似乎在总体上起作用。尽管在使用不推荐使用的StreamListner方法而不是函数时,我们需要做一些修改,但仍有一些问题我们无法解决
- 主题名称似乎总是
channel_name+.DLT
,因为我们不知道如何使用像dlq
这样的不同名称。我们为所有消费者使用一个dlq
主题,这似乎不是Spring kafka默认DLT所期望的李> - 似乎我们需要在DLT上至少有与消费者主题相同数量的分区。否则,此解决方案将不起作用。虽然不确定如何管理,但对我们来说,这似乎不是一个实际的假设李>
- 有没有一种方法可以像SpringCloudStream在幕后所做的那样利用SpringRetry?或者这需要单独实施?i、 e.基于
max.attempts
重试工作,然后输入DLQ部分李> - 我可以看到,在示例中,Spring actuator已被用于通过
this.endpoint.changeState("polled", State.PAUSED)
和this.endpoint.changeState("polled", State.RESUMED)
更新通道状态。为什么我们需要在暂停、重新提问等的同时这样做。不这样做的副作用是什么李>
# 1 楼答案
你的观察是正确的;错误处理绑定到线程
您可以在代码中直接使用
DeadLetterPublishingRecoverer
来简化DLQ的发布(而不是输出通道)。这样,您将获得带有异常信息等的增强标题https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
编辑
这里是一个例子;我暂停绑定,以防止在“作业”运行时出现任何新的交付,而不是像您这样重新请求交付
EDIT2
补充问题的答复:
1,2:查看Spring以获取ApacheKafka文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
DLPR有一个备用构造函数,允许您指定目标解析器。默认值只是附加
.DLT
并使用相同的分区。javadocs指定了如何指定目标分区:当
null
时,KafkaProducer
选择分区RetryTemplate
;然后第二个参数是
RecoveryCallback
,在重试次数用尽时调用