有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java避免在对象上使用同步锁

我在一个SocketManager类中有我的below方法,该类由后台线程每60秒调用一次。它将ping一个socket,检查它是否处于活动状态,并将所有内容放入liveSocketsByDatacenter映射中

  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;

        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }

在相同的SocketManager类中,我还有以下方法getNextSocket()方法将由多个读卡器线程(假设最多10个线程)同时调用,以获取下一个活动socket

  // this method will be called by multiple threads concurrently to get the next live socket
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

问题陈述:

我想确保所有10个线程+计时器线程在调用getNextSocket()方法后不应该使用相同的socket

  • 如果计时器线程正在updateLiveSockets()方法中的socketA上工作,那么所有这10个线程都应该在其他活动socket上工作(这10个线程中的每一个都在不同的活动socket上工作)
  • 所有这10条线都应该在不同的带电插座上工作
  • 另外,如果我们的读卡器线程比可用的活动socket多,那么我仍然需要确保每个读卡器线程在不同的活动socket上工作。一旦一个读卡器线程使用了一个活动socket,那么另一个读卡器线程就可以立即使用该socket或任何更好的方法。我想尽量减少阻塞时间

解决这个问题的最好办法是什么?我可以将synchronize放在10个读卡器线程加上计时器线程的socket上,这将保证只有线程在该socket上工作,但我不想在这里使用同步。必须有更好的方法来确保每个线程同时使用不同的单个活动socket,而不是在特定socket上进行同步。我有大约60个socket和大约10个读卡器线程加上1个计时器线程。我需要在这里使用ThreadLocal概念吗


共 (3) 个答案

  1. # 1 楼答案

    第一个要求是:“我想确保所有10个线程+计时器线程在调用getNextSocket()方法后不应该使用相同的套接字。”我的答案是,没有ThreadLocal,您无法做到这一点

    至于最大限度地利用这些套接字,您肯定需要某种套接字池来管理它们,以便您可以请求和释放它们。每次请求套接字时,您都会将其“标记”为当前线程使用的套接字,这就是ThreadLocal,您可以将套接字引用移动到ThreadLocal中,这样对getNextSocket()的下一个调用就会进入其中并检查是否存在并返回,如果不存在,则转到套接字轮询并拉取一个新的套接字并在ThreadLocal上进行设置。 您将始终需要将套接字释放回池中,不要忘记这一点。也许可以为套接字本身创建某种抽象,这样您就可以使用AutoCloseable接口使释放到套接字池变得更加容易

  2. # 2 楼答案

    似乎您需要实现某种类型的套接字池,并让线程在完成后将套接字释放回池中。同步仅在从池中检索和释放套接字时发生。一旦检索到套接字,就可以在不同步的情况下完成对它的所有访问。我不知道ThreadLocal在这里能帮到你什么

    下面是显示该方法的代码:

      public synchrnozed Optional<SocketHolder> getNextSocket() {
        for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
          Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
          if ( liveSocket.isPresent() && !liveSocket.isUsed() ) {
            liveSocket.setUsed(true);
            return liveSocket;
          }
        }
        return Optional.absent();
      }
    
      public synchrnozed release(SocketHolder s){
         s.setUsed(false);
      }
    

    每个线程都将如下所示:

      new Thread( new Runnable(){
         public void run(){
           SocketHolder sh = null;
           try{
               sh = socketManager.getNextSocket();
               //do some work on socker
           }finally{
               if (sh != null) socketManager.release(sh);
           }
         }
      }; ) ).start();
    
  3. # 3 楼答案

    解决问题的最佳方法是使用ConcurrentQueue 您不需要使用ThreadLocal。ConcurrentQueue是无阻塞的,对于多线程环境非常有效。 例如,这就是如何删除不活动的套接字并保留活动的套接字

        private final Map<Datacenters, ConcurrentLinkedQueue<SocketHolder>> liveSocketsByDatacenter =
              new ConcurrentHashMap<>();
    
    
    // runs every 60 seconds to ping 70 sockets the socket to make sure whether they are alive or not (it does not matter if you ping more sockets than there are in the list because you are rotating the que)
      private void updateLiveSockets() {
        Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    
        for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        Queue<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
          for (int i = 0; i<70; i++) {
                SocketHolder s = liveSockets.poll();
            Socket socket = s.getSocket();
            String endpoint = s.getEndpoint();
            Map<byte[], byte[]> holder = populateMap();
            Message message = new Message(holder, Partition.COMMAND);
    
            // pinging to see whether a socket is live or not
            boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
            boolean isLive = (status) ? true : false;
    
            SocketHolder zmq = new SocketHolder(socket, s.getContext(), endpoint, isLive);
            liveSockets.add(zmq);
          }
        }
      }