多线程Java如何实现对ConcurrentHashMap读取的锁定
TL;DR:在Java中,我有N个线程,每个线程使用一个共享集合。ConcurrentHashMap允许我锁定写操作,但不能锁定读操作。我需要的是锁定集合的特定项,读取以前的数据,进行一些计算,并更新值。如果两个线程从同一个发送方接收到两条消息,那么第二个线程必须等待第一条消息完成,然后才能执行任务
长版本:
这些线程正在接收按时间顺序排列的消息,它们必须根据messageSenderID
更新集合
我的代码简化如下:
public class Parent {
private Map<String, MyObject> myObjects;
ExecutorService executor;
List<Future<?>> runnables = new ArrayList<Future<?>>();
public Parent(){
myObjects= new ConcurrentHashMap<String, MyObject>();
executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
WorkerThread worker = new WorkerThread("worker_" + i);
Future<?> future = executor.submit(worker);
runnables.add(future);
}
}
private synchronized String getMessageFromSender(){
// Get a message from the common source
}
private synchronized MyObject getMyObject(String id){
MyObject myObject = myObjects.get(id);
if (myObject == null) {
myObject = new MyObject(id);
myObjects.put(id, myObject);
}
return myObject;
}
private class WorkerThread implements Runnable {
private String name;
public WorkerThread(String name) {
this.name = name;
}
@Override
public void run() {
while(!isStopped()) {
JSONObject message = getMessageFromSender();
String id = message.getString("id");
MyObject myObject = getMyObject(id);
synchronized (myObject) {
doLotOfStuff(myObject);
}
}
}
}
}
所以基本上我有一个生产者和N个消费者,以加快处理速度,但N个消费者必须处理一个共同的数据基础,并且必须遵守时间顺序
我目前正在使用ConcurrentHashMap
,但如果需要,我愿意更改它
如果具有相同ID的消息到达时相隔足够远(>;1秒),则代码似乎可以工作,但如果在微秒的距离内收到两条具有相同ID的消息,则会有两个线程处理集合中的同一项
我猜我想要的行为是:
Thread 1 Thread 2
--------------------------------------------------------------
read message 1
find ID
lock that ID in collection
do computation and update
read message 2
find ID
lock that ID in collection
do computation and update
而我认为这就是发生的事情:
Thread 1 Thread 2
--------------------------------------------------------------
read message 1
read message 2
find ID
lock that ID in collection
do computation and update
find ID
lock that ID in collection
do computation and update
我想做点什么
JSONObject message = getMessageFromSender();
synchronized(message){
String id = message.getString("id");
MyObject myObject = getMyObject(id);
synchronized (myObject) {
doLotOfStuff(myObject);
} // well maybe this inner synchronized is superfluous, at this point
}
但我认为这会扼杀拥有多线程结构的全部目的,因为我一次只能阅读一条消息,而工人们没有做任何其他事情;这就像我使用的是SynchronizedHashMap,而不是ConcurrentHashMap
为了记录在案,我在这里报告了我最终实现的解决方案。我不确定它是否最优,我仍然需要测试性能,但至少输入是正确的
public class Parent implements Runnable {
private final static int NUM_WORKERS = 10;
ExecutorService executor;
List<Future<?>> futures = new ArrayList<Future<?>>();
List<WorkerThread> workers = new ArrayList<WorkerThread>();
@Override
public void run() {
executor = Executors.newFixedThreadPool(NUM_WORKERS);
for (int i = 0; i < NUM_WORKERS; i++) {
WorkerThread worker = new WorkerThread("worker_" + i);
Future<?> future = executor.submit(worker);
futures.add(future);
workers.add(worker);
}
while(!isStopped()) {
byte[] message = getMessageFromSender();
byte[] id = getId(message);
int n = Integer.valueOf(Byte.toString(id[id.length-1])) % NUM_WORKERS;
if(n >= 0 && n <= (NUM_WORKERS-1)){
workers.get(n).addToQueue(line);
}
}
}
private class WorkerThread implements Runnable {
private String name;
private Map<String, MyObject> myObjects;
private LinkedBlockingQueue<byte[]> queue;
public WorkerThread(String name) {
this.name = name;
}
public void addToQueue(byte[] line) {
queue.add(line);
}
@Override
public void run() {
while(!isStopped()) {
byte[] message= queue.poll();
if(line != null) {
String id = getId(message);
MyObject myObject = getMyObject(id);
doLotOfStuff(myObject);
}
}
}
}
}
共 (0) 个答案