有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

初学者Java多线程问题

我正在制作的Java多线程文件爬虫有问题。我的问题是,我有一个工作队列,它是一个linkedBlockingQueue,包含我想用线程爬过的文件名,每个线程将take()从工作队列中,在扫描文件时,它可能put()另一个文件名进入工作队列(这是一个依赖项检查程序)。因此,我从来没有真正确定工作何时全部完成,当所有线程试图从(最终)空工作队列中take()时,它们最终将进入等待状态

所以我想我的问题是,有没有一种有效的方法可以在所有工作完成后(当所有线程都进入等待状态时)终止所有线程?目前,我只在主线程上使用sleep(),然后在所有工作线程上使用interrupt()

抱歉,如果这个问题听起来很困惑


共 (3) 个答案

  1. # 1 楼答案

    有一种叫做Poison Pill的模式对这个很有好处。基本上,当生产者完成时,在队列中插入一个特殊值,告诉消费者停止。您可以为每个消费者插入一颗药丸,或者,一旦消费者获得毒药药丸,将其返回到下一个消费者的队列中。因为听起来你只是在排队

    public static final String POISON_PILL = "DONE";
    

    或者在Java8中,使用Optional来包装您的值,然后就没有了药丸

    BlockingQueue<Optional<...>> queue;
    

    另一种选择是使用ExecutorService(实际上由BlockingQueue支持),并将每个文件作为自己的任务提交,然后在完成时使用executorService.shutdown()。这样做的问题是,它将代码连接得比需要的更紧密,并且使重用数据库和HTTP连接等资源变得更加困难

    我会避免打断你的员工给他们发信号,因为这会导致阻塞IO操作失败

  2. # 2 楼答案

    我以前遇到过这个问题,我找到的唯一方法是向BlockingQueue发送一个特殊的标记对象。当队列.take()结束对象时,如果这是标记,则Thread结束自身

    我尝试过其他解决方案,比如唤醒线程并检测异常,但没有成功

  3. # 3 楼答案

    你可以使用下面的方法。如果需要,可以添加观察者模式。 或者简单地说——收集等待线程的列表,然后中断它们,而不是用死亡数据包发送信号

    public class AccessCountingLinkedPrioQueue<T> {
    
        private final LinkedBlockingQueue<T>    mWrappingQueue                  = new LinkedBlockingQueue<>();
        private final Object                    mSyncLockObj                    = new Object();
    
        private final int                       mMaxBlockingThreads;
        private final T                         mDeathSignallingObject;
    
        private volatile int                    mNumberOfThreadsInAccessLoop    = 0;
    
        public AccessCountingLinkedPrioQueue(final int pMaxBlockingThreads, final T pDeathSignallingObject) {
            mMaxBlockingThreads = pMaxBlockingThreads;
            mDeathSignallingObject = pDeathSignallingObject;
        }
    
        public T take() throws InterruptedException {
            final T retVal;
            synchronized (mSyncLockObj) {
                ++mNumberOfThreadsInAccessLoop;
            }
            synchronized (mWrappingQueue) {
                if (mNumberOfThreadsInAccessLoop >= mMaxBlockingThreads && mWrappingQueue.isEmpty()) signalDeath();
                retVal = mWrappingQueue.take();
            }
            synchronized (mSyncLockObj) {
                 mNumberOfThreadsInAccessLoop;
            }
            return retVal;
        }
    
        private void signalDeath() {
            for (int i = 0; i < mMaxBlockingThreads; i++) {
                mWrappingQueue.add(mDeathSignallingObject);
            }
        }
    
        public int getNumberOfThreadsInAccessLoop() {
            return mNumberOfThreadsInAccessLoop;
        }
    }
    
    class WorkPacket {
        // ... your content here
    }
    
    class MultiThreadingBoss {
        static public final WorkPacket  DEATH_FROM_ABOVE    = new WorkPacket();
    
        public MultiThreadingBoss() {
            final int THREADS = 7;
            final AccessCountingLinkedPrioQueue<WorkPacket> prioQ = new AccessCountingLinkedPrioQueue<>(THREADS, DEATH_FROM_ABOVE);
            for (int i = 0; i < THREADS; i++) {
                final ThreadedWorker w = new ThreadedWorker(prioQ);
                new Thread(w).start();
            }
        }
    }
    
    class ThreadedWorker implements Runnable {
        private final AccessCountingLinkedPrioQueue<WorkPacket> mPrioQ;
    
        public ThreadedWorker(final AccessCountingLinkedPrioQueue<WorkPacket> pPrioQ) {
            mPrioQ = pPrioQ;
        }
    
        @Override public void run() {
            while (true) {
                try {
                    final WorkPacket p = mPrioQ.take();
                    if (p == MultiThreadingBoss.DEATH_FROM_ABOVE) break; // or return
    
                    // ... do your normal work here
    
                } catch (final InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }