有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java线程之间的多线程数据交换

我需要做2个java线程数据交换。一个线程,即生产者,正在生成数据,第二个线程,即消费者,正在处理数据。我需要阻塞处理,即生产商应等待其数据被处理。我计划稍后在多个消费者/生产者线程中使用它,但决定从一种mvp/原型开始。 问题是,我无法想象实现这一点的技术。我首先尝试使用BlockingQueue为传入数据创建消费者,并使用方法将数据放入该队列。生产者调用此方法将数据放入队列,然后等待数据处理完成。使用者不断轮询队列以检测传入数据,并在数据到达时进行处理。为了向消费者发出数据已被处理的信号,我在数据类中使用信号量。下面是实现此场景的代码示例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class Adder
{
    Integer a, b, res;
    Semaphore processed = new Semaphore(0);

    Adder(int a, int b)
    {
        this.a = a;
        this.b = b;
        this.res = null;
    }

    int getA()
    {
        return (a);
    }

    int getB()
    {
        return (b);
    }

    int getRes()
    {
        return (res);
    }

    void compute()
    {
        res = a + b;
    }

    Semaphore getProcessed()
    {
        return (processed);
    }

    @Override
    public String toString()
    {
        return ("Adder: a=" + getA() + ", b=" + getB() + ", result " + (res == null ? "is undefined yet" : getRes()));
    }
}

class Consumer implements Runnable
{
    BlockingQueue<Adder> requests = new LinkedBlockingQueue<>();

    boolean processRequest(Adder adder) throws InterruptedException
    {
        boolean processingResult = false;
        requests.add(adder);
        processingResult = adder.getProcessed().tryAcquire(1, TimeUnit.SECONDS);
        return (processingResult);
    }

    @Override
    public void run()
    {
        System.out.println("Consumer thread started");
        while (!Thread.interrupted())
        {
            try
            {
                Adder adder = requests.poll(100, TimeUnit.MILLISECONDS);
                if (adder != null)
                {
                    adder.compute();
                    adder.getProcessed().release();
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                System.out.println("Consumer thread interrupted");
                break;
            }
        }
        System.out.println("Consumer thread completed");
    }
}

class Producer implements Runnable
{
    Consumer consumer;

    Producer(Consumer consumer)
    {
        this.consumer = consumer;
    }

    @Override
    public void run()
    {
        System.out.println("Producer thread started");
        while (!Thread.interrupted())
        {
            Adder adder = new Adder((int) (Math.random() * 20), (int) (Math.random() * 20));
            System.out.println("New request generated: " + adder);

            try
            {
                if (consumer.processRequest(adder))
                {
                    System.out.println("+ Request has been processed: " + adder);
                }
                else
                {
                    System.out.println("- Request processing failed: " + adder);
                }
            }
            catch (InterruptedException e)
            {
                System.out.println("Producer thread interrupted");
                break;
            }
        }
        System.out.println("Producer thread completed");
    }
}

public class ThreadsDataExchange
{

    public static void main(String[] args) throws InterruptedException
    {
        Consumer consumer = new Consumer();
        Producer producer = new Producer(consumer);
        Thread consumerThread = new Thread(consumer);
        Thread producerThread = new Thread(producer);

        consumerThread.start();
        producerThread.start();

        Thread.sleep(3000);
        System.out.println("Interrupting threads");
        producerThread.interrupt();
        producerThread.join();
        consumerThread.interrupt();
        consumerThread.join();
    }
}

但在这种情况下,由于我需要限制等待时间,可能会出现这样一种情况:生产者等待处理的时间会超时,假定数据尚未处理,但消费者会立即同时处理数据

因此,作为另一种方法,我认为对传入和处理的消息使用2个队列,但由于实际队列可能相当大,因此在结果队列中搜索处理过的消息需要很长时间。然后我有第三个选择,使用一种SynchronizedMap,或ConcurrentHashMap,但它也应该被轮询,我想这将导致沉重的负载。我一直在考虑的另一种方法是使用java。util。同时发生的但也不确定如何将其用于同步/阻塞执行

问题是:如何在Java中完成这项任务?是否有一些模式/类别?多谢各位


共 (2) 个答案

  1. # 1 楼答案

    您可以使用一个队列,生产者向其中写入数据,消费者从中消费数据,这是您已经拥有的数据。但是,您可以使用锁将生产/消费与此队列同步。 您可以尝试使用ReentrantLock:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html

    还建议使用ExecutorService启动线程:https://dzone.com/articles/executor-framework-in-java-take-control-over-your-1

    以下是一个例子:

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.*;
    import java.util.Queue;
    import java.util.ArrayDeque;
    import java.util.Random;
    
    public class Main {
        private static final class Producer implements Runnable {
            private final ReentrantLock lock;
            private final Queue < Integer > queue;
            private final Random random = new Random();
    
            public Producer(ReentrantLock lock, Queue < Integer > queue) {
                this.lock = lock;
                this.queue = queue;
            }
    
            @Override
            public void run() {
                System.out.println("Running producer in thread " + Thread.currentThread());
                while (true) {
                    lock.lock();
                    try {
                        System.out.println("Producer has the lock");
                        // Produce data here
                        for (int i = 0; i < 5; i++) {
                            int randomNumber = random.nextInt(100);
                            System.out.println("Producing " + randomNumber);
                            queue.add(randomNumber);
                        }
    
                        Thread.sleep(2000);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        System.out.println(ex);
                    } finally {
                        System.out.println("Producer letting go of the lock");
                        lock.unlock();
                    }
    
                    try {
                        // Sleep here so this thread doesn't re-acquire the lock immediately
                        Thread.sleep(100);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        System.out.println(ex);
                    }
                }
            }
        }
    
    
        private static final class Consumer implements Runnable {
            private final ReentrantLock lock;
            private final Queue < Integer > queue;
    
            public Consumer(ReentrantLock lock, Queue < Integer > queue) {
                this.lock = lock;
                this.queue = queue;
            }
    
            @Override
            public void run() {
    
                System.out.println("Running consumer in thread " + Thread.currentThread());
    
                while (true) {
                    lock.lock();
                    try {
                        System.out.println("Consumer has the lock");
    
                        while (!queue.isEmpty()) {
                            System.out.println("Consuming " + queue.poll());
                        }
                        Thread.sleep(2000);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        System.out.println(ex);
                    } finally {
                        System.out.println("Consumer letting go of the lock");
                        lock.unlock();
                    }
    
                    try {
                        // Sleep here so this thread doesn't re-acquire the lock immediately
                        Thread.sleep(100);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        System.out.println(ex);
                    }
                }
            }
        }
    
        public static void main(String[] args) {
            int capacity = 10;
            ReentrantLock lock = new ReentrantLock();
            Queue < Integer > queue = new ArrayDeque();
    
            Producer producer = new Producer(lock, queue);
            Consumer consumer = new Consumer(lock, queue);
    
            ExecutorService service = Executors.newFixedThreadPool(2);
    
            service.submit(consumer);
            service.submit(producer);
    
            service.shutdown();
    
        }
    }
    

    请注意,我们正在同步队列周围的访问,因此队列本身不需要是线程安全的,因为我们保证使用锁对其进行线程安全的访问