有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

带有原子替换的java线程安全可序列化集合

当多个线程通过RMI访问同一台服务器时,我的程序面临一个问题。服务器包含一个列表作为缓存,并执行一些昂贵的计算,有时会更改该列表。计算完成后,列表将被序列化并发送到客户端

第一个问题:如果列表在序列化时发生更改(例如,由另一个请求某些数据的客户机更改),则会(可能)抛出ConcurrentModificationException,从而导致RMI调用的EOFException在客户端进行反序列化

因此,我需要一种列表结构,这种结构对于序列化来说是“稳定的”,同时可能会被另一个线程更改

我们尝试的解决方案

  • 常规ArrayList/Set-由于并发而无法工作
  • 在每次序列化之前深度复制整个结构-faaar太贵了
  • CopyOnWriteArrayList也很昂贵,因为它复制了列表

揭示第二个问题:我们需要能够以原子方式替换列表中当前不安全的任何元素(先删除,然后添加(这更昂贵)),或者只通过锁定列表,从而只按顺序执行不同的线程

因此,我的问题是:

Do you know of a Collection implementation which allows us to serialize the Collection thread-safe while other Threads modify it and which contains some way of atomically replacing elements?

A bonus would be if the list would not need to be copied before serialization! Creating a snapshot for every serialization would be okay, but still meh :/

问题说明(C=compute,A=add to list,R=remove from list,S=serialize)

Thread1 Thread2
      C 
      A
      A C
      C A
      S C
      S R <---- Remove and add have to be performed without Thread1 serializing 
      S A <---- anything in between (atomically) - and it has to be done without 
      S S       blocking other threads computations and serializations for long
        S       and not third thread must be allowed to start serializing in this
        S       in-between state
        S

共 (2) 个答案

  1. # 1 楼答案

    我最初错误的想法是CopyOnWriteArrayList是个坏主意,因为它复制了所有东西。当然,它只执行浅层复制,只复制引用,而不是深度复制,同时复制所有对象

    因此,我们显然选择了CopyOnWriteArrayList,因为它已经提供了许多所需的功能。唯一剩下的问题是replace,它甚至变得更复杂,成为addIfAbsentOrReplace

    我们尝试了CopyOnWriteArraySet,但这不符合我们的需要,因为它只提供addIfAbsent。但在我们的例子中,我们有一个名为C的类c1的实例,我们需要存储它,然后用更新的新实例c2替换它。当然,我们覆盖equalshashCode。现在,我们必须选择是否希望等式返回truefalse来表示两个只有极小差异的对象。两种选择都不起作用,因为

    • true意味着对象是相同的,集合甚至不会费心添加新对象c2,因为c1已经存在
    • false意味着c2将被添加,但c1将不会被删除

    因此CopyOnWriteArrayList。这份清单已经提供了一个解决方案

    public void replaceAll(UnaryOperator<E> operator) { ... }
    

    这有点符合我们的需要。它允许我们通过自定义比较替换所需的对象

    我们以以下方式利用它:

    protected <T extends OurSpecialClass> void addIfAbsentOrReplace(T toAdd, List<T> elementList) {
        OurSpecialClassReplaceOperator<T> op = new OurSpecialClassReplaceOperator<>(toAdd);
        synchronized (elementList) {
            elementList.replaceAll(op);
            if (!op.isReplaced()) {
                elementList.add(toAdd);
            }
        }
    }
    
    private class OurSpecialClassReplaceOperator<T extends OurSpecialClass> implements UnaryOperator<T> {
    
        private boolean replaced = false;
    
        private T toAdd;
    
        public OurSpecialClassReplaceOperator(T toAdd) {
            this.toAdd = toAdd;
        }
    
        @Override
        public T apply(T toAdd) {
            if (this.toAdd.getID().equals(toAdd.getID())) {
                replaced = true;
                return this.toAdd;
            }
    
            return toAdd;
        }
    
        public boolean isReplaced() {
            return replaced;
        }
    }
    
  2. # 2 楼答案

    最简单的解决方案是暗示到ArrayList的外部同步,可能通过读写锁实现,如下所示:

    public class SyncList<T> implements Serializable {
        private static final long serialVersionUID = -6184959782243333803L;
    
        private List<T> list = new ArrayList<>();
        private transient Lock readLock, writeLock;
    
        public SyncList() {
            ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            readLock = readWriteLock.readLock();
            writeLock = readWriteLock.writeLock();
        }
    
        public void add(T element) {
            writeLock.lock();
            try {
                list.add(element);
            } finally {
                writeLock.unlock();
            }
        }
    
        public T get(int index) {
            readLock.lock();
            try {
                return list.get(index);
            } finally {
                readLock.unlock();
            }
        }
    
        public String dump() {
            readLock.lock();
            try {
                return list.toString();
            } finally {
                readLock.unlock();
            }
        }
    
        public boolean replace(T old, T newElement) {
            writeLock.lock();
            try {
                int pos = list.indexOf(old);
                if (pos < 0)
                    return false;
                list.set(pos, newElement);
                return true;
            } finally {
                writeLock.unlock();
            }
        }
    
        private void writeObject(ObjectOutputStream out) throws IOException {
            readLock.lock();
            try {
                out.writeObject(list);
            } finally {
                readLock.unlock();
            }
        }
    
        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException,
                ClassNotFoundException {
            list = (List<T>) in.readObject();
            ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            readLock = readWriteLock.readLock();
            writeLock = readWriteLock.writeLock();
        }
    }
    

    提供您喜欢的任何操作,只需正确使用读锁或写锁即可