有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

使用联接和正确处理中断执行的java多线程

我已经在我的服务层实现了多线程,并希望确保我已经处理了所有线程得到正确处理的情况。我不希望最后出现某种异常(如RuntimeEx或InterruptedEx),这会使我的应用程序处于糟糕的状态

我的代码如下。如果您能看到任何错误,请告诉我。建议是最受欢迎的。我正在使用Java6

public class MyRunnable implements Runnable {

    private List<MyData> list;
    private Person p;

    public MyRunnable(List<MyData> list, Person p) {
        this.list = list;  // this list is passed in and cannot be null
        this.p = p;
    }

    @Override
    public void run() {
        // before calling any of the services that gets data from the
        // database, check if the thread has been interrupted
        if (Thread.interrupted()) return;

        List<TypeA> aList;
        try {
            aList = getTypeAFromDatabase1(p);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        if (Thread.interrupted()) return;

        List<TypeB> bList;
        try {
            bList = getTypeBFromDatabase2(p);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        if (Thread.interrupted()) return;

        List<TypeC> cList;
        try {
            cList = getTypeCFromSomeWebService(p);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        MyData d = new MyData();
        d.setPerson(p);
        d.setTypeA(aList);
        d.setTypeB(bList);
        d.setTypeC(cList);
        list.add(d);
    }
}

使用Runnable的服务:

@JsonOperation
public static List<MyData> getMyData(MyParams params) throws Exception {

    List<Person> persons = params.getPersonList();

    try {
        // use synchronized list since all threads will add to this list
        List<MyData> retList = Collections.synchronizedList(new ArrayList<MyData>());

        List<Thread> threads = new ArrayList<Thread>();

        // For each person, start a new thread. It there are any runtime
        // exceptions thrown by any one thread, it will be caught by the
        // bigger try catch block. In case of runtime exception, we will
        // return back to the client right away but the other threads
        // are still processing
        try {

            for (Person p : persons) {
                // create a thread per person and start it
                Runnable task = new MyRunnable(retList, p);
                Thread worker = new Thread(task);
                threads.add(worker);
                worker.start();

                // remember the thread for later use
                threads.add(worker);
            }

            for (Thread thread : threads) {
                // wait for all threads (by waiting on one thread at a time)
                thread.join(3000);  //3 seconds between checking on this thread
            }

        } catch (RuntimeException e) {
            log.error(e);
            for (Thread thread : threads) {
                // try and send an interrupt to all threads so that they
                // don't fetch any more data from the database
                thread.interrupt();
            }
            throw e;
        }

        return retList;

    } catch (Exception e) {
        log.error(e);
        throw e;
    }
}

共 (1) 个答案

  1. # 1 楼答案

    在大多数情况下,使用任务比使用线程更容易。从ExecutorService开始,它限制线程的数量,并在所有服务操作中共享:

    // inject with IoC framework
    ExecutorService executor = Executors.newFixedThreadPool(10);
    

    您可以使用方法invokeAll为每个人执行任务。如果任务未在给定时间内完成,则剩余任务将自动取消。在这种情况下,调用相应的futureget方法时会引发异常。这意味着不需要额外的异常处理

    public List<MyData> getMyData(MyParams params) throws Exception {
        List<Callable<MyData>> tasks = new ArrayList<>();
        for (Person p : persons) {
            tasks.add(new Callable<MyData>() { // use Lambda in Java 8
                public MyData call() {
                    MyData d = new MyData();
                    d.setPerson(p);
                    d.setTypeA(getTypeAFromDatabase1(p));
                    d.setTypeB(getTypeBFromDatabase2(p));
                    d.setTypeC(getTypeCFromSomeWebService(p));
                    return d;
                }
            });
        }
        List<MyData> result = new ArrayList<>();
        for (Future<MyData> future : executor.invokeAll(tasks, 3000, TimeUnit.MILLISECONDS)) {
            result.add(future.get());
        }
        return result;
    }
    

    无需检查可调用中的中断状态。如果在其中一个方法中调用了阻塞操作,该方法将自动中止执行,并出现中断异常或其他异常(如果正确实现)。也可以设置中断状态,而不是引发异常。但是,对于具有返回值的方法来说,这没有什么意义