Spring Cloud Data Flow pollable consumer DLQ and errorChannel don't work when a different thread is used

To manage a long-running task with Spring Cloud Stream 3.1.1 and the Kafka binder, we need to use a pollable consumer and manage the consumption manually on a separate thread so that Kafka does not trigger a rebalance. To do this we defined a new annotation to manage pollable consumers. The problem with this approach is that, because the work has to be managed on a separate thread, any exception that is thrown never ends up in the errorChannel or the DLQ:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.integration.StaticMessageHeaderAccessor;
    import org.springframework.integration.acks.AcknowledgmentCallback;
    import org.springframework.integration.acks.AcknowledgmentCallback.Status;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;

    @Aspect
    @Component
    public class PollableConsumerAspect {

      private final ExecutorService executor = Executors.newFixedThreadPool(1);

      private volatile boolean paused = false;

      @Around(value = "@annotation(pollableConsumer) && args(dataCapsule,..)")
      public void handleMessage(ProceedingJoinPoint joinPoint,
          PollableConsumer pollableConsumer, Object dataCapsule) {
        if (dataCapsule instanceof Message) {
          Message<?> message = (Message<?>) dataCapsule;
          AcknowledgmentCallback callback = StaticMessageHeaderAccessor
              .getAcknowledgmentCallback(message);
          callback.noAutoAck();

          if (!paused) {
            // The separate thread is not busy with a previous message, so process this message:
            Runnable runnable = () -> {
              try {
                paused = true;

                // Call the method to process this Kafka message
                joinPoint.proceed();

                callback.acknowledge(Status.ACCEPT);
              } catch (Throwable e) {
                callback.acknowledge(Status.REJECT);
                // Thrown on the executor thread, so it is captured by the Future
                // returned by submit() and never reaches the errorChannel or DLQ.
                throw new PollableConsumerException(e);
              } finally {
                paused = false;
              }
            };

            executor.submit(runnable);
          } else {
            // The separate thread is busy with a previous message, so re-queue this message for later:
            callback.acknowledge(Status.REQUEUE);
          }
        }
      }
    }
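
The PollableConsumer annotation matched by the pointcut is our own; its definition is not shown above, but a minimal sketch of it would be:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Minimal sketch of the custom marker annotation used by the aspect above.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PollableConsumer {
    }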

We could create a separate output channel to publish messages to in the case of an exception, but it feels like we would be implementing something that may not be necessary.
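
For illustration, such a channel could be fed from the worker thread with Spring Cloud Stream's StreamBridge; the sketch below is hypothetical (the errors-out-0 binding name and the header are our own placeholders):

    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    // Hypothetical sketch of the "separate output channel" idea: push failures
    // to a dedicated error binding via StreamBridge instead of a real DLQ.
    @Component
    public class ErrorChannelPublisher {

      private final StreamBridge streamBridge;

      public ErrorChannelPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
      }

      public void publish(Message<?> failed, Throwable e) {
        // "errors-out-0" is a placeholder binding; its destination topic must be
        // configured under spring.cloud.stream.bindings.errors-out-0.destination
        streamBridge.send("errors-out-0",
            MessageBuilder.fromMessage(failed)
                .setHeader("x-exception-message", e.getMessage())
                .build());
      }
    }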

Update 1

We added the following beans:

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaAdmin;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class DlqConfiguration {

      @Bean
      public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
      }

      @Bean
      public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Bootstrap servers are plain host:port pairs, without a scheme such as http://
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
      }

      @Bean
      public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
      }

      @Bean
      public NewTopic topicErr() {
        return TopicBuilder.name("ERR").partitions(1).replicas(1).build();
      }

      @Bean
      public SeekToCurrentErrorHandler eh(KafkaOperations<String, byte[]> template) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
            template,
            // The ERR topic has a single partition, so only partition 0 exists
            (cr, e) -> new TopicPartition("ERR", 0)),
            new FixedBackOff(0L, 1L));
      }
    }

And enable-dlq is not set in spring.cloud.stream.kafka.bindings.channel-name.consumer, but we still cannot see any messages published to the ERR topic, not even for exceptions thrown from the main thread.

If enable-dlq is set to true, exceptions on the main thread are published to the default DLQ topic and, as expected, exceptions on the child thread are ignored.
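
For reference, the binder-level DLQ we experimented with is driven by properties along these lines (channel-name stands for the actual binding name):

    # Binder-managed DLQ; it only captures exceptions thrown on the listener thread
    spring.cloud.stream.kafka.bindings.channel-name.consumer.enable-dlq=true
    # Optionally route dead letters to a custom topic instead of the default
    spring.cloud.stream.kafka.bindings.channel-name.consumer.dlq-name=ERR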

Update 2

Gary's example seems to work in general, although we had to make some modifications because we use the deprecated StreamListener approach rather than functions. There are still a few issues we cannot resolve:

  • The topic name always seems to be channel_name + .DLT, and we cannot see how to use a different name such as dlq. We use a single DLQ topic for all of our consumers, which does not seem to be what the Spring Kafka default DLT expects.
  • It looks like we need at least the same number of partitions on the DLT as on the consumer topic, or this solution does not work. We are not sure how to manage this, and it does not seem like a practical assumption for us.
  • Is there a way to make use of Spring Retry the way Spring Cloud Stream does behind the scenes, or does this need to be implemented separately? i.e. retry the work based on max.attempts and then kick in the DLQ part.
  • We can see that the example uses the Spring Boot actuator to update the channel state via this.endpoint.changeState("polled", State.PAUSED) and this.endpoint.changeState("polled", State.RESUMED). Why do we need to do that in addition to pausing and requeueing, and what is the side effect of not doing it?

1 Answer

  1. Answer #1

    Your observation is correct; error handling is bound to the thread.

    You can use the DeadLetterPublishingRecoverer directly in your code to make publishing to the DLQ easier (instead of an output channel). That way you get the enhanced headers with the exception information, etc.

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

    EDIT

    Here is an example; I pause the binding to prevent any new deliveries while the "job" is running, rather than requeueing the delivery as you are doing.

    @SpringBootApplication
    @EnableScheduling
    public class So67296258Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So67296258Application.class, args);
        }
    
        @Bean
        TaskExecutor exec() {
            return new ThreadPoolTaskExecutor();
        }
    
        @Bean
        DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
            return new DeadLetterPublishingRecoverer(template);
        }
    
        @Bean
        NewTopic topic() {
            return TopicBuilder.name("polled.DLT").partitions(1).replicas(1).build();
        }
    
        @Bean
        MessageSourceCustomizer<KafkaMessageSource<?, ?>> customizer() {
            return (source, dest, group) -> source.setRawMessageHeader(true);
        }
    
    }
    
    @Component
    class Handler {
    
        private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
    
        private final PollableMessageSource source;
    
        private final TaskExecutor exec;
    
        private final BindingsEndpoint endpoint;
    
        private final DeadLetterPublishingRecoverer recoverer;
    
        Handler(PollableMessageSource source, TaskExecutor exec, BindingsEndpoint endpoint,
                DeadLetterPublishingRecoverer recoverer) {
    
            this.source = source;
            this.exec = exec;
            this.endpoint = endpoint;
            this.recoverer = recoverer;
        }
    
        @Scheduled(fixedDelay = 5_000)
        public void process() {
            LOG.info("Polling");
            boolean polled = this.source.poll(msg -> {
                LOG.info("Pausing Binding");
                this.endpoint.changeState("polled", State.PAUSED);
                AcknowledgmentCallback callback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(msg);
                callback.noAutoAck();
    //          LOG.info(msg.toString());
                this.exec.execute(() -> {
                    try {
                        runJob(msg);
                    }
                    catch (Exception e) {
                        this.recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), e);
                    }
                    finally {
                        callback.acknowledge();
                        this.endpoint.changeState("polled", State.RESUMED);
                        LOG.info("Resumed Binding");
                    }
                });
            });
            LOG.info("" + polled);
        }
    
        private void runJob(Message<?> msg) throws InterruptedException {
            LOG.info("Running job");
            Thread.sleep(30_000);
            throw new RuntimeException("fail");
        }
    
    }
    
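    # application.properties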
    spring.cloud.stream.pollable-source=polled
    spring.cloud.stream.bindings.polled-in-0.destination=polled
    spring.cloud.stream.bindings.polled-in-0.group=polled
    

    EDIT2

    Answers to the supplementary questions:

    1, 2: See the Spring for Apache Kafka documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

    The DLPR has an alternate constructor that lets you provide a destination resolver. The default resolver simply appends .DLT and uses the same partition. The javadocs explain how the destination partition can be specified:

        /**
         * Create an instance with the provided template and destination resolving function,
         * that receives the failed consumer record and the exception and returns a
         * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
         * 0, no partition is set when publishing to the topic.
         * @param template the {@link KafkaOperations} to use for publishing.
         * @param destinationResolver the resolving function.
         */
    

    When it is null, the KafkaProducer chooses the partition.
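
    For example, to send every failure to a single shared dlq topic (as in your first two questions), replace the recoverer bean in the example above with something like this sketch:

    @Bean
    DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
        // Route all failed records to one shared "dlq" topic; the negative
        // partition means none is set and the KafkaProducer chooses one.
        return new DeadLetterPublishingRecoverer(template,
                (record, exception) -> new TopicPartition("dlq", -1));
    }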

    3. Wire up a RetryTemplate with appropriate retry and back-off policies; then

    retryTemplate.execute(context -> { ... },
        context -> { ... });
    

    The second argument is a RecoveryCallback, invoked when the retries are exhausted.
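
    A sketch, reusing msg, recoverer and runJob from the example above (the retry settings are arbitrary):

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // cf. max.attempts
    FixedBackOffPolicy backOff = new FixedBackOffPolicy();
    backOff.setBackOffPeriod(1000L);
    retryTemplate.setBackOffPolicy(backOff);

    retryTemplate.execute(context -> {
        runJob(msg); // the work to retry
        return null;
    }, context -> {
        // RecoveryCallback: retries are exhausted, so publish to the DLT
        recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class),
                (Exception) context.getLastThrowable());
        return null;
    });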

    4. It is more efficient. With your solution, you keep retrieving and requeueing deliveries while the previous task is processing. By pausing the binding, we tell Kafka not to send any more records until the consumer is resumed. This lets us keep the consumer alive by polling it, but without the overhead of retrieving and resetting the offsets.