有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java中具有可重入锁和条件的并发生产者-消费者场景

我用可重入锁和条件编写了一个生产者-消费者程序。它工作正常,但我不确定实施是否正确。此外,这似乎不是最优的。有人能验证一下这是否是一个正确的实现吗?此外,你能告诉我,如何优化它吗?比如,在真正需要的地方锁定

public class TestRL {

static class Observed {
    boolean filled = false;

    public void setFilled(boolean filled) {
        this.filled = filled;
    }

    public boolean getFilled() {
        return filled;
    }
}

static Observed observed = new Observed();

static class Consumer implements Runnable {
    Observed observed;
    ReentrantLock lock;
    Condition condition;

    Consumer(Observed observed, ReentrantLock lock, Condition condition) {
        this.observed = observed;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        lock.lock();
        try {
            for (int i = 0; i < 20; i++) {
                if (observed.getFilled() == false) {
                    try {
                        System.out.println("consumer waiting");
                        condition.await();
                        System.out.println("consumed");

                        Thread.sleep(400 + 1000 * i % 2);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    observed.setFilled(false);
                    condition.signalAll();
                }else{
                    observed.setFilled(false);
                    condition.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }

    }
}

static class Producer implements Runnable {
    Observed observed;
    ReentrantLock lock;
    Condition condition;

    Producer(Observed observed, ReentrantLock lock, Condition condition) {
        this.observed = observed;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        lock.lock();
        try {
            for (int i = 0; i < 20; i++) {
                if (observed.getFilled() == true) {
                    try {
                        System.out.println("producer waiting");
                        condition.await();
                        System.out.println("produced");
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    observed.setFilled(true);
                    condition.signalAll();
                }else{
                    observed.setFilled(true);
                    condition.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }

    }
}

/**
 * @param args
 */
public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    Producer producer = new Producer(observed, lock, condition);
    Consumer consumer = new Consumer(observed, lock, condition);
    Thread t1 = new Thread(producer);
    Thread t2 = new Thread(consumer);
    t1.start();
    t2.start();

}

}


共 (4) 个答案

  1. # 1 楼答案

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class prodConsumeJava8 
    {
    
        public static void main(String[] args)
        {
            BlockingQueue<Integer> que=new LinkedBlockingQueue<>();
    
            final Runnable singleproducer = () ->{
                try
                {
                    for (int i = 0; i < 5; i++) 
                    {
                        System.out.println("Produced: " + i);
                        que.put(i);
                    }
    
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            };
    
            final Runnable singleConsumer = () ->{
    
                while(true)
                {
                    try
                    {
                        System.out.println("Consumer"+que.take());
                    }
                    catch (Exception e)
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            };
    
            new Thread(singleproducer).start();
            new Thread(singleConsumer).start();
    
        }
    
    }
    
  2. # 2 楼答案

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.locks.Condition;
        import java.util.concurrent.locks.ReentrantLock;
        import java.util.logging.Level;
        import java.util.logging.Logger;
    
        /*
         * To change this license header, choose License Headers in Project Properties.
         * To change this template file, choose Tools | Templates
         * and open the template in the editor.
         */
        /**
         *
         * @author sakshi
         */
        public class ReentrantLockDemo {
    
            static List<Integer> list = new ArrayList<Integer>();
            static ReentrantLock lock = new ReentrantLock();
            static Condition ProdCons = lock.newCondition();
    
          static class Producer extends Thread {
    
                List<Integer> list;
                Producer(List<Integer> list) {
                    this.list = list;
    
                }
    
                public void run() {
    
                    for (int i = 0; i < 10; i++) {
                        lock.lock();
    
                        if (list.size() >=1) {
                            try {
                                ProdCons.await();
                            } catch (InterruptedException ex) {
                                Logger.getLogger(ReentrantLockDemo.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
    
                        list.add(i);
                        System.out.println("produce="+i);
                         ProdCons.signalAll();
    
                          lock.unlock(); 
    
                    }
    
                }
    
            }
    
            static class Consumer extends Thread {
    
                List<Integer> list;
    
    
                Consumer(List<Integer> list) {
                    this.list = list;
    
                }
    
                @Override
                public void run() {
    
                    for (int i = 0; i < 10; i++) {
                          lock.lock();
                        while (list.isEmpty()) {
                            try {
                                ProdCons.await();
                            } catch (InterruptedException ex) {
                                Logger.getLogger(ReentrantLockDemo.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
    
                            System.out.println("consume=" +list.remove(0));
                            ProdCons.signalAll();
    
                            lock.unlock();
                        }
    
                }
            }
    
    
            public static void main(String[] args) {
                Producer produce = new Producer(list);
                Consumer consume = new Consumer(list);
                produce.start();
                consume.start();
            }
        }
    
    
    output:
    produce=0
    consume=0
    produce=1
    consume=1
    produce=2
    consume=2
    produce=3
    consume=3
    produce=4
    consume=4
    produce=5
    consume=5
    produce=6
    consume=6
    produce=7
    consume=7
    produce=8
    consume=8
    produce=9
    consume=9
    
  3. # 3 楼答案

    下面是使用ReentrantLock&;条件以防万一,如果有人想要的话

    package reentrant_prodcons;
    
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    
    public class Reentrant_ProdCons {
    
        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) {
            // TODO code application logic here
    
            Queue<Integer> queue=new LinkedList<Integer>();
            ReentrantLock lock=new ReentrantLock();
            Condition con=lock.newCondition();
            final int size = 5;
    
            new Producer(lock, con, queue, size).start();
            new Consumer(lock, con, queue).start();
    
        }
    
    }
    
    
    class Producer extends Thread{
    
        ReentrantLock  lock;
        Condition con;
        Queue<Integer> queue;
        int size;
    
        public Producer(ReentrantLock lock, Condition con, Queue<Integer> queue, int size) {
            this.lock = lock;
            this.con = con;
            this.queue = queue;
            this.size=size;
        }
    
    
        public void run(){
            for(int i=0;i<10;i++){
                lock.lock();
                while(queue.size()==size){
                    try {
                        con.await();
                    } catch (InterruptedException ex) {
                        Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
                queue.add(i);
                System.out.println("Produced : "+i);
                con.signal();
                lock.unlock();
            }
        }
    
    }
    
    class Consumer extends Thread{
    
    
        ReentrantLock lock;
        Condition con;
        Queue<Integer> queue;
    
    
        public Consumer(ReentrantLock lock, Condition con, Queue<Integer> queue) {
            this.lock = lock;
            this.con = con;
            this.queue = queue;
        }
    
        public void run(){
            for(int i=0;i<10;i++){
               lock.lock();
               while(queue.size()<1){
                   try {
                       con.await();
                   } catch (InterruptedException ex) {
                       Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                   }
               }
                System.out.println("Consumed : "+queue.remove());
                con.signal();
                lock.unlock();
            }
        }
    }
    
  4. # 4 楼答案

    希望有帮助。通过更改使用者/生产者等待时间,可以轻松检查代码中的溢出/下溢情况

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /*
       Re-entrant Locks can be acquired again by the same thread and the thread 
       keeps a count of that.
       Then that thread has to unlock it same number of times.
    
       normally usually it is only once.
    
       unlock should be called in finally and code should be in try block.
    
       This also has a wait in form of Condition.await(). (which obviouly should be 
       called inside a locked/synchronized block)
    
       This is basically helpful to avoid deadlocks which we will see in next 
       program
    
     */
    public class ReentrantLock1 {
    
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    
    private static Queue<Integer> queue = new LinkedList<>() ;
    private static int maxQueueSize = 10;
    
    private static int consumer_wait_time = 00;
    private static int producer_wait_time = 300;
    
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> producer());
        Thread t2 = new Thread(() -> consumer());
    
        t1.start();
        t2.start();
    
        try {
            t1.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
    }
    
    private static void producer() {
        int i = 1 ;
        while(true) {
            try {
                Thread.sleep(producer_wait_time);
                lock.lock();
                if(queue.size() == maxQueueSize)
                    condition.await();
                queue.add(i);
                i++;
                condition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
        }
    
    }
    
    private static void consumer() {
        while(true) {
            try {
                Thread.sleep(consumer_wait_time);
                lock.lock();
                System.out.print("queue size is: "+ queue.size());
                if(queue.size()==0)
                    condition.await();
                int i = queue.peek();
                queue.remove();
                System.out.println("Element on top is: "+i);
                condition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //unlock should be called in finally and code should be in try block.
                lock.unlock();
            }
        }
     }
    }