当ForkJoinPool结束时的java
我试图找出所有ForkJoinPool线程何时完成了任务。 我编写了这个测试应用程序(我使用System.out,因为它只是一个快速测试应用程序,并且没有错误检查/处理):
public class TestForkJoinPoolEnd {
private static final Queue<String> queue = new LinkedList<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
enqueue("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!customThreadPool.isTerminating()) {
String s = dequeue();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
}
static List<String> makeList() {
return Stream.generate(() -> makeString())
.limit(MAX_SIZE)
.collect(Collectors.toList());
}
static String makeString() {
int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter 'z'
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = leftLimit + (int)
(random.nextFloat() * (rightLimit - leftLimit + 1));
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
int sum = 0;
for (int i = 0; i < s.length(); i++) {
sum += s.charAt(i);
}
return (sum / SPEED_UP);
}
static void process(String s) {
StringBuilder sb = new StringBuilder(s);
long start = System.currentTimeMillis();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.currentTimeMillis();
sb.append(" slept for ")
.append((end - start))
.append(" milliseconds");
enqueue(sb.toString());
}
static void enqueue(String s) {
synchronized (queue) {
queue.offer(s);
}
}
static String dequeue() {
synchronized (queue) {
return queue.poll();
}
}
}
这段代码被卡住了,永远无法完成。如果我将while循环的条件更改为!customThreadPool.isQuiescent()
,它将终止循环,并将计数器和队列大小设置为1
我应该使用什么来确定线程何时完成
# 1 楼答案
一个
ExecutorService
不会因为一个作业(及其子作业)完成而自行终止。线程池背后的整个理念是可重用的因此,它只有在应用程序调用
shutdown()
时才会终止您可以使用
isQuiescent()
来确定是否没有挂起的作业,这仅在所有提交的作业都属于您的特定任务时才有效。使用submit
返回的未来来检查实际作业的完成情况要干净得多在任何一种情况下,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交结束时,您仍然需要检查队列中是否有挂起的元素
此外,建议使用线程安全的
BlockingQueue
实现,而不是用synchronized
块装饰LinkedList
。再加上其他一些需要清理的东西,代码如下所示:如果你在接收端的
sleep
调用是为了模拟一些工作负载,而不是等待新的项目,那么你也可以使用但逻辑没有改变。在
future.isDone()
返回true
后,我们必须重新检查队列中的挂起元素。我们只保证不会有新物品到达,而不是队列已经空了作为旁注,
makeString()
方法可以进一步改进,以甚至