有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java在发送所有挂起的消息后关闭socket

我使用以下方法将消息写入socket:

public void sendMessage(byte[] msgB) {
    try {
        synchronized (writeLock) {
            log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
            ous.write(HEADER_MSG);
            ous.writeInt(msgB.length);
            ous.write(msgB);
            ous.flush();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

现在称为<强>鲍伯< /强>的线程想在某个不确定的时刻关闭socket<强> x,这意味着在{{CD1}}上可能还有等待发送消息的线程,甚至在写入过程中可能有一个线程。

我可以通过让Bob在关闭socket之前获取writeLock来解决后者,但我仍然可能丢失尚未开始发送的消息,因为据我所知synchronized是不公平的,Bob可以在等待时间更长的其他线程之前获得锁

我需要的是,在X之前对sendMessage进行的所有调用都能正常工作,在X之后进行的调用都会抛出错误。我该怎么做

  • 具体说明:Bob是从socket输入流读取的线程,X是在该流上收到“关闭”消息时读取的线程

共 (3) 个答案

  1. # 1 楼答案

    我想我可以用一个ReentrantLock集来替换同步块,这是公平的

  2. # 2 楼答案

    考虑使用单线程^{}来执行消息的写入。发送线程只是试图通过调用execute(Runnable)submit(Callable)来“发送”消息。一旦您希望停止发送消息,您就关闭了ExecutorService^{}),导致随后的调用提交/执行以导致RejectedExecutionException

    这种方法的优点是,您只有一个I/O绑定线程,与多个线程等待自己编写消息相比,锁争用更少。这也是一种更好的关注点分离

    下面是一个OO进一步说明问题的快速示例:

    public interface Message {
      /**
       * Writes the message to the specified stream.
       */
      void writeTo(OutputStream os);
    }
    
    public class Dispatcher {
      private final ExecutorService sender;
      private final OutputStream out;
    
      public Dispatcher() {
        this.sender = Executors.newSingleThreadExecutor();
        this.out = ...; // Set up output stream.
      }
    
      /**
       * Enqueue message to be sent.  Return a Future to allow calling thread
       * to perform a blocking get() if they wish to perform a synchronous send.
       */
      public Future<?> sendMessage(final Message msg) {
        return sender.submit(new Callable<Void>() {
          public Void call() throws Exception {
            msg.writeTo(out);
            return null;
          }
        });
      }
    
      public void shutDown() {
        sender.shutdown(); // Waits for all tasks to finish sending.
    
        // Close quietly, swallow exception.
        try {
          out.close();
        } catch (IOException ex) {
        }
      }
    }
    
  3. # 3 楼答案

    你可以在这里使用遗嘱执行人。由于每个发送消息都是同步的(我假设是在一个公共共享对象上),所以可以使用线程限制

    static final ExecutorService executor = Executors.newSingleThreadExecutor();
    
    public void sendMessage(byte[] msgB) {
        executor.submit(new Runnable() {
            public void run() {
                try {
                    log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
                    ous.write(HEADER_MSG);
                    ous.writeInt(msgB.length);
                    ous.write(msgB);
                    ous.flush();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
    public static void atMomentX(){
        executor.shutdown();
    }
    

    完成后,另一个线程可以调用atMomentX()

    在javadoc中,shutdown方法表示:

    Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.