并发性这个java代码是线程安全的吗?
我计划在我的应用程序中使用此模式,但我不确定这是否安全
为了提供一点背景信息,一组服务器将计算属于单个任务的子任务的结果,并将它们报告回中央服务器。这段代码用于注册结果,并检查任务的所有子任务是否已完成,如果已完成,则只报告一次该事实
重要的一点是,所有任务必须报告一次,并且在完成后仅报告一次(所有子任务结果都已设置)
有人能帮忙吗?非常感谢。(另外,如果您有更好的解决此问题的方法,请让我知道!)
*请注意,为了简洁起见,我简化了代码
解决方案一
class Task {
//Populate with bunch of (Long, new AtomicReference()) pairs
//Actual app uses read only HashMap
Map<Id, AtomicReference<SubTaskResult>> subtasks = populatedMap();
Semaphore permission = new Semaphore(1);
public Task set(id, subTaskResult){
//null check omitted
subtasks.get(id).set(result);
return check() ? this : null;
}
private boolean check(){
for(AtomicReference ref : subtasks){
if(ref.get()==null){
return false;
}
}//for
return permission.tryAquire();
}
}//class
斯蒂芬·C好心地建议使用柜台。事实上,我曾经考虑过这一点,但我认为JVM可以重新排序操作,因此,在AtomicReference(由另一个线程)中设置结果之前,一个线程可以观察(由另一个线程)递减的计数器
*编辑:我现在看到这是线程安全的。我同意这个解决方案。谢谢你,斯蒂芬
解决方案二
class Task {
//Populate with bunch of (Long, new AtomicReference()) pairs
//Actual app uses read only HashMap
Map<Id, AtomicReference<SubTaskResult>> subtasks = populatedMap();
AtomicInteger counter = new AtomicInteger(subtasks.size());
public Task set(id, subTaskResult){
//null check omitted
subtasks.get(id).set(result);
//In the actual app, if !compareAndSet(null, result) return null;
return check() ? this : null;
}
private boolean check(){
return counter.decrementAndGet() == 0;
}
}//class
# 1 楼答案
第二种解决方案提供了线程安全闩锁,但是它很容易受到对
set()
的调用的攻击,这些调用提供了一个不在映射中的ID——这将触发一个NullPointerException
——或者对具有相同ID的set()
的多个调用。后者会错误地将计数器减量太多次,并在可能存在其他子任务ID时错误地报告完成情况,而这些子任务ID没有得到结果提交。我的批评不是关于线程安全,而是关于不变维护;即使没有与线程相关的问题,也会出现相同的缺陷解决这个问题的另一种方法是使用^{} ,但这有点不必要:您可以实现一个精简的计数信号量,其中每个调用
set()
将调用releaseShared()
,通过compareAndSetState()
上的旋转来减少计数器,并且tryAcquireShared()
只有在计数为零时才会成功。这或多或少是您在上面用AtomicInteger
实现的,但您将重用一个工具,它提供了更多可用于设计其他部分的功能为了充实基于
AbstractQueuedSynchronizer
的解决方案,需要再添加一个操作来证明复杂性:能够等待所有子任务的结果返回,这样整个任务就完成了。在下面的代码中是Task#awaitCompletion()
和Task#awaitCompletion(long, TimeUnit)
再说一遍,这可能有点过头了,但为了讨论的目的,我将与大家分享
# 2 楼答案
我假设您的用例是有多个多线程调用
set
,但是对于id
的任何给定值,set
方法将只被调用一次。我还假设populateMap
为所有使用的id
值创建条目,并且subtasks
和permission
确实是私有的如果是这样,我认为代码是线程安全的
每个线程都应该看到
subtasks
映射的初始化状态,包括所有键和所有原子引用。这种状态永远不会改变,因此subtasks.get(id)
将始终提供正确的引用。set(result)
调用对原子引用进行操作,因此check()
中随后的get()
方法调用将提供最新的值。。。在所有线程中。任何有多个线程调用check的潜在竞争似乎都会自行解决然而,这是一个相当复杂的解决方案。更简单的解决方案是使用并发计数器;e、 g.用
AtomicInteger
替换Semaphore
,并使用decrementAndGet
,而不是重复扫描check
中的subtasks
映射针对更新解决方案中的这一评论:
根据定义,AtomicInteger和AtomicReference是原子的。任何试图访问一个线程的线程都保证在访问时看到“当前”值
在这种特殊情况下,每个线程在调用原子整数上的
decrementAndGet
之前,先调用相关原子引用上的^{。这不能重新排序。线程执行的操作是按顺序执行的。由于这些都是原子操作,因此其他线程也可以依次看到这些效果换句话说,它应该是线程安全的。。。阿法克
# 3 楼答案
阅读示例程序时,我有一种奇怪的感觉,但这取决于程序的更大结构。同时检查完成情况的set函数几乎就是一种代码气味。:-)只是一些想法
如果与服务器进行同步通信,则可以使用ExecutorService,其线程数与进行通信的服务器数相同。从这里你可以得到一堆Futures,你可以自然地继续你的计算——get调用会在需要结果的时候阻塞,但现在还没有
如果与服务器进行异步通信,则在将任务提交到服务器后,也可以使用CountDownLatch。wait调用阻塞主线程,直到所有子任务完成,其他线程可以接收结果并对每个接收到的结果调用倒计时
使用所有这些方法,除了在结构中并发存储结果是线程安全的之外,您不需要特殊的线程安全措施。我敢打赌有更好的模式
# 4 楼答案
原子性保证(每个类文档)明确用于原子性引用。compareAndSet扩展到set和get方法(根据包文档),因此在这方面,您的代码似乎是线程安全的
然而,我不确定你为什么有信号灯。tryAquire是一个副作用,但是如果没有免费的代码来释放信号量,那么代码的这一部分看起来是错误的