Java线程之间的多线程数据交换
我需要做2个java线程数据交换。一个线程,即生产者,正在生成数据,第二个线程,即消费者,正在处理数据。我需要阻塞处理,即生产商应等待其数据被处理。我计划稍后在多个消费者/生产者线程中使用它,但决定从一种mvp/原型开始。 问题是,我无法想象实现这一点的技术。我首先尝试使用BlockingQueue为传入数据创建消费者,并使用方法将数据放入该队列。生产者调用此方法将数据放入队列,然后等待数据处理完成。使用者不断轮询队列以检测传入数据,并在数据到达时进行处理。为了向消费者发出数据已被处理的信号,我在数据类中使用信号量。下面是实现此场景的代码示例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class Adder
{
Integer a, b, res;
Semaphore processed = new Semaphore(0);
Adder(int a, int b)
{
this.a = a;
this.b = b;
this.res = null;
}
int getA()
{
return (a);
}
int getB()
{
return (b);
}
int getRes()
{
return (res);
}
void compute()
{
res = a + b;
}
Semaphore getProcessed()
{
return (processed);
}
@Override
public String toString()
{
return ("Adder: a=" + getA() + ", b=" + getB() + ", result " + (res == null ? "is undefined yet" : getRes()));
}
}
class Consumer implements Runnable
{
BlockingQueue<Adder> requests = new LinkedBlockingQueue<>();
boolean processRequest(Adder adder) throws InterruptedException
{
boolean processingResult = false;
requests.add(adder);
processingResult = adder.getProcessed().tryAcquire(1, TimeUnit.SECONDS);
return (processingResult);
}
@Override
public void run()
{
System.out.println("Consumer thread started");
while (!Thread.interrupted())
{
try
{
Adder adder = requests.poll(100, TimeUnit.MILLISECONDS);
if (adder != null)
{
adder.compute();
adder.getProcessed().release();
Thread.sleep(1000);
}
}
catch (InterruptedException e)
{
System.out.println("Consumer thread interrupted");
break;
}
}
System.out.println("Consumer thread completed");
}
}
class Producer implements Runnable
{
Consumer consumer;
Producer(Consumer consumer)
{
this.consumer = consumer;
}
@Override
public void run()
{
System.out.println("Producer thread started");
while (!Thread.interrupted())
{
Adder adder = new Adder((int) (Math.random() * 20), (int) (Math.random() * 20));
System.out.println("New request generated: " + adder);
try
{
if (consumer.processRequest(adder))
{
System.out.println("+ Request has been processed: " + adder);
}
else
{
System.out.println("- Request processing failed: " + adder);
}
}
catch (InterruptedException e)
{
System.out.println("Producer thread interrupted");
break;
}
}
System.out.println("Producer thread completed");
}
}
public class ThreadsDataExchange
{
public static void main(String[] args) throws InterruptedException
{
Consumer consumer = new Consumer();
Producer producer = new Producer(consumer);
Thread consumerThread = new Thread(consumer);
Thread producerThread = new Thread(producer);
consumerThread.start();
producerThread.start();
Thread.sleep(3000);
System.out.println("Interrupting threads");
producerThread.interrupt();
producerThread.join();
consumerThread.interrupt();
consumerThread.join();
}
}
但在这种情况下,由于我需要限制等待时间,可能会出现这样一种情况:生产者等待处理的时间会超时,假定数据尚未处理,但消费者会立即同时处理数据
因此,作为另一种方法,我认为对传入和处理的消息使用2个队列,但由于实际队列可能相当大,因此在结果队列中搜索处理过的消息需要很长时间。然后我有第三个选择,使用一种SynchronizedMap,或ConcurrentHashMap,但它也应该被轮询,我想这将导致沉重的负载。我一直在考虑的另一种方法是使用java。util。同时发生的但也不确定如何将其用于同步/阻塞执行
问题是:如何在Java中完成这项任务?是否有一些模式/类别?多谢各位
# 1 楼答案
您可以使用一个队列,生产者向其中写入数据,消费者从中消费数据,这是您已经拥有的数据。但是,您可以使用锁将生产/消费与此队列同步。 您可以尝试使用ReentrantLock:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html
还建议使用ExecutorService启动线程:https://dzone.com/articles/executor-framework-in-java-take-control-over-your-1
以下是一个例子:
请注意,我们正在同步队列周围的访问,因此队列本身不需要是线程安全的,因为我们保证使用锁对其进行线程安全的访问
# 2 楼答案
为了回答你的问题,我想看一下ExecutorService。您可以使用提交来接收未来并等待完成;同步或异步
https://www.baeldung.com/java-executor-service-tutorial