java并发对匹配
我正在寻找一种java并发性习惯用法,以使匹配与具有最高吞吐量的大量元素配对
考虑我有“人”来自多个线程。每个“人”都在寻找匹配项。当它找到另一个等待它匹配的“人”时,他们都被分配给对方并被移除以进行处理我不想锁定一个大的结构来改变状态。考虑人已经匹配和设置匹配。在提交之前,每个人的#getMatch为空。但当它们被解锁(或被捕获)时,它们要么因为等待匹配时间过长而过期,要么#getMatch为非null
保持高吞吐量的一些问题是,如果PersonA与PersonB同时提交。他们彼此匹配,但也匹配已经在等待的人。提交时,人员B的状态更改为“可用”。但是当PersonB与PersonC匹配时,PersonA不需要意外地获得PersonB。有道理?另外,我希望以一种异步工作的方式来实现这一点。换句话说,我不希望每个提交者都必须用waitForMatch类型的东西抓住线程上的一个人
同样,我不希望请求必须在单独的线程上运行,但是如果有一个额外的match maker线程,也没关系
这似乎应该有一些习语,因为这似乎是一件很常见的事情。但我在谷歌上的搜索结果是枯燥无味的(我可能用错了词)
更新
有两件事让我很难解决这个问题。一个是,我不想在内存中有对象,我希望所有等待的候选对象都在redis或memcache或类似的东西中。另一个是,任何人都可能有几个可能的匹配项。考虑如下界面:
person.getId(); // lets call this an Integer
person.getFriendIds(); // a collection of other person ids
然后我有一个服务器,看起来像这样:
MatchServer:
submit( personId, expiration ) -> void // non-blocking returns immediately
isDone( personId ) -> boolean // either expired or found a match
getMatch( personId ) -> matchId // also non-blocking
这是一个rest接口,它将使用重定向,直到得到结果。我的第一个想法是在MatchServer中有一个缓存,该缓存由redis之类的东西支持,并具有一个并发的弱值散列映射,用于当前锁定并执行操作的对象。每个personId将由一个持久状态对象包装,该对象具有提交、匹配和过期等状态
跟踪到目前为止?非常简单,提交代码完成了最初的工作,大致如下:
public void submit( Person p, long expiration ) {
MatchStatus incoming = new MatchStatus( p.getId(), expiration );
if ( !tryMatch( incoming, p.getFriendIds() ) )
cache.put( p.getId(), incoming );
}
public boolean isDone( Integer personId ) {
MatchStatus status = cache.get( personId );
status.lock();
try {
return status.isMatched() || status.isExpired();
} finally {
status.unlock();
}
}
public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
for ( Integer friend : friends ) {
if ( match( incoming, friend ) )
return true;
}
return false;
}
private boolean match( MatchStatus incoming, Integer waitingId ) {
CallStatus waiting = cache.get( waitingId );
if ( waiting == null )
return false;
waiting.lock();
try {
if ( waiting.isMatched() )
return false;
waiting.setMatch( incoming.getId() );
incoming.setMatch( waiting.getId() );
return true
} finally {
waiting.unlock();
}
}
所以这里的问题是,如果两个人同时进来,而他们是他们唯一的对手,他们就找不到对方了。比赛条件对吗?解决这个问题的唯一方法是同步“tryMatch()”。但这会扼杀我的吞吐量。我不能让tryMatch无限循环,因为我需要这些非常短的呼叫
那么,有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会迫使人们一次集中在一个解决方案中,这对吞吐量来说不是很好。例如,创建一个后台线程并使用阻塞队列一次放置和获取传入线程
任何指导都将不胜感激
# 1 楼答案
异步处理=在“人员提交者”和“人员匹配者”之间的一对逻辑队列:
1。解决方案1:在单个线程内进行串行匹配处理
2。解决方案2:多个线程用于匹配处理,具有最小临界区域的悲观锁定:
3。解决方案3:使用事务性持久存储和*乐观锁定的多线程匹配处理:*
# 2 楼答案
在提出更好更简单的方案之前,我一直采用超级简单的方法。处理BlockingQueue的单个后台线程。它的吞吐量不高,但提交者不必阻塞。它还有一个好处,就是不需要在等待者的持久缓存上进行同步。我可以很容易地将BlockingQueue更改为持久性支持的BlockingQueue。提交者只需在队列已满时等待
唯一的问题是,如果同时有如此多的提交者和轮询者,那么处理队列将完全落后于提交者。下面是泵的简化实现。match方法只是通过#getFriendID进行迭代器,并进行键控查找,以查看该id的人是否存在于redis(或其他)缓存中。如果它们在缓存中,则它们是可匹配的。我交换彼此的ID以匹配它们
#setMatch方法本质上表示一个已完成的条件,该条件是MatchStatus中可重入锁的一部分
# 3 楼答案
我仍然不清楚你们匹配系统的细节,但我可以给你们一些一般性的指导
基本上,如果没有原子读修改写功能,就无法同步进程。我将不讨论如何从数据库中获得它,因为它从简单(带有事务隔离的SQL数据库)到不可能(一些NoSQL数据库)不等。如果无法从数据库中获取,则除了在内存中执行同步之外别无选择
其次,您需要能够以原子方式同时从可用性池中删除两个匹配的人。但是,作为同一个原子操作的一部分,您还需要在将它们分配给彼此之前验证它们是否仍然可用
第三,为了最大化吞吐量,您将系统设计为检测竞争条件,而不是防止竞争条件,并在检测到竞争时实施恢复过程
所有这些在内存中比在数据库中更容易实现(性能也更高)。所以,如果可能的话,我会在记忆中这样做
在该系统中,唯一的阻塞同步操作是插入到匹配池中,然后从匹配池中删除,这些是单独的锁。(请求线程在从匹配池中删除其请求之前,必须获得一个锁,查看它是否仍在匹配池中,如果不在匹配池中,则转移到比赛恢复过程。)我相信这就是同步的理论极限。(好的,我想当池已满时,您还必须阻止插入到池中,但是您还能做什么?如果您可以创建一个新池,那么您可以扩展现有池。)
请注意,通过对请求队列进行排序并按顺序进行搜索,可以保证请求线程可以执行完整的搜索。如果它找不到搜索,那么唯一的希望就是后面的请求将匹配,并且该匹配将由后面的请求线程找到
# 4 楼答案
您可能可以使用
ConcurrentHashMap
。我假设您的对象具有可以匹配的键,例如PersonA和PersonB将具有“Person”键在将匹配项添加到地图或删除现有匹配项并将其配对之前,您将保持循环
remove
和putIfAbsent
都是原子的Edit:因为您想将数据卸载到磁盘上,因此可以使用例如MongoDB及其findAndModify方法。如果已存在具有密钥的对象,则命令将删除并返回该对象,以便您可以将旧对象与新对象配对,并可能存储与新密钥关联的配对;如果带有键的对象不存在,则命令将存储带有键的对象。这相当于
ConcurrentHashMap
的行为,只是数据存储在磁盘上而不是内存中;您不必担心两个对象同时写入,因为findAndModify
逻辑防止它们无意中占用同一个键如果需要将对象序列化为JSON,请使用Jackson
Mongo还有其他选择,例如DynamoDB,尽管Dynamo只免费提供少量数据
编辑:鉴于好友列表不是自反式的,我认为您可以通过MongoDB(或另一个带有原子更新的键值数据库)和
ConcurrentHashMap
的组合来解决这个问题ConcurrentHashMap<key, boolean>
,可能是在全局ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>
中李>findAndModify
将其自动设置为“匹配”,然后以“匹配”状态将此人写入MongoDB,最后将该对添加到MongoDB中的“对”集合中,最终用户可以查询该集合。从全局映射中删除此人的ConcurrentHashMap
李>ConcurrentHashMap
。它有,然后什么也不做;如果没有,那么检查朋友是否有一个与之关联的ConcurrentHashMap
;如果是,则将与当前人员密钥关联的值设置为“true”(请注意,两个朋友仍有可能写入彼此的哈希映射,因为当前用户无法检查自己的映射并使用一个原子操作修改朋友的映射,但自哈希映射检查减少了这种可能性。)李>ConcurrentHashMap
,并创建一个延迟任务,该任务将遍历所有写信给此人ConcurrentHashMap
的朋友的ID(即使用ConcurrentHashMap#keySet()
)。此任务的延迟应该是随机的(例如Thread.sleep(500 * rand.nextInt(30))
),这样两个朋友就不会总是试图同时匹配。如果当前用户没有任何需要重新检查的朋友,则不要为其创建延迟任务李>在常见情况下,一个人要么与朋友匹配,要么在迭代朋友列表时,在没有将朋友添加到系统的情况下无法匹配(即,该人的
ConcurrentHashMap
将为空)。如果朋友同时写信:同时添加Friend1和Friend2
ConcurrentHashMap
,表示他们错过了对方李>ConcurrentHashMap
以表示相同(只有当Friend2检查Friend1是否在Friend1写入映射的同时写入映射时,才会发生这种情况-序号ily Friend2会检测到Friend1已写入其地图,因此不会写入Friend1的地图)李>几个小问题:
ConcurrentHashMaps
,例如,Friend1检查映射是否在内存中时,Friend2是否仍在初始化其哈希映射。这很好,因为Friend2将写入Friend1的哈希映射,因此我们保证最终将尝试匹配-其中至少一个将具有哈希映射,而另一个正在迭代,因为哈希映射创建在迭代之前李>ConcurrentHashMap
的朋友列表合并,然后下一次迭代应该将其用作新的朋友列表。最终,此人将被匹配,否则此人的“重新检查”好友列表将被清空李>Thread.sleep(500 * rand.nextInt(30))
,第二次迭代中的Thread.sleep(500 * rand.nextInt(60))
,第三次迭代中的Thread.sleep(500 * rand.nextInt(90))
,等等)李>ConcurrentHashMap
,否则您将面临数据竞争。同样,在遍历MongoDB的潜在匹配项时,必须将其从MongoDB中删除,否则可能会无意中匹配两次李>编辑:一些代码:
方法
addUnmatchedToMongo(person1)
将“不匹配”的person1写入MongoDBsetToMatched(friend1)
使用findAndModify
以原子方式将friend1
设置为“匹配”;如果friend1
已匹配或不存在,则该方法将返回false;如果更新成功,则该方法将返回trueisMatched(friend1)
如果friend1
存在且匹配,则返回true;如果不存在或不存在且“不匹配”,则返回false