有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java并行线程需要到达一个点,等待另一个线程,然后恢复最佳方法?

以下是(简化的)场景:

  • 我有线程A,B,C并行运行
  • 一旦所有3个线程都达到执行的第7步,它们就必须等待。(每个线程将在不同的时间到达步骤7,这无法提前知道)
  • 线程D随后启动并运行到完成
  • 只有在D完成后,A、B和C才能恢复并运行到完成

用什么样的工具或设计来解决这个问题

到目前为止,我看到的并发和信号量示例似乎只处理两个线程,或者假设并行线程正在做类似的事情,但只是共享一个var或消息传递。我还没有发现任何与上述场景相关的东西。我正在继续调查,并将更新任何发现

如果倒计时锁存器不需要退出其他线程,那么倒计时锁存器可能会工作。或者,如果它可以反向工作,则使n线程等待线程D退出

我对并发(concurrency)还比较陌生(大学课程不会在需要的时间附近提供它),所以请容忍我。如果有人遇到类似问题,请将此链接放到此处:HowToDoInJava - Concurrency


共 (2) 个答案

  1. # 1 楼答案

    我会用Phaser

    import java.util.concurrent.Phaser;
    
    class Scratch {
        public static class ThreadABCWorker implements Runnable {
            String threadName;
            Phaser phaser;
    
            public ThreadABCWorker(String threadName, Phaser phaser) {
                this.threadName =  threadName;
                this.phaser = phaser;
            }
    
            @Override
            public void run() {
                System.out.println("Thread " + threadName + " has started");
                // do steps 1-7 as part of phase 0
                phaser.arriveAndAwaitAdvance();
                // All the work for phase 1 is done in Thread D, so just arrive again and wait for D to do its thing
                phaser.arriveAndAwaitAdvance();
                System.out.println("Continue Thread" + threadName);
            }
        }
    
    
        public static void main(String[] args) {
            var phaser = new Phaser(4);
            var threadA = new Thread(new ThreadABCWorker("A", phaser));
            var threadB = new Thread(new ThreadABCWorker("B", phaser));
            var threadC = new Thread(new ThreadABCWorker("C", phaser));
            var threadD = new Thread(() -> {
                phaser.arriveAndAwaitAdvance(); // D shouldn't start doing its thing until phase 1
                System.out.println("Thread D has started");
    
                try {
                    System.out.println("sleep 100");
                    Thread.sleep(100);
                    System.out.println("Thread D has finished");
                    phaser.arriveAndDeregister(); // All done, let ths other threads continue
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            threadA.start();
            threadB.start();
            threadC.start();
            threadD.start();
        }
    }
    

    输出示例:

    Thread A has started
    Thread C has started
    Thread B has started
    Thread D has started
    sleep 100
    Thread D has finished
    Continue ThreadB
    Continue ThreadA
    Continue ThreadC
    
  2. # 2 楼答案

    下面是我用CountDownLatch本身想到的。它不一定只有在线程完成时才需要倒计时

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Sample class that utilizes very simple thread creation/execution.
     * <p>
     * DISCLAIMER: This class isn't meant to show all cross-cutting concerns. It just focuses on the task presented.
     * Naming conventions, access modifiers, etc. may not be optimal.
     */
    public class ATech {
    
        private static long startThreadTime;
        private static CountDownLatch primaryCountDownLatch = new CountDownLatch(3);
        private static CountDownLatch secondaryCountDownLatch = new CountDownLatch(1);
    
        public static void main(String[] args) {
            List<Thread> threads = Arrays.asList(
                    new Thread(new ThreadType1("A", 1, 5)),
                    new Thread(new ThreadType1("B", 5, 1)),
                    new Thread(new ThreadType1("C", 10, 10)),
                    new Thread(new ThreadType2("D", 5))
            );
    
            startThreadTime = System.currentTimeMillis();
            System.out.println("Starting threads at (about) time 0");
    
            threads.forEach(Thread::start);
    
            try {
                for (Thread thread : threads) {
                    thread.join();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    
            System.out.println("All threads completed at time " + (System.currentTimeMillis() - startThreadTime));
        }
    
        static class ThreadType1 implements Runnable {
    
            public ThreadType1(String name, int executionTimePreWaitInSeconds, int executionTimePostWaitInSeconds) {
                this.execTimePreWait = executionTimePreWaitInSeconds;
                this.execTimePostWait = executionTimePostWaitInSeconds;
                this.name = name;
            }
    
            int execTimePreWait;
            int execTimePostWait;
            String name;
    
            @Override
            public void run() {
                System.out.println("Execution thread " + name + ". Waiting for " + execTimePreWait + " seconds");
    
                try {
                    Thread.sleep(execTimePreWait * 1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
    
                // Done doing whatever we were doing, now we let the other thread now we're done (for now)
                System.out.println("Thread " + name + " completed at time " + (System
                        .currentTimeMillis() - startThreadTime) + ". Waiting for latch");
    
                primaryCountDownLatch.countDown();
    
                try {
                    secondaryCountDownLatch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
    
                }
    
                System.out.println(
                        "Thread " + name + " awoken again at time " + (System.currentTimeMillis() - startThreadTime));
                System.out.println("Thread " + name + " will sleep for " + execTimePostWait + " seconds");
    
                try {
                    Thread.sleep(execTimePostWait * 1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
    
                System.out.println(
                        "Thread " + name + " completed fully at time " + (System.currentTimeMillis() - startThreadTime));
            }
        }
    
        static class ThreadType2 implements Runnable {
    
            String name;
            int execTime;
    
            public ThreadType2(String name, int executionTimeInSeconds) {
                this.name = name;
                this.execTime = executionTimeInSeconds;
            }
    
            @Override
            public void run() {
                System.out.println("Thread " + name + " waiting for other threads to complete");
                try {
                    primaryCountDownLatch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
    
                System.out.println("Thread " + name + " woke up at time " + (System.currentTimeMillis() - startThreadTime));
                System.out.println("Thread " + name + " will work for " + execTime + " seconds");
    
                try {
                    Thread.sleep(execTime * 1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
    
                System.out.println("Thread " + name + " completed at time " + (System.currentTimeMillis() - startThreadTime));
                secondaryCountDownLatch.countDown();
            }
        }
    }
    

    样本输出:

    Starting threads at (about) time 0
    Execution thread A. Waiting for 1 seconds
    Execution thread C. Waiting for 10 seconds
    Execution thread B. Waiting for 5 seconds
    Thread D waiting for other threads to complete
    Thread A completed at time 1033. Waiting for latch
    Thread B completed at time 5034. Waiting for latch
    Thread C completed at time 10034. Waiting for latch
    Thread D woke up at time 10034
    Thread D will work for 5 seconds
    Thread D completed at time 15034
    Thread A awoken again at time 15034
    Thread C awoken again at time 15034
    Thread A will sleep for 5 seconds
    Thread B awoken again at time 15034
    Thread B will sleep for 1 seconds
    Thread C will sleep for 10 seconds
    Thread B completed fully at time 16035
    Thread A completed fully at time 20034
    Thread C completed fully at time 25034
    All threads completed at time 25034
    
    Process finished with exit code 0