有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程Java如何实现对ConcurrentHashMap读取的锁定

TL;DR:在Java中,我有N个线程,每个线程使用一个共享集合。ConcurrentHashMap允许我锁定写操作,但不能锁定读操作。我需要的是锁定集合的特定项,读取以前的数据,进行一些计算,并更新值。如果两个线程从同一个发送方接收到两条消息,那么第二个线程必须等待第一条消息完成,然后才能执行任务


长版本:

这些线程正在接收按时间顺序排列的消息,它们必须根据messageSenderID更新集合

我的代码简化如下:

public class Parent {
    private Map<String, MyObject> myObjects;

    ExecutorService executor;
    List<Future<?>> runnables = new ArrayList<Future<?>>();

    public Parent(){
        myObjects= new ConcurrentHashMap<String, MyObject>();

        executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            WorkerThread worker = new WorkerThread("worker_" + i);
            Future<?> future = executor.submit(worker);
            runnables.add(future);
        }
    }

    private synchronized String getMessageFromSender(){
        // Get a message from the common source
    }

    private synchronized MyObject getMyObject(String id){
        MyObject myObject = myObjects.get(id);
        if (myObject == null) {
            myObject = new MyObject(id);
            myObjects.put(id, myObject);
        }
        return myObject;
    }

    private class WorkerThread implements Runnable {
        private String name;

        public WorkerThread(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            while(!isStopped()) {
                JSONObject message = getMessageFromSender();
                String id = message.getString("id");
                MyObject myObject = getMyObject(id);
                synchronized (myObject) {
                    doLotOfStuff(myObject);
                }
            }
        }
    }
}

所以基本上我有一个生产者和N个消费者,以加快处理速度,但N个消费者必须处理一个共同的数据基础,并且必须遵守时间顺序

我目前正在使用ConcurrentHashMap,但如果需要,我愿意更改它

如果具有相同ID的消息到达时相隔足够远(>;1秒),则代码似乎可以工作,但如果在微秒的距离内收到两条具有相同ID的消息,则会有两个线程处理集合中的同一项

我猜我想要的行为是:

Thread 1                        Thread 2
--------------------------------------------------------------
read message 1
find ID
lock that ID in collection
do computation and update
                                read message 2
                                find ID
                                lock that ID in collection
                                do computation and update

而我认为这就是发生的事情:

Thread 1                        Thread 2
--------------------------------------------------------------
read message 1
                                read message 2
                                find ID
                                lock that ID in collection
                                do computation and update
find ID
lock that ID in collection
do computation and update

我想做点什么

JSONObject message = getMessageFromSender();
synchronized(message){
    String id = message.getString("id");
    MyObject myObject = getMyObject(id);
    synchronized (myObject) {
        doLotOfStuff(myObject);
    } // well maybe this inner synchronized is superfluous, at this point
}

但我认为这会扼杀拥有多线程结构的全部目的,因为我一次只能阅读一条消息,而工人们没有做任何其他事情;这就像我使用的是SynchronizedHashMap,而不是ConcurrentHashMap


为了记录在案,我在这里报告了我最终实现的解决方案。我不确定它是否最优,我仍然需要测试性能,但至少输入是正确的

public class Parent implements Runnable {

    private final static int NUM_WORKERS = 10;
    ExecutorService executor;
    List<Future<?>> futures = new ArrayList<Future<?>>();
    List<WorkerThread> workers = new ArrayList<WorkerThread>();

    @Override
    public void run() {
        executor = Executors.newFixedThreadPool(NUM_WORKERS);
        for (int i = 0; i < NUM_WORKERS; i++) {
            WorkerThread worker = new WorkerThread("worker_" + i);
            Future<?> future = executor.submit(worker);
            futures.add(future);
            workers.add(worker);
        }

        while(!isStopped()) {
            byte[] message = getMessageFromSender();
            byte[] id = getId(message);
            int n = Integer.valueOf(Byte.toString(id[id.length-1])) % NUM_WORKERS;
            if(n >= 0 && n <= (NUM_WORKERS-1)){
                workers.get(n).addToQueue(line);
            }
        }
    }

    private class WorkerThread implements Runnable {
        private String name;
        private Map<String, MyObject> myObjects;
        private LinkedBlockingQueue<byte[]> queue;

        public WorkerThread(String name) {
            this.name = name;
        }

        public void addToQueue(byte[] line) {
            queue.add(line);
        }

        @Override
        public void run() {
            while(!isStopped()) {
                byte[] message= queue.poll();
                if(line != null) {
                    String id = getId(message);
                    MyObject myObject = getMyObject(id);
                    doLotOfStuff(myObject);
                }
            }
        }
    }
}

共 (0) 个答案