有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

并发性这个java代码是线程安全的吗?

我计划在我的应用程序中使用此模式,但我不确定这是否安全

为了提供一点背景信息,一组服务器将计算属于单个任务的子任务的结果,并将它们报告回中央服务器。这段代码用于注册结果,并检查任务的所有子任务是否已完成,如果已完成,则只报告一次该事实

重要的一点是,所有任务必须报告一次,并且在完成后仅报告一次(所有子任务结果都已设置)

有人能帮忙吗?非常感谢。(另外,如果您有更好的解决此问题的方法,请让我知道!)

*请注意,为了简洁起见,我简化了代码

解决方案一

class Task {
    //Populate with bunch of (Long, new AtomicReference()) pairs
    //Actual app uses read only HashMap
    Map<Id, AtomicReference<SubTaskResult>> subtasks = populatedMap();

    Semaphore permission = new Semaphore(1);

    public Task set(id, subTaskResult){
           //null check omitted
           subtasks.get(id).set(result);
           return check() ? this : null;
    }

    private boolean check(){
          for(AtomicReference ref : subtasks){
              if(ref.get()==null){
                  return false;
              }
          }//for
          return permission.tryAquire();
    }

  }//class

斯蒂芬·C好心地建议使用柜台。事实上,我曾经考虑过这一点,但我认为JVM可以重新排序操作,因此,在AtomicReference(由另一个线程)中设置结果之前,一个线程可以观察(由另一个线程)递减的计数器

*编辑:我现在看到这是线程安全的。我同意这个解决方案。谢谢你,斯蒂芬

解决方案二

class Task {
    //Populate with bunch of (Long, new AtomicReference()) pairs
    //Actual app uses read only HashMap
    Map<Id, AtomicReference<SubTaskResult>> subtasks = populatedMap();
    AtomicInteger counter = new AtomicInteger(subtasks.size());

    public Task set(id, subTaskResult){
           //null check omitted
           subtasks.get(id).set(result);
           //In the actual app, if !compareAndSet(null, result) return null;
           return check() ? this : null;
    }

    private boolean check(){
           return counter.decrementAndGet() == 0;
    }

  }//class

共 (4) 个答案

  1. # 1 楼答案

    第二种解决方案提供了线程安全闩锁,但是它很容易受到对set()的调用的攻击,这些调用提供了一个不在映射中的ID——这将触发一个NullPointerException——或者对具有相同ID的set()的多个调用。后者会错误地将计数器减量太多次,并在可能存在其他子任务ID时错误地报告完成情况,而这些子任务ID没有得到结果提交。我的批评不是关于线程安全,而是关于不变维护;即使没有与线程相关的问题,也会出现相同的缺陷

    解决这个问题的另一种方法是使用^{},但这有点不必要:您可以实现一个精简的计数信号量,其中每个调用set()将调用releaseShared(),通过compareAndSetState()上的旋转来减少计数器,并且tryAcquireShared()只有在计数为零时才会成功。这或多或少是您在上面用AtomicInteger实现的,但您将重用一个工具,它提供了更多可用于设计其他部分的功能


    为了充实基于AbstractQueuedSynchronizer的解决方案,需要再添加一个操作来证明复杂性:能够等待所有子任务的结果返回,这样整个任务就完成了。在下面的代码中是Task#awaitCompletion()Task#awaitCompletion(long, TimeUnit)

    再说一遍,这可能有点过头了,但为了讨论的目的,我将与大家分享

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    
    
    final class Task
    {
      private static final class Sync extends AbstractQueuedSynchronizer
      {
        public Sync(int count)
        {
          setState(count);
        }
    
    
        @Override
        protected int tryAcquireShared(int ignored)
        {
          return 0 == getState() ? 1 : -1;
        }
    
    
        @Override
        protected boolean tryReleaseShared(int ignored)
        {
          int current;
          do
          {
            current = getState();
            if (0 == current)
              return true;
          }
          while (!compareAndSetState(current, current - 1));
          return 1 == current;
        }
      }
    
    
      public Task(int count)
      {
        if (count < 0)
          throw new IllegalArgumentException();
        sync_ = new Sync(count);
      }
    
    
      public boolean set(int id, Object result)
      {
        // Ensure that "id" refers to an incomplete task. Doing so requires
        // additional synchronization over the structure mapping subtask 
        // identifiers to results.
        // Store result somehow.
        return sync_.releaseShared(1);
      }
    
    
      public void awaitCompletion()
        throws InterruptedException
      {
        sync_.acquireSharedInterruptibly(0);
      }
    
    
      public void awaitCompletion(long time, TimeUnit unit)
        throws InterruptedException
      {
        sync_.tryAcquireSharedNanos(0, unit.toNanos(time));
      }
    
    
      private final Sync sync_;
    }
    
  2. # 2 楼答案

    我假设您的用例是有多个多线程调用set,但是对于id的任何给定值,set方法将只被调用一次。我还假设populateMap为所有使用的id值创建条目,并且subtaskspermission确实是私有的

    如果是这样,我认为代码是线程安全的

    每个线程都应该看到subtasks映射的初始化状态,包括所有键和所有原子引用。这种状态永远不会改变,因此subtasks.get(id)将始终提供正确的引用。set(result)调用对原子引用进行操作,因此check()中随后的get()方法调用将提供最新的值。。。在所有线程中。任何有多个线程调用check的潜在竞争似乎都会自行解决

    然而,这是一个相当复杂的解决方案。更简单的解决方案是使用并发计数器;e、 g.用AtomicInteger替换Semaphore,并使用decrementAndGet,而不是重复扫描check中的subtasks映射


    针对更新解决方案中的这一评论:

    Actually, I have considered that once, but I reasoned that the JVM could reorder the operations and thus, a thread can observe a decremented counter (by another thread) before the result is set in AtomicReference (by that other thread).

    根据定义,AtomicInteger和AtomicReference是原子的。任何试图访问一个线程的线程都保证在访问时看到“当前”值

    在这种特殊情况下,每个线程在调用原子整数上的decrementAndGet之前,先调用相关原子引用上的^{。这不能重新排序。线程执行的操作是按顺序执行的。由于这些都是原子操作,因此其他线程也可以依次看到这些效果

    换句话说,它应该是线程安全的。。。阿法克

  3. # 3 楼答案

    阅读示例程序时,我有一种奇怪的感觉,但这取决于程序的更大结构。同时检查完成情况的set函数几乎就是一种代码气味。:-)只是一些想法

    如果与服务器进行同步通信,则可以使用ExecutorService,其线程数与进行通信的服务器数相同。从这里你可以得到一堆Futures,你可以自然地继续你的计算——get调用会在需要结果的时候阻塞,但现在还没有

    如果与服务器进行异步通信,则在将任务提交到服务器后,也可以使用CountDownLatch。wait调用阻塞主线程,直到所有子任务完成,其他线程可以接收结果并对每个接收到的结果调用倒计时

    使用所有这些方法,除了在结构中并发存储结果是线程安全的之外,您不需要特殊的线程安全措施。我敢打赌有更好的模式

  4. # 4 楼答案

    原子性保证(每个类文档)明确用于原子性引用。compareAndSet扩展到set和get方法(根据包文档),因此在这方面,您的代码似乎是线程安全的

    然而,我不确定你为什么有信号灯。tryAquire是一个副作用,但是如果没有免费的代码来释放信号量,那么代码的这一部分看起来是错误的