有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java并发:multiproducer-oneconsumer

我有一种情况,不同的线程填充一个队列(生产者)和一个消费者从这个队列中检索元素。我的问题是,当从队列中检索到其中一个元素时,会丢失一些元素(丢失信号?)。生产商代码为:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消费者代码为:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

当代码运行时,我有时会添加20个元素并检索20个,但在其他情况下,检索的元素少于20个。知道怎么解决吗


共 (3) 个答案

  1. # 1 楼答案

    同时使用ConcurrentLinkedQueue和同步看起来不是个好主意。它首先违背了并发数据结构的目的

    ConcurrentLinkedQueue数据结构没有问题,用BlockingQueue替换它可以解决问题,但这不是根本原因

    问题在于排队。等等(10)。这是一种定时等待方法。它将在10毫秒后再次获取锁

    1. 通知(queue.notify())将丢失,因为如果已过10毫秒,则没有使用者线程在等待它

    2. 生产者将无法添加到队列中,因为消费者再次声明了锁,所以他们无法获取锁

    移动到BlockingQueue解决了您的问题,因为您删除了等待(10)代码,而等待和通知由BlockingQueue数据结构处理

  2. # 2 楼答案

    代码中的问题可能是因为使用了notify而不是notifyAll。前者只会唤醒一个线程,如果有一个线程正在等待锁。这允许在没有线程等待且信号丢失的情况下出现争用状态。notifyAll将通过要求所有线程唤醒以检查它们是否可以获得锁来强制执行正确性,但性能代价很小

    这在{a1}中得到了最好的解释(见第150页)。第二版删除了这个技巧,因为程序员应该使用java。util。并发,它提供了更强的正确性保证

  3. # 3 楼答案

    我建议您使用阻塞队列而不是队列。LinkedBlockingDeque可能是您的理想人选

    您的代码如下所示:

    void send(String message) {
        synchronized (queue) {
            queue.put(message);
            System.out.println("SIZE: " + queue.size());
        }
    }
    

    然后你就需要

    queue.take()
    

    在您的消费者线程上

    想法是这样的。take()将阻塞,直到队列中有一个可用项,然后正好返回一个(这就是我认为您的实现遇到的问题:轮询时缺少通知)。put()负责为您执行所有通知。无需等待/通知