用于异步处理的java短期执行器服务
我有一个方法,可以以fire and forget的方式执行异步请求
方法的实现如下所示:
private void publishWorkItem(final Object payload, final ZkWorkCompleteCallback callback)
{
if (payload == null)
throw new NullPointerException();
final ExecutorService executor = Executors.newSingleThreadExecutor(PUBLISH_WORK_THREAD_FACTORY);
try
{
executor.execute(() -> {
try
{
if (callback != null)
{
final ZkWorkItem retval = publishWorkItem(payload);
callback.onCompleted(retval);
}
}
catch (final InterruptedException e)
{
// suppressed
}
catch (final Exception e)
{
LOGGER.error("Unhandled exception", e);
if (callback != null)
callback.onError(e);
}
});
}
finally
{
executor.shutdown();
}
}
问题是我正在为每个异步请求创建新的ExecutorServiceExecutors.newSingleThreadExecutor
,而不是使用固定线程池。原因是publishWorkItem(payload)
方法使用了CountDownLatch#await()
,而CountDownLatch#await()
又会阻塞正在执行的线程,因为is等待Watcher
完成。这可能会很快耗尽固定大小的池
简化的publishWorkItem(payload)
代码
final CountDownLatch latch = new CountDownLatch(1);
zkClient.exists(pathToWatch, new Watcher()
{
@Override
public void process(final WatchedEvent event)
{
try
{
extractAndDelete(baos, event.getPath());
}
catch (final Exception e)
{
LOGGER.error("Unable to perform cleanup", e);
}
finally
{
latch.countDown();
}
}
}, true);
------ THIS IS THE PROBLEM (Blocks current thread) ------
latch.await();
所以我的问题是:有没有更好的方法来解决这类问题
我确实分析了应用程序,但没有发现任何性能问题,我担心的是它会创建大量线程
# 1 楼答案
你为什么不使用
ExecutorService.newCachedThreadPool()
根据javadoc,它适合您的用例
不是在每次调用
publishWorkItem()
时创建一个新的单线程池,而是创建一个缓存线程池,用于所有查询。线程的数量由Integer.MAX_VALUE
限制,因此您不会像固定线程池那样受到限制,但总体上应该创建更少的线程