有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Spring引导中的java TaskRejectedException

我有一个Spring启动应用程序,两个端点异步运行

  1. 使用RESTAPI在外部系统中注册用户。成功注册后,将他放入DB和Redis缓存

代码是这样的

 @Service
 public class UserRegistrationService {
    @Async("asyncExecutor")
    public String registerUser(DomainRequest request) throws SystemException {

        try {
            // External API call
            extResponse = extServiceImplInterface.registerUser(extRequest);
        } catch (Exception e) {

        }
        if (extResponse.getResCode = 0) {
            // Success response from API - save to DB and redis cache
            savedUser = saveUser(extResponse);
        }

    }
}
  1. 通过对每个用户调用外部RESTAPI来刷新DB表中的每个用户。为了触发这个事件,我每隔5秒调用第二个端点,它执行refreshUser()方法

代码是这样的

@Service
public class UserRefreshService {
    
    @Autowired
    //External API call class
    GetLastChatResponse getLastChatResponse;
    
    @Async("asyncExecutor")
    public void refreshUser() {
        try{
            //Get all registerd users from DB
            List<User> currentUsers = userRepositoryInterface.findAll();
            //Traverse through the list and call an external API
            if(!currentUsers.isEmpty()) {
                for(User item : currentUsers) {
                    getLastChatResponse.getLastResponse(item);
                }
            }
        }
        catch(Exception e){
            
        }
    }
    
}

@Service
public class GetLastChatResponse {
    
    @Autowired
    JedisPool jedisPool;
    
    @Async("asyncExecutor")
    public void getLastResponse(User item) {
        //Call external rest API
        LastAgentResponse lastResponseMessage = getLastAgentResponse(item);
        try {
            if(lastResponseMessage != null) {
                //Set info to Redis cache
                Jedis jedis = jedisPool.getResource();
                jedis.set(item.getChatId()+Constants.LAST_INDEX, lastResponseMessage.getLastIndex());
                jedis.set(item.getChatId()+Constants.LAST_TEXT_TIME, LocalDateTime.now().toString());
            }               
        } catch (SystemException e) {
            logger.error("Exception: {}", e);
        }
    }
}

我正在使用这些线程池配置

@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(100);
    executor.setMaxPoolSize(200); 
    executor.setQueueCapacity(1); 
    executor.setKeepAliveSeconds(5);
    executor.setThreadNamePrefix("AsyncThread-");
    executor.initialize();
    return executor;
}

通常DB表包含大约10个用户,因为过期的用户将从表中删除

我遇到的问题是,在运行应用程序一段时间后,当我调用其中一个端点时,会出现此错误

    {
    "code": "500",
    "type": "TaskRejectedException",
    "message": "Executor [java.util.concurrent.ThreadPoolExecutor@7de76256[Running, pool size = 200, active threads = 200, queued tasks = 1, completed tasks = 5089]] did not accept task: org.springframework.cloud.sleuth.instrument.async.TraceCallable@325cf639"
}

我尝试更改池配置,但没有成功

executor.setCorePoolSize(2000);
executor.setMaxPoolSize(4000); 
executor.setQueueCapacity(1); 
executor.setKeepAliveSeconds(5);

有人对此有想法吗


共 (0) 个答案