有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何处理CompletableFuture中未捕获的异常。运行异步

我们的应用程序有一些异步运行的代码失败了。像这样:

CompletableFuture.runAsync(
    () -> { throw new RuntimeException("bad"); },
    executorService
);

我们需要能够捕获这些错误的默认异常处理代码,以防特定的用户忘记处理异常(这来自于生产缺陷)

这显然很棘手。在Handling exceptions from Java ExecutorService tasks中给出的答案不起作用

它依赖于任务是Future<?>,然后对其调用get(),导致再次抛出异常。但是runAsync代码并非如此

runAsync创建一个java.util.concurrent.CompletableFuture.AsyncRun类,该类似乎试图抑制所有异常。尽管它本身是一个Future,但它并不表示它是isDone(),并且似乎无法从中获得异常

那么,考虑到下面的样板文件,我们应该如何捕捉这些异常呢

请注意,我们确实希望在runAsync代码中捕获所有未处理的异常,而不是可以添加到每个runAsync调用中的内容。很容易忘记为每一个添加处理代码

public class ExceptionTest {
    public static void main(String[] args) throws RuntimeException {
        ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue()
        ) {
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);

                // TODO: Magically extract the exception from `r`
            }
        };

        CompletableFuture.runAsync(
            () -> { throw new RuntimeException("bad"); },
            executorService
        );
    }
}

共 (1) 个答案

  1. # 1 楼答案

    因此,这是一个可怕的攻击,但它确实可以处理在使用runAsync时忘记调用exceptionally的情况。我希望看到更通用、更少黑客的解决方案

    它通过在执行AsyncRun之前拦截AsyncRun并修补exceptionally块来工作

    不过,说真的,詹奇。但在甲骨文改变runAsync的工作方式之前,它可能会起作用

        ExecutorService executorService = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue()
        ) {
            @Override
            protected void beforeExecute(final Thread t, final Runnable r) {
                super.beforeExecute(t, r);
    
                if (r.getClass().getName().equals("java.util.concurrent.CompletableFuture$AsyncRun")) {
                    try {
                        final Field f = r.getClass().getDeclaredField("dep");
                        f.setAccessible(true);
                        ((CompletableFuture<?>) f.get(r)).exceptionally(e -> {
                            LoggerFactory.getLogger(ExceptionTest.class).error("Error in runAsync " + r, e);
                            UnsafeUtils.getUnsafe().throwException(e);
                            return null;
                        });
                    } catch (Exception e) {
                        System.out.println("Failed to hack CompletableFuture$AsyncRun to report exceptions.");
                    }
                }
            }
    
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
    
                if (t == null && r instanceof Future<?>) {
                    try {
                        Future<?> future = (Future<?>) r;
                        if (future.isDone()) {
                            future.get();
                        }
                    } catch (CancellationException ce) {
                        t = ce;
                    } catch (ExecutionException ee) {
                        t = ee.getCause();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (t != null) {
                    LoggerFactory.getLogger(ExceptionTest.class).error("Error in async task " + r, t);
                }
            }
        };