有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

SpringBoot中的java@Async正在创建新线程,但控制器在发送响应之前等待异步调用完成

上下文

我有一个场景——我需要公开一个rest端点并提供一个post方法,fronend将使用它来接收表单值(姓名、电子邮件、地址)。 有了这些细节,我需要ti调用第三方api(最多需要10秒的响应时间)。 此外,我需要将处理后的应用程序和响应存储在DB中的某个位置,以便跟踪监控

问题 我计划使用Spring @Async功能在队列(a DB)中存储细节,这样在存储过程中就不会让用户等待响应。{}似乎正在创建新线程,我可以从日志中看到,但控制器没有将响应发送回客户端(这与我掌握的实际{}知识相反,这并不多);因此,在异步完成之前,用户必须等待响应

这里有什么不对劲的地方吗

谢谢你的帮助

以下是我的一些课程片段-

Main

@EnableAsync(proxyTargetClass = true)
@EnableScheduling
@SpringBootApplication
public class CardsApplication {

    public static void main(String[] args) {
        SpringApplication.run(CardsApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

}

控制器

@Validated
@RequiredArgsConstructor
public class CardEligibilityController {
    private final CardEligibilityService cardEligibilityService;

    @PostMapping("/check-eligibility")
    @CrossOrigin(origins = "*")
    public EligibilityResponse checkEligibility(@RequestBody @Valid Applicant applicant){
        return cardEligibilityService.eligibilityService(applicant);
    }
}

服务1

public interface CardEligibilityService {
    EligibilityResponse eligibilityService(Applicant applicant);
}

@Slf4j
@Service
@RequiredArgsConstructor
public class CardEligibilityServiceImpl implements CardEligibilityService {

    private final ThirdPartyEligibilityAdapter thirdPartyEligibilityAdapter;
    private final QueueService queueService;
    private final QueueMessageResponseService queueMessageResponseService;

    @Override
    public EligibilityResponse eligibilityService(Applicant applicant){
        EligibilityResponse eligibilityResponse = checkEligibility(applicant);
        queueService.pushMessage(queueMessageResponseService.createQueueResponse(applicant,eligibilityResponse));
        return eligibilityResponse;
    }

    private EligibilityResponse checkEligibility(Applicant applicant) {
        return thirdPartyEligibilityAdapter.getEligibility(applicant);
    }

}

服务2

public interface QueueService {
     void pushMessage(QueueMessage queueMessage);
     void retry();
}


@Service
@RequiredArgsConstructor
@Slf4j
public class QueueServiceImpl implements QueueService{

    private final List<QueueMessage> deadQueue = new LinkedList<>();


    //TODO check why async gets response stuck
    @Override
    @Async
    public void pushMessage(QueueMessage queueMessage){
        try {
            //Push message to a queue - Queue settings Rabbit/Kafka - then this could be
            //used by listeners to persist the data into DB
            log.info("message queued {} ", queueMessage);
        } catch (Exception e) {
            log.error("Error {} , queueMessage {} ", e, queueMessage);
            deadQueue.add(queueMessage);
        }
    }

   
**This method is a fault tolerance mechanism in case push to queue had any issues, The Local Method call to pushMessage isn’t the problem I also tried this by deleting retry method method**

    @Override
    @Scheduled(fixedDelay = 300000)
    public void retry() {
        log.info("Retrying Message push if there are any failure in enqueueing ");
        final List<QueueMessage> temp = new LinkedList<>(deadQueue);
        deadQueue.clear();
        Collections.reverse(temp);
        temp.forEach(this::pushMessage);
    }

}

服务3

public interface QueueMessageResponseService {
    QueueMessage createQueueResponse(Applicant applicant, EligibilityResponse eligibilityResponse);
}

@Service
public class QueueMessageResponseServiceServiceImpl implements QueueMessageResponseService {
    @Override
    public QueueMessage createQueueResponse(Applicant applicant, EligibilityResponse eligibilityResponse) {
        return new QueueMessage(applicant,eligibilityResponse);
    }
}

编辑2

最奇怪的行为

如果我在我的异步方法中添加Thread.sleep(20);,这会像预期的那样工作,用户无需等待async完成就可以得到响应。但仍然无法理解原因

@Async
    public void pushMessage(QueueMessage queueMessage) {
        try {
            //Push message to a queue - Queue settings Rabbit/Kafka - then this could be
            //used by listeners to persist the data into DB
            Thread.sleep(20);
            log.info("message queued {} ", queueMessage);
        } catch (Exception e) {
            log.error("Error {} , queueMessage {} ", e, queueMessage);
            deadQueue.add(queueMessage);
        }
    }

共 (1) 个答案

  1. # 1 楼答案

    retry中的pushMessage的调用是本地调用。因此,在同步执行方法时,不涉及代理

    必须将异步方法移动到它自己的类中