有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程为什么自定义阻塞队列在Java中不是线程安全的

我只是想用ReentrantLock实现一个阻塞队列,我定义了两个条件fullempty,源代码如下:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();

    private Condition full = lock.newCondition();
    private Condition empty = lock.newCondition();

    private Integer maxLength = 1 << 4;

    private Integer putIndex = 0, takeIndex = 0;
    private Integer count = 0;

    private Object[] value;

    public BlockQueue(){
        value = new Object[maxLength];
    }

    public BlockQueue(Integer maxLength){
        this.maxLength = maxLength;
        value = new Object[maxLength];
    }

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            if (count.equals(maxLength)){
                log.info("The queue is full!");
                full.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            if (count == 0){
                empty.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            full.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

在两个使用者线程和一个提供者线程中进行测试时,count在某个偶然时间内小于零
为什么阻塞队列不是线程安全的,谁能帮我,给我一些指导?非常感谢你,马赫

更新(2018/10/17)

如果我只使用一个Condition,它能正常运行吗?源代码如下:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    ...

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            while (count.equals(maxLength)){
                log.info("The queue is full!");
                condition.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            while (count == 0){
                condition.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            condition.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

共 (2) 个答案

  1. # 1 楼答案

    类似的问题:Why should wait() always be called inside a loop

    解释

    考虑这种情况:

    1. 消费者1lock.lock();上被阻止
    2. 消费者2在^{上被阻止
    3. producer持有锁并向队列中添加一个元素,该元素生成count = 1并调用empty.signal();
    4. 消费者2获取此信号并从^{唤醒,它需要重新获取锁,而消费者1在它前面
    5. cosumer 1获取锁,它发现count是1,所以它将count减为0
    6. cosumer 2获取锁,因为它已执行

      if (count == 0){    < - consumer 2 will not re-check this condition
          empty.await();  
      }
      

      cosumer 2认为队列不是空的,然后执行:

      takeIndex = takeIndex % maxLength;
      val = value[takeIndex++];
      count ;
      

      这使得计数指令为0

    解决方案

    使用while代替if保证消费者2将重新检查队列是否为空,从而保证count >= 0

    while (count == 0){
        empty.await();
    }
    

    另外,最好使用Product方法执行相同的操作:

    while (count.equals(maxLength)){
        log.info("The queue is full!");
        full.await();
    }
    
  2. # 2 楼答案

    一件显而易见的事情是,这种情况可以“唤醒”,而无需相应地调用“信号”。因此,您需要使用“while”,而不是使用“if”。例如:

    while (count == 0) {
        empty.await();
    }
    

    请参见此处的javadoc:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html

    The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:

    • Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or
    • Some other thread invokes the signalAll() method for this Condition; or
    • Some other thread interrupts the current thread, and interruption of thread suspension is supported; or
    • A "spurious wakeup" occurs.