有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

用于异步处理的java短期执行器服务

我有一个方法,可以以fire and forget的方式执行异步请求

方法的实现如下所示:

private void publishWorkItem(final Object payload, final ZkWorkCompleteCallback callback)
{
    if (payload == null)
        throw new NullPointerException();

    final ExecutorService executor = Executors.newSingleThreadExecutor(PUBLISH_WORK_THREAD_FACTORY);

    try
    {
        executor.execute(() -> {

            try
            {
                if (callback != null)
                {
                    final ZkWorkItem retval = publishWorkItem(payload);
                    callback.onCompleted(retval);
                }
            }
            catch (final InterruptedException e)
            {
                // suppressed
            }
            catch (final Exception e)
            {
                LOGGER.error("Unhandled exception", e);

                if (callback != null)
                    callback.onError(e);
            }
        });
    }
    finally
    {
        executor.shutdown();
    }
}

问题是我正在为每个异步请求创建新的ExecutorServiceExecutors.newSingleThreadExecutor,而不是使用固定线程池。原因是publishWorkItem(payload)方法使用了CountDownLatch#await(),而CountDownLatch#await()又会阻塞正在执行的线程,因为is等待Watcher完成。这可能会很快耗尽固定大小的池

简化的publishWorkItem(payload)代码

  final CountDownLatch latch = new CountDownLatch(1);

        zkClient.exists(pathToWatch, new Watcher()
        {
            @Override
            public void process(final WatchedEvent event)
            {
                try
                {
                    extractAndDelete(baos, event.getPath());
                }
                catch (final Exception e)
                {
                    LOGGER.error("Unable to perform cleanup", e);
                }
                finally
                {
                    latch.countDown();
                }
            }
        }, true);

       ------ THIS IS THE PROBLEM (Blocks current thread) ------ 
       latch.await(); 

所以我的问题是:有没有更好的方法来解决这类问题

我确实分析了应用程序,但没有发现任何性能问题,我担心的是它会创建大量线程


共 (1) 个答案

  1. # 1 楼答案

    你为什么不使用ExecutorService.newCachedThreadPool()

    根据javadoc,它适合您的用例

    These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks ... will reuse previously constructed threads if available

    不是在每次调用publishWorkItem()时创建一个新的单线程池,而是创建一个缓存线程池,用于所有查询。线程的数量由Integer.MAX_VALUE限制,因此您不会像固定线程池那样受到限制,但总体上应该创建更少的线程