有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

使用 Java NIO 时,服务器端与客户端统计到的 TCP 连接数不一致

我正在学习一个系统如何在单台机器上扩展到数百万个连接

在此之前,我先学习了 TCP 三次握手以及 NIO 接受连接的方式。我运行了多个测试(简单的客户端-服务器代码),并一直跟踪以下计数器:

在服务器端:accept() 成功的次数

在客户端:open() 成功的次数(即成功计数),以及服务器未接受连接、抛出异常的次数(即异常计数)

在客户端,成功计数与异常计数之和等于向服务器发起的连接数(N);但在服务器端,成功计数小于客户端的成功计数(即使服务器没有拒绝任何连接)

例:要启动的连接数N=10_000

场景1:(没有连接被服务器拒绝,即客户端调用 open() 时没有抛出异常)

服务器成功计数:9997

客户端成功计数:10_000,异常计数:0

场景2:(少数连接被服务器拒绝,即客户端调用 open() 时抛出异常,错误为"连接重置")

服务器成功计数:9795

客户端成功计数:9995,异常计数:5

服务器代码:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Minimal single-threaded NIO server: accepts TCP connections on one port, keeps them open and
 * prints whatever the clients send.
 *
 * <p>All accept and read handling happens on the thread that executes {@link #run()}. A failure
 * on one client connection closes only that connection; the server keeps serving the others.
 */
public class Server implements Runnable {
    /** Backlog (requested maximum number of pending connections) passed to {@code bind}. */
    private static final int ACCEPT_BACKLOG = 128;

    private final int port;
    private final ServerSocketChannel ssc;
    private final Selector selector;
    /** Scratch buffer reused for every read; only touched from the selector loop. */
    private final ByteBuffer buf = ByteBuffer.allocate(256);
    /** Total number of connections accepted so far; it is never decremented on disconnect. */
    private final AtomicInteger clientCount = new AtomicInteger(0);

    /**
     * Opens a non-blocking listening channel on {@code port} and registers it for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened, or the port cannot be bound
     */
    Server(int port) throws IOException {
        this.port = port;
        this.ssc = ServerSocketChannel.open();
        this.ssc.socket().bind(new InetSocketAddress(port), ACCEPT_BACKLOG);
        this.ssc.configureBlocking(false);
        this.selector = Selector.open();

        this.ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the selector loop until the listening channel is closed or the selector itself fails,
     * then releases the selector and the listening channel.
     */
    @Override
    public void run() {
        System.out.println("Server starting on port " + this.port);
        try {
            while (this.ssc.isOpen()) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();

                    // A key can be cancelled while it is still in the selected set.
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    }
                }
            }
        } catch (IOException e) {
            System.out.println("IOException, server of port " + this.port + " terminating. Stack trace:");
            e.printStackTrace();
        } finally {
            closeQuietly(selector);
            closeQuietly(ssc);
        }
    }

    /**
     * Accepts every connection that is currently pending on the listening channel.
     *
     * <p>Draining the queue on each wake-up (instead of taking a single connection) keeps the
     * listen backlog from filling up while a burst of clients connects. In non-blocking mode
     * {@code accept()} returns {@code null} once nothing is pending, which ends the loop.
     *
     * @param key the selection key of the listening channel
     * @throws IOException if {@code accept()} itself fails
     */
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel sc;
        while ((sc = listener.accept()) != null) {
            try {
                String address = sc.socket().getInetAddress() + ":" + sc.socket().getPort();
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ, address);
                System.out.println(String.format("accepted connection from: %s, number of clients: %d", address, clientCount.incrementAndGet()));
            } catch (IOException e) {
                // The peer may already be gone; drop this connection but keep accepting others.
                closeQuietly(sc);
                System.out.println("Failed to set up accepted connection: " + e);
            }
        }
    }

    /**
     * Reads everything currently available from a client and prints it. On end-of-stream or on a
     * read error (for example a connection reset) only that client's channel is closed.
     *
     * @param key the selection key of the client channel; its attachment is the client address
     */
    private void handleRead(SelectionKey key) {
        SocketChannel ch = (SocketChannel) key.channel();
        // Collect raw bytes first so a multi-byte character split across two reads decodes correctly.
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        int read = 0;

        try {
            buf.clear();
            while ((read = ch.read(buf)) > 0) {
                buf.flip();
                received.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
                buf.clear();
            }
        } catch (IOException e) {
            System.out.println(String.format("Received message from client: %s",
                    key.attachment() + " dropped the connection (" + e.getMessage() + ").\n"));
            closeQuietly(ch);
            return;
        }

        if (received.size() > 0) {
            String text = new String(received.toByteArray(), StandardCharsets.UTF_8);
            System.out.println(String.format("Received message from client: %s", key.attachment() + ": " + text));
        }
        if (read < 0) {
            System.out.println(String.format("Received message from client: %s", key.attachment() + " left the chat.\n"));
            // Closing the channel also cancels its selection key.
            closeQuietly(ch);
        }
    }

    /** Closes a resource, ignoring any failure: used only on paths that are already shutting down. */
    private static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (Exception ignored) {
            // Best effort; there is nothing useful to do if close fails.
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server(10523);
        (new Thread(server)).start();
    }
}

客户端代码:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Opens one blocking TCP connection to the server on port 10523 of the local host and records the
 * outcome in the owning {@link Client}'s counters.
 *
 * <p>A successfully connected channel is intentionally left open: the test measures how many
 * simultaneous connections the server holds. A channel whose connect attempt failed is closed so
 * it does not leak a file descriptor.
 */
class Task implements Runnable {
    int id;
    /** Owner of the counters; stays {@code null} when the single-argument constructor is used. */
    Client client;

    /**
     * Creates a task that is not attached to any {@link Client}; it connects but counts nothing.
     *
     * @param id sequence number of this task
     */
    public Task(int id) {
        this.id = id;
    }

    /**
     * @param id sequence number of this task
     * @param client client whose success/exception counters are updated
     */
    public Task(int id, Client client) {
        this.id = id;
        this.client = client;
    }

    @Override
    public void run() {
        SocketChannel channel = null;
        try {
            int port = 10523;
            InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);

            channel = SocketChannel.open();
            channel.socket().connect(serverAddress);

            if (channel.socket().isConnected() && client != null) {
                client.successCount.incrementAndGet();
            }
        } catch (Exception e) {
            closeQuietly(channel);
            // Guarded so a task built without a client does not fail again inside the handler.
            if (client != null) {
                System.out.println("exception count: " + client.exceptionCount.addAndGet(1));
            }
            e.printStackTrace();
        }
    }

    /** Closes the channel of a failed connection attempt, ignoring any secondary failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (Exception ignored) {
            // Best effort: the connection attempt has already failed.
        }
    }
}

public class Client {
    AtomicInteger successCount = new AtomicInteger();
    AtomicInteger exceptionCount = new AtomicInteger();
    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
        client.process();
    }

    private void process() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(50);
        int N = 10_000;
        for (int i = 0; i < N; i++) {
            Task task = new Task(i, this);
            executorService.submit(task);
        }
        while (true){
            Thread.sleep(8000);
            System.out.println("success count: "+successCount.get());//success_count
        }
    }
}

我只是接受连接,不做任何读/写操作

这可能是非常基础的问题,但我卡住了,无法进一步调试。任何提示都能帮我学到新东西

编辑:

我尝试了一个单线程客户端,它按顺序打开N个连接,但同样的问题仍然出现:客户端显示的成功连接数比服务器端显示的多


共 (1) 个答案

  1. # 1 楼答案

    • 在详细了解NIO及其语义之后,我理解了这个问题
    • 在上面的客户端代码中,我只是向服务器发起了连接,却没有监听连接是否真正建立成功。我通过选择器(Selector)和OP_CONNECT事件实现了这一点。代码如下:
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Simple client: connects to the server without blocking, waits until the connection is
     * confirmed, then writes a dummy message to the server every 3 seconds and prints anything
     * the server sends back.
     *
     * <p>The selector is opened in the constructor; all channel I/O happens in {@link #run()}.
     * A task is single-use: {@code run()} closes the channel and the selector when it ends.
     */
    class ClientTask implements Runnable {
        private static final long WRITE_INTERVAL_MILLIS = 3_000;
        private static final int BUFFER_SIZE = 1024;
        private static final String DUMMY_MESSAGE = "Dummy message from Client";
        private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

        private final Client client;
        private final String clientId;
        private final Selector clientSelector;
        /** Scratch buffer reused for every read. */
        private final ByteBuffer myBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        private SelectionKey key;
        /** Set once the connection has been confirmed and counted; guards against double counting. */
        private boolean connected;

        /**
         * @param clientId label used in log output and attached to the selection key
         * @param client owner that supplies host/port and holds the success/failure counters
         * @throws IOException if the selector cannot be opened
         */
        public ClientTask(String clientId, Client client) throws IOException {
            this.clientId = clientId;
            this.client = client;
            this.clientSelector = Selector.open();
        }

        /**
         * Connects, then loops: sleep, process pending connect/read events, write one dummy
         * message. The loop ends when the server closes the connection, an error occurs or the
         * thread is interrupted.
         */
        @Override
        public void run() {
            SocketChannel socketChannel = null;
            try {
                InetSocketAddress serverAddr = new InetSocketAddress(client.getHost(), client.getPort());
                System.out.println(String.format
                        ("%s connecting to <%s:%d>", clientId, serverAddr.getHostName(), serverAddr.getPort()));

                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
                //listens for connection establishment events
                this.key = socketChannel.register(this.clientSelector, SelectionKey.OP_CONNECT, clientId);
                if (socketChannel.connect(serverAddr)) {
                    // A local connection may be established immediately; count it right away
                    // instead of waiting for an OP_CONNECT event.
                    finishConnection(this.key);
                }
                while (this.key.isValid()) {
                    //wait for seconds
                    Thread.sleep(WRITE_INTERVAL_MILLIS);
                    clientSelector.selectNow();//non blocking call
                    Iterator<SelectionKey> iterator = clientSelector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isConnectable()) {
                            finishConnection(selectionKey);
                        } else if (selectionKey.isReadable()) {
                            read(selectionKey);
                        }
                    }
                    // Writing before the connection is finished would throw NotYetConnectedException.
                    if (connected && this.key.isValid()) {
                        //write dummy message to server
                        write(this.key, DUMMY_MESSAGE);
                    }
                }
                System.out.println(String.format("%s disconnected", clientId));
            } catch (InterruptedException e) {
                // Shutdown request, not a connection failure: restore the flag and stop.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                System.out.println("exception count: " + client.failedConnectionCount.addAndGet(1));
                e.printStackTrace();
            } finally {
                closeQuietly(socketChannel);
                closeQuietly(clientSelector);
            }
        }

        /*
         * On successful connection, switch the key's interest set to read (OP_READ) and count the
         * connection exactly once.
         * */
        private void finishConnection(SelectionKey selectionKey) throws IOException {
            if (connected) {
                return;
            }
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            String cId = selectionKey.attachment().toString();
            if (!channel.finishConnect()) {
                throw new IllegalStateException(String.format("%s didn't connect properly", cId));
            }
            selectionKey.interestOps(SelectionKey.OP_READ);
            connected = true;
            System.out.println(String.format("%s connected properly", cId));
            client.successConnectCount.addAndGet(1);//increment successful connection count
        }

        // Writes the whole message to the server; a write that makes no progress is an error.
        private void write(SelectionKey clientKey, String msg) throws IOException {
            if (!clientKey.isValid() || !(clientKey.channel() instanceof SocketChannel)) {
                return;
            }
            SocketChannel ch = (SocketChannel) clientKey.channel();
            ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes(CHARSET));
            // A non-blocking write may accept only part of the buffer, so keep going until done.
            while (msgBuf.hasRemaining()) {
                int writeBytes = ch.write(msgBuf);
                if (writeBytes <= 0) {
                    throw new IllegalStateException(
                            String.format("Wrote %d bytes to server %s", writeBytes, ch.getRemoteAddress()));
                }
            }
        }

        // Reads from server; closes the channel when the server has closed its side.
        private void read(SelectionKey selectionKey) throws IOException {
            String cId = selectionKey.attachment().toString();
            SocketChannel ch = (SocketChannel) selectionKey.channel();
            StringBuilder sb = new StringBuilder();

            myBuffer.clear();
            int read;
            while ((read = ch.read(myBuffer)) > 0) {
                myBuffer.flip();
                byte[] bytes = new byte[myBuffer.remaining()];
                myBuffer.get(bytes);
                sb.append(new String(bytes, CHARSET));
                myBuffer.clear();
            }

            if (sb.length() > 0) {
                System.out.println(String.format("%s received data :%s from server\n\n", cId, sb.toString()));
            }
            if (read < 0) {
                // End of stream: without closing, the key would stay readable on every cycle.
                ch.close();
            }
        }

        // Closes a resource on the way out, ignoring secondary failures.
        private static void closeQuietly(AutoCloseable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception ignored) {
                // Best effort: the task is already terminating.
            }
        }
    }
    
    /**
     * A simple Java NIO Client Manager creates N clients listening to given server
     */
    public class Client {
        AtomicInteger successConnectCount = new AtomicInteger();
        AtomicInteger failedConnectionCount = new AtomicInteger();
    
        private int noOfClients;
        private int port;
        private String host;
    
        public Client(String host, int port, int noOfClients) {
            this.host = host;
            this.port = port;
            this.noOfClients = noOfClients;
        }
    
        public static void main(String[] args) throws InterruptedException, IOException {
            if (args.length < 3) {
                throw new RuntimeException("Pass 3 arguments <server-ip> <server-port> <no-of-clients> and start the client (e.g java Client localhost 1234 100)");
            }
    
            Client client = new Client(args[0], Integer.parseInt(args[1]), Integer.parseInt(args[2]));
            client.process();
        }
    
        private void process() throws InterruptedException {
            try {
                ExecutorService executorService = Executors.newFixedThreadPool(noOfClients);
                for (int i = 1; i <= noOfClients; i++) {
                    ClientTask task = new ClientTask(String.format("Client_%d", i), this);
                    executorService.submit(task);
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            Thread.sleep(Integer.MAX_VALUE);//mock long waiting period
        }
    
        public int getPort() {
            return port;
        }
    
        public String getHost() {
            return host;
        }
    }
    

    如果我们查看 while 循环,它会侦听事件,并在收到 OP_CONNECT 事件(selectionKey.isConnectable())时将成功连接计数加一