有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java并发对匹配

我正在寻找一种java并发性习惯用法,以使匹配与具有最高吞吐量的大量元素配对

考虑我有“人”来自多个线程。每个“人”都在寻找匹配项。当它找到另一个等待它匹配的“人”时,他们都被分配给对方并被移除以进行处理

我不想锁定一个大的结构来改变状态。考虑人已经匹配和设置匹配。在提交之前,每个人的#getMatch为空。但当它们被解锁(或被捕获)时,它们要么因为等待匹配时间过长而过期,要么#getMatch为非null

保持高吞吐量的一些问题是,如果PersonA与PersonB同时提交。他们彼此匹配,但也匹配已经在等待的人。提交时,人员B的状态更改为“可用”。但是当PersonB与PersonC匹配时,PersonA不需要意外地获得PersonB。有道理?另外,我希望以一种异步工作的方式来实现这一点。换句话说,我不希望每个提交者都必须用waitForMatch类型的东西抓住线程上的一个人

同样,我不希望请求必须在单独的线程上运行,但是如果有一个额外的match maker线程,也没关系

这似乎应该有一些习语,因为这似乎是一件很常见的事情。但我在谷歌上的搜索结果是枯燥无味的(我可能用错了词)

更新

有两件事让我很难解决这个问题。一个是,我不想在内存中有对象,我希望所有等待的候选对象都在redis或memcache或类似的东西中。另一个是,任何人都可能有几个可能的匹配项。考虑如下界面:

person.getId();         // lets call this an Integer
person.getFriendIds();  // a collection of other person ids

然后我有一个服务器,看起来像这样:

MatchServer:
   submit( personId, expiration ) -> void // non-blocking returns immediately
   isDone( personId ) -> boolean          // either expired or found a match
   getMatch( personId ) -> matchId        // also non-blocking

这是一个rest接口,它将使用重定向,直到得到结果。我的第一个想法是在MatchServer中有一个缓存,该缓存由redis之类的东西支持,并具有一个并发的弱值散列映射,用于当前锁定并执行操作的对象。每个personId将由一个持久状态对象包装,该对象具有提交、匹配和过期等状态

跟踪到目前为止?非常简单,提交代码完成了最初的工作,大致如下:

public void submit( Person p, long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(), expiration );
    if ( !tryMatch( incoming, p.getFriendIds() ) )
        cache.put( p.getId(), incoming ); 
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    status.lock();
    try {
        return status.isMatched() || status.isExpired();

    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming, friend ) )
            return true;
    }

    return false;
}

private boolean match( MatchStatus incoming, Integer waitingId ) {
    CallStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;

    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;

        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );

        return true
    } finally {
        waiting.unlock();
    }
}

所以这里的问题是,如果两个人同时进来,而他们是他们唯一的对手,他们就找不到对方了。比赛条件对吗?解决这个问题的唯一方法是同步“tryMatch()”。但这会扼杀我的吞吐量。我不能让tryMatch无限循环,因为我需要这些非常短的呼叫

那么,有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会迫使人们一次集中在一个解决方案中,这对吞吐量来说不是很好。例如,创建一个后台线程并使用阻塞队列一次放置和获取传入线程

任何指导都将不胜感激


共 (4) 个答案

  1. # 1 楼答案

    I want to do this in a way that works asynchronously

    异步处理=在“人员提交者”和“人员匹配者”之间的一对逻辑队列:

    • 一个队列用于匹配请求
    • 另一个匹配响应队列
    • 核心人员匹配器算法无论是同步还是异步都是相同的
    • 换句话说,异步行为被添加为顶部的外观/装饰器,不会影响匹配设计

    There are a couple of things that make this problem hard for me. One is that I don't want to have objects in memory, I'd like to have all waiting candidates in redis or memcache or something like that.

    • 在跳转到解决方案之前,首先描述您的需求/约束可能会有所帮助。我会改用这种语言: “我只有200MB内存可用于匹配处理,希望最大限度地提高性能。在任何时间点,最多可能有10000个匹配挂起,30分钟后匹配超时。”
    • 您不希望存储“在内存中”,而是希望存储“在缓存中”。但是你提到的两个缓存不是100%都在内存中吗?我不清楚他们加了什么。您认为缓存可以解决哪些具体需求?例如,您可以将大量数据存储在200MB内存中,并拥有一个干净、精简的高性能算法

    My first thought was to just have a Cache in MatchServer that was backed by something like redis and has a concurrent weak value hash map for objects that were currently locked and being acted on.

    • 我建议您使用并发队列和普通哈希映射:请求者插入队列,匹配器线程从队列中取出并插入普通哈希映射
    • 并发哈希映射只有在多个线程同时对其进行操作时才有用。如果只有一个线程在它上面运行,那么普通的哈希映射就很好,而且性能也很好。虽然可以让请求者直接插入到并发哈希映射中,但我认为这会导致太多并发争用/锁定
    • 在这里,您谈论的是在进行匹配之前锁定,但这并不一定是必需的

    1。解决方案1:在单个线程内进行串行匹配处理

    • 在一个“匹配线程”内完成所有匹配
    • 无需任何锁定或事务-不存在多线程争用
    • 并发队列序列化所有传入请求
    • Match线程一次处理一个请求,将新的人插入哈希映射
    • 为新的人员运行匹配-如果找到匹配项,则链接这两个人,从hashmap中删除每个人,并通过响应队列将结果返回给two请求者

    2。解决方案2:多个线程用于匹配处理,具有最小临界区域的悲观锁定:

    • 获取对下一个人的引用(从队列)
    • 在不锁定的情况下为该人员运行匹配算法(即乐观匹配处理)
    • 确定最佳候选匹配项(如果有)
    • 然后执行悲观记录写入和存储:
      • 然后锁定该对
      • 确认它们在处于锁定状态后仍然不匹配
      • 将它们链接为匹配项,必要时更新到持久存储
      • 解锁

    3。解决方案3:使用事务性持久存储和*乐观锁定的多线程匹配处理:*

    • 修改person类,添加匹配的时间戳或版本号。这将充当乐观锁定控制标志
    • 获取对下一个人的引用(从队列中,从持久性存储中选择额外的“读取”)
    • 在不锁定的情况下为该人员运行匹配算法(即乐观匹配处理)
    • 确定最佳候选匹配项(如果有)
    • 然后执行乐观记录写入和存储:
      • 启动事务(可能是JTA/DB事务)
      • 将两个人对象链接为匹配项
      • 更新双人对象上的匹配时间戳/版本号(JPA通过@version annotation自动执行此操作))
      • 更新时间戳/版本号与old值匹配的持久存储(例如DB)(JPA通过@version注释自动执行此操作)
      • 提交事务
  2. # 2 楼答案

    在提出更好更简单的方案之前,我一直采用超级简单的方法。处理BlockingQueue的单个后台线程。它的吞吐量不高,但提交者不必阻塞。它还有一个好处,就是不需要在等待者的持久缓存上进行同步。我可以很容易地将BlockingQueue更改为持久性支持的BlockingQueue。提交者只需在队列已满时等待

    唯一的问题是,如果同时有如此多的提交者和轮询者,那么处理队列将完全落后于提交者。下面是泵的简化实现。match方法只是通过#getFriendID进行迭代器,并进行键控查找,以查看该id的人是否存在于redis(或其他)缓存中。如果它们在缓存中,则它们是可匹配的。我交换彼此的ID以匹配它们

    class HoldPump extends Thread {
    
        private final BlockingQueue<Incoming> queue = new ArrayBlockingQueue<>( CAPACITY );
    
        HoldPump() {
            super( "MatchingPump" );
        }
    
        public void submit( Person p ) {
            Incoming incoming = new Incoming( p.getId(), p.getFriendIds() ) );
            queueProcessing( incoming );
        }
    
        public void queueProcessing( Incoming incoming ) ... {
            queue.put( incoming );
        }
    
        @Override
        public void run() {
            try {
                while ( true ) {
                    Incoming incoming = queue.take();
                    tryMatch( incoming );
                }
            } catch ( InterruptedException e ) {
                Thread.interrupted();
            }
        }
    }
    
    
    
    protected void trytMatch( Incoming incoming ) {
        MatchStatus status = incoming.status;
    
        status.answer( incoming.holdDuration );
    
        for ( Integer candidate : incoming.candidates ) {
            MatchStatus waiting = waitingForMatchByPersonId.get( candidate );
            if ( waiting != null ) {
                waiting.setMatch( incoming.status.getPersonId() );
                status.setMatch( waiting.getPersonId() )
            }
        }
    }
    

    #setMatch方法本质上表示一个已完成的条件,该条件是MatchStatus中可重入锁的一部分

  3. # 3 楼答案

    我仍然不清楚你们匹配系统的细节,但我可以给你们一些一般性的指导

    基本上,如果没有原子读修改写功能,就无法同步进程。我将不讨论如何从数据库中获得它,因为它从简单(带有事务隔离的SQL数据库)到不可能(一些NoSQL数据库)不等。如果无法从数据库中获取,则除了在内存中执行同步之外别无选择

    其次,您需要能够以原子方式同时从可用性池中删除两个匹配的人。但是,作为同一个原子操作的一部分,您还需要在将它们分配给彼此之前验证它们是否仍然可用

    第三,为了最大化吞吐量,您将系统设计为检测竞争条件,而不是防止竞争条件,并在检测到竞争时实施恢复过程

    所有这些在内存中比在数据库中更容易实现(性能也更高)。所以,如果可能的话,我会在记忆中这样做

    1. 创建一个按插入顺序排列的内存中匹配池,以便每个请求都知道哪个请求在之前,哪个在之后。(这并不一定要反映请求的顺序,只需反映请求插入池的顺序即可。)
    2. 请求进来了。请求进入内存中的匹配池,数据库状态更改为“搜索”
    3. 请求线程在内存池中搜索旧的匹配请求
      1. 如果找到一个,那就是匹配
      2. 如果没有找到,则请求线程退出
      3. 如果在搜索时,它与较新的请求匹配,它将停止搜索,并让较新的请求将其从池中删除
    4. 匹配时,较新的请求通知较旧的请求停止搜索,并且两个请求都将从池中删除。如果检测到比赛,任何检测到比赛的人都会停止/撤消他们正在做的事情,并根据新信息继续进行。您必须设计竞争检测的顺序,以确保这种行为不会导致孤立匹配(类似于死锁),但这是完全可行的
    5. 从池中删除匹配项后,将更新其数据库状态
    6. 一个单独的工作线程按照从最旧到最新的顺序扫描队列,并删除过期的请求,以新的状态更新数据库

    在该系统中,唯一的阻塞同步操作是插入到匹配池中,然后从匹配池中删除,这些是单独的锁。(请求线程在从匹配池中删除其请求之前,必须获得一个锁,查看它是否仍在匹配池中,如果不在匹配池中,则转移到比赛恢复过程。)我相信这就是同步的理论极限。(好的,我想当池已满时,您还必须阻止插入到池中,但是您还能做什么?如果您可以创建一个新池,那么您可以扩展现有池。)

    请注意,通过对请求队列进行排序并按顺序进行搜索,可以保证请求线程可以执行完整的搜索。如果它找不到搜索,那么唯一的希望就是后面的请求将匹配,并且该匹配将由后面的请求线程找到

  4. # 4 楼答案

    您可能可以使用ConcurrentHashMap。我假设您的对象具有可以匹配的键,例如PersonA和PersonB将具有“Person”键

    ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();
    
    void addMatch(Match match) {
        boolean success = false;
        while(!success) {
            Match oldMatch = map.remove(match.key);
            if(oldMatch != null) {
                match.setMatch(oldMatch);
                success = true;
           } else if(map.putIfAbsent(match.key, match) == null) {
                success = true;
           }
       }
    }
    

    在将匹配项添加到地图或删除现有匹配项并将其配对之前,您将保持循环removeputIfAbsent都是原子的

    Edit:因为您想将数据卸载到磁盘上,因此可以使用例如MongoDB及其findAndModify方法。如果已存在具有密钥的对象,则命令将删除并返回该对象,以便您可以将旧对象与新对象配对,并可能存储与新密钥关联的配对;如果带有键的对象不存在,则命令将存储带有键的对象。这相当于ConcurrentHashMap的行为,只是数据存储在磁盘上而不是内存中;您不必担心两个对象同时写入,因为findAndModify逻辑防止它们无意中占用同一个键

    如果需要将对象序列化为JSON,请使用Jackson

    Mongo还有其他选择,例如DynamoDB,尽管Dynamo只免费提供少量数据

    编辑:鉴于好友列表不是自反式的,我认为您可以通过MongoDB(或另一个带有原子更新的键值数据库)和ConcurrentHashMap的组合来解决这个问题

    1. MongoDB中的人员不是“匹配”就是“不匹配”(如果我说“将某人从MongoDB中删除”,我的意思是“将此人的状态设置为“匹配”。)
    2. 添加新人物时,首先为其创建一个ConcurrentHashMap<key, boolean>,可能是在全局ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>
    3. 反复浏览新人的朋友:
    4. 如果一个朋友在MongoDB中,那么使用findAndModify将其自动设置为“匹配”,然后以“匹配”状态将此人写入MongoDB,最后将该对添加到MongoDB中的“对”集合中,最终用户可以查询该集合。从全局映射中删除此人的ConcurrentHashMap
    5. 如果该好友不在MongoDB中,则检查该好友是否已写入当前好友的关联ConcurrentHashMap。它有,然后什么也不做;如果没有,那么检查朋友是否有一个与之关联的ConcurrentHashMap;如果是,则将与当前人员密钥关联的值设置为“true”(请注意,两个朋友仍有可能写入彼此的哈希映射,因为当前用户无法检查自己的映射并使用一个原子操作修改朋友的映射,但自哈希映射检查减少了这种可能性。)
    6. 如果此人未被匹配,则以“未匹配”状态将其写入MongoDB,从全局映射中删除其ConcurrentHashMap,并创建一个延迟任务,该任务将遍历所有写信给此人ConcurrentHashMap的朋友的ID(即使用ConcurrentHashMap#keySet())。此任务的延迟应该是随机的(例如Thread.sleep(500 * rand.nextInt(30))),这样两个朋友就不会总是试图同时匹配。如果当前用户没有任何需要重新检查的朋友,则不要为其创建延迟任务
    7. 延迟结束后,为此人创建一个新的ConcurrentHashMap,从MongoDB中删除不匹配的此人,然后返回到步骤1。如果此人已经匹配,则不要将其从MongoDB中删除并终止延迟的任务

    在常见情况下,一个人要么与朋友匹配,要么在迭代朋友列表时,在没有将朋友添加到系统的情况下无法匹配(即,该人的ConcurrentHashMap将为空)。如果朋友同时写信:

    同时添加Friend1和Friend2

    1. Friend1写信给Friend2的ConcurrentHashMap,表示他们错过了对方
    2. Friend2写入Friend1的ConcurrentHashMap以表示相同(只有当Friend2检查Friend1是否在Friend1写入映射的同时写入映射时,才会发生这种情况-序号ily Friend2会检测到Friend1已写入其地图,因此不会写入Friend1的地图)
    3. Friend1和Friend2都写信给MongoDB。Friend1在后续任务中随机获得5秒延迟,Friend2随机获得15秒延迟
    4. Friend1的任务首先触发,并与Friend2匹配
    5. Friend2的任务排在第二位;Friend2不再位于MongoDB中,因此任务立即终止

    几个小问题:

    1. 可能Friend1和Friend2都没有与之关联的ConcurrentHashMaps,例如,Friend1检查映射是否在内存中时,Friend2是否仍在初始化其哈希映射。这很好,因为Friend2将写入Friend1的哈希映射,因此我们保证最终将尝试匹配-其中至少一个将具有哈希映射,而另一个正在迭代,因为哈希映射创建在迭代之前
    2. 如果两个朋友的任务以某种方式同时启动,则匹配的第二次迭代可能会失败。在这种情况下,如果朋友在MongoDB中处于匹配状态,则应将其从列表中删除;然后,他们应该将结果列表与写入其ConcurrentHashMap的朋友列表合并,然后下一次迭代应该将其用作新的朋友列表。最终,此人将被匹配,否则此人的“重新检查”好友列表将被清空
    3. 您应该在每个后续迭代中增加任务延迟,以增加两个朋友的任务不会同时运行的概率(例如,第一次迭代中的Thread.sleep(500 * rand.nextInt(30)),第二次迭代中的Thread.sleep(500 * rand.nextInt(60)),第三次迭代中的Thread.sleep(500 * rand.nextInt(90)),等等)
    4. 在后续的迭代中,您必须在从MongoDB中删除此人之前创建此人的新ConcurrentHashMap,否则您将面临数据竞争。同样,在遍历MongoDB的潜在匹配项时,必须将其从MongoDB中删除,否则可能会无意中匹配两次

    编辑:一些代码:

    方法addUnmatchedToMongo(person1)将“不匹配”的person1写入MongoDB

    setToMatched(friend1)使用findAndModify以原子方式将friend1设置为“匹配”;如果friend1已匹配或不存在,则该方法将返回false;如果更新成功,则该方法将返回true

    isMatched(friend1)如果friend1存在且匹配,则返回true;如果不存在或不存在且“不匹配”,则返回false

    private ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap;
    private DelayQueue<DelayedRetry> delayQueue;
    private ThreadPoolExecutor executor;
    
    executor.execute(new Runnable() {
        public void run() {
            while(true) {
                Runnable runnable = delayQueue.take();
                executor.execute(runnable);
            }
        }
    }
    
    public static void findMatch(Person person, Collection<Person> friends) {
        findMatch(person, friends, 1);
    }
    
    public static void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
        globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
        for(Person friend : friends) {
            if(**setToMatched(friend)**) {
                // write person to MongoDB in "matched" state
                // write "Pair(person, friend)" to MongoDB so it can be queried by the end user
                globalMap.remove(person.id);
                return;
            } else {
                if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) {
                    // the existence of "friendMap" indicates another thread is currently  trying to match the friend
                    ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
                    if(friendMap != null) {
                        friendMap.put(person.id, person);
                    }
                }
            }
        }
        **addUnmatchedToMongo(person)**;
        Collection<Person> retryFriends = globalMap.remove(person.id).values();
        if(retryFriends.size() > 0) {
            delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
        }
    }
    
    public class DelayedRetry implements Runnable, Delayed {
        private final long delay;
        private final Person person;
        private final Collection<Person> friends;
        private final int delayMultiplier;
    
        public DelayedRetry(long delay, Person person, Collection<Person> friends, delayMultiplier) {
            this.delay = delay;
            this.person = person;
            this.friends = friends;
            this.delayMultiplier = delayMultiplier;
        }
    
        public long getDelay(TimeUnit unit) {
            return unit.convert(delay, TimeUnit.MILLISECONDS);
        }
    
        public void run {
            findMatch(person, friends, delayMultiplier + 1);
        }
    }