并行处理Java 8并行流findFirst
假设我们有一个这样的工人列表:
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));
我想找到第一个完成工作的工人,所以:
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
但是有一个问题,我不想等待所有工人完成他们的工作,然后再找到第一个工人,而是第一个工人在他完成工作后就找到了
public class Test {
public static void main(String[] args) {
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
if (first != null) {
System.out.println("id : " + first.id);
}
}
static class Worker {
int id;
Worker(int id) {
this.id = id;
}
boolean finish() {
int t = id * 1000;
System.out.println(id + " -> " + t);
try {
Thread.sleep(t);
} catch (InterruptedException ignored) {
}
return true;
}
}
}
有没有办法用java.util.Stream
来实现它
谢谢
# 1 楼答案
你似乎对^{有严重的误解
Stream
并不意味着启动工人。事实上,如果你使用findFirst
,它可能只启动第一个工人,而不启动任何工人。因此,它也不会等待“所有工作人员完成”,而是只等待当前挂起的线程。但是,由于您有一个相当小的流,因此可能所有的工作线程都已经启动,因为您的环境中有尽可能多的可用线程。但这并不是一个可以保证的行为请注意,如果使用顺序流而不是并行流,它肯定只处理第一项(因为它返回
true
),而不处理其他项。但由于流实现无法预测结果,它将尊重您通过并行执行“加速”操作的请求,并可能开始使用更多线程提前处理更多项# 2 楼答案
我知道这是个老问题,但在这里找到了一些不错的解决方案: https://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/
在调用下:
将流更改为可调用的集合。看起来很干净
# 3 楼答案
当您使用
finish
方法作为流的过滤器时,这意味着为了计算特定工作程序的过滤器谓词,工作程序必须完成其工作但是,当您以并行流的形式运行此代码时,可能会同时对多个工作进程应用过滤器,在这种情况下,第一个完成的工作进程将为您提供输出。但是,您无法控制并行流将使用多少线程。它可能会决定一些辅助线程应该在同一个线程上处理,在这种情况下,其中一些根本不会被处理(因为终端操作要求只有一个辅助线程完成其处理)
因此,如果您的目标是同时对所有worker执行
finish
,那么就不能使用流(甚至不能使用并行流)# 4 楼答案
您可以尝试使用Java的反应式扩展(
RxJava
)中的Observable
,而不是使用Stream
。下面是示例代码示例输出: