有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

中断超过阈值的线程的java最佳实践

我正在使用Java ExecutorService框架提交可调用的任务以供执行。 这些任务与web服务通信,并应用5分钟的web服务超时。 然而,我已经看到,在某些情况下,超时被忽略,线程在API调用上“挂起”——因此,我想取消所有耗时超过5分钟的任务

目前,我有一个期货列表,我反复浏览它们,并称之为future。直到所有任务都完成。现在,我看到了未来。get-overloaded方法会超时,并在任务未在该窗口中完成时抛出超时。因此,我想到了一种方法,在那里我可以创造未来。get()带有timeout,如果出现TimeoutException,我将执行一个future。取消(true)以确保此任务被中断

我的主要问题
1.带超时的get是解决此问题的最佳方法吗
2.是否有可能我正在等待一个尚未放入线程池的任务(不是活动工作线程)的get调用。在这种情况下,我可能会终止一个线程,当它启动时,它实际上可能会在要求的时间限制内完成

如有任何建议,将不胜感激


共 (4) 个答案

  1. # 1 楼答案

    Is the get with a timeout the best way to solve this issue?

    是的,在未来对象上get(timeout)是完全可以的,如果未来指向的任务已经执行,它将立即返回。如果任务尚未执行或正在执行,则将等待超时,这是一种良好的做法

    Is there the possibility that I'm waiting with the get call on a task that hasnt yet been placed on the thread pool(isnt an active worker)

    只有在将任务放置在线程池中时,才能获得Future对象,因此如果不将任务放置在线程池中,就无法对任务调用get()。是的,有一种可能性是,这项任务还没有被一名自由工作者承担

  2. # 2 楼答案

    当然,您可以使用

    task.cancel(true)

    这是完全合法的。但是如果线程正在“运行”,这将中断线程

    如果线程正在等待获取一个内在锁,那么“中断”请求除了设置线程的中断状态之外没有任何效果在这种情况下,您无法采取任何措施来阻止它对于要发生的中断,线程应该通过获取它正在等待的锁(可能需要5分钟以上)从“阻塞”状态出来。这是使用“内在锁定”的一个限制

    但是,您可以使用显式锁类来解决这个问题。您可以使用“锁定”接口的“可中断锁定”方法来实现这一点。“lockInterruptibly”将允许线程尝试获取锁,同时保持对中断的响应。下面是一个实现这一点的小例子:

    public void workWithExplicitLock()throws InterruptedException{
    Lock lock = new ReentrantLock();
    lock.lockInterruptibly()();
    try {
    // work with shared object state
    } finally {
    lock.unlock();
    }
    

    }

  3. # 3 楼答案

    你所说的方法是可以的。但最重要的是,在设置超时阈值之前,您需要知道线程池大小和超时的最佳值是多少。做一个压力测试,它将显示您配置为Threadpool一部分的工作线程数是否正常。这甚至可以减少超时值。所以我觉得这个测试是最重要的

    get上的超时完全可以,但是如果任务引发TimeoutException,则应该添加以取消该任务。如果您正确地执行了上述测试,并将线程池大小和超时值设置为理想值,那么您甚至可能不需要从外部取消任务(但您可以将其作为备份)。是的,有时在取消一项任务时,你可能会取消一项执行者尚未执行的任务

  4. # 4 楼答案

    1. 带超时的get是解决此问题的最佳方法吗

      • 这是不够的。例如,如果您的任务不是为响应中断而设计的,它将继续运行或只是被阻止
    2. 是否有可能我正在等待一个尚未放入线程池的任务(不是活动工作线程)的get调用。在这种情况下,我可能会终止一个线程,当它启动时,它实际上可能会在要求的时间限制内完成

      • 是的,如果您的线程池配置不正确,您可能会取消从未计划运行的as任务

    当任务包含不可中断的阻塞时,下面的代码片段可能是使任务对中断做出响应的方法之一。此外,它不会取消未计划运行的任务。这里的想法是通过关闭套接字、数据库连接等来覆盖中断方法和关闭正在运行的任务。这段代码并不完美,您需要根据需要进行更改,处理异常等

    class LongRunningTask extends Thread {
    private  Socket socket; 
    private volatile AtomicBoolean atomicBoolean;
    
    
    public LongRunningTask() {
        atomicBoolean = new AtomicBoolean(false);
    }
    
    @Override
    public void interrupt() {
        try {
            //clean up any resources, close connections etc.
            socket.close();
        } catch(Throwable e) {
        } finally {
            atomicBoolean.compareAndSet(true, false);
            //set the interupt status of executing thread.
            super.interrupt();
        }
    }
    
    public boolean isRunning() {
        return atomicBoolean.get();
    }
    
    @Override
    public void run() {
        atomicBoolean.compareAndSet(false, true);
        //any long running task that might hang..for instance
        try {
            socket  = new Socket("0.0.0.0", 5000);
            socket.getInputStream().read();
        } catch (UnknownHostException e) {
        } catch (IOException e) {
        } finally {
    
        }
    }
    }
    //your task caller thread
    //map of futures and tasks 
        Map<Future, LongRunningTask> map = new HashMap<Future, LongRunningTask>();
        ArrayList<Future> list = new ArrayList<Future>();
        int noOfSubmittedTasks = 0;
    
        for(int i = 0; i < 6; i++) {
            LongRunningTask task = new LongRunningTask();
            Future f = execService.submit(task);
            map.put(f, task);
            list.add(f);
            noOfSubmittedTasks++;
        }
    
        while(noOfSubmittedTasks > 0) {
            for(int i=0;i < list.size();i++) {
                Future f = list.get(i);
                LongRunningTask task = map.get(f);
                if (task.isRunning()) {
                    /*
                     * This ensures that you process only those tasks which are run once
                     */
                    try {
                        f.get(5, TimeUnit.MINUTES);
                        noOfSubmittedTasks--;
                    } catch (InterruptedException e) {
                    } catch (ExecutionException e) {
                    } catch (TimeoutException e) {
                                                //this will call the overridden interrupt method
                        f.cancel(true);
                        noOfSubmittedTasks--;
                    }
                }
    
            }
        }
        execService.shutdown();