有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java为了在Netty中重用客户端连接(获取“对等方重置连接”),我需要监听哪些事件?

当我尝试在Netty中重用客户机连接时,我得到了java.io.IOException: Connection reset by peer(如果我发送一个请求,这种情况不会发生,但是如果我每次发送两个请求,甚至从一个线程发送,这种情况都会发生)。我目前的方法包括实现一个简单的ChannelPool,其代码如下。注意,key方法从freeChannels成员获得一个空闲通道,如果没有可用通道,则创建一个新通道。方法returnChannel()是负责在完成请求时释放通道的方法。在我们处理响应之后,它在管道内被调用(请参见下面代码中的messageReceived()方法ResponseHandler)。有人知道我做错了什么,为什么我会有例外吗

通道池代码(注意使用freeChannels.pollFirst()获取通过调用returnChannel()返回的空闲通道):

public class ChannelPool {

private final ClientBootstrap cb;
private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
private static Map<Channel, Channel> proxyToClient = new ConcurrentHashMap<Channel, Channel>();

public ChannelPool(InetSocketAddress address, ChannelPipelineFactory pipelineFactory) {
    ChannelFactory clientFactory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());
    cb = new ClientBootstrap(clientFactory);
    cb.setPipelineFactory(pipelineFactory);
}

private void writeToNewChannel(final Object writable, Channel clientChannel) {
    ChannelFuture cf;
    synchronized (cb) {
        cf = cb.connect(new InetSocketAddress("localhost", 18080));
    }
    final Channel ch = cf.getChannel();
    proxyToClient.put(ch, clientChannel);
    cf.addListener(new ChannelFutureListener() {

        @Override
        public void operationComplete(ChannelFuture arg0) throws Exception {
            System.out.println("channel open, writing: " + ch);
            ch.write(writable);
        }
    });
}

public void executeWrite(Object writable, Channel clientChannel) {
    synchronized (freeChannels) {
        while (!freeChannels.isEmpty()) {
            Channel ch = freeChannels.pollFirst();
            System.out.println("trying to reuse channel: " + ch + " " + ch.isOpen());
            if (ch.isOpen()) {
                proxyToClient.put(ch, clientChannel);
                ch.write(writable).addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture cf) throws Exception {
                        System.out.println("write from reused channel complete, success? " + cf.isSuccess());
                    }
                });
                // EDIT: I needed a return here
            }
        }
    }
    writeToNewChannel(writable, clientChannel);
}

public void returnChannel(Channel ch) {
    synchronized (freeChannels) {
        freeChannels.addLast(ch);
    }
}

public Channel getClientChannel(Channel proxyChannel) {
    return proxyToClient.get(proxyChannel);
}
}

Netty管道代码(注意RequestHandler调用使用新通道或旧通道的executeWrite(),并且ResponseHandler在收到响应并且在对客户端的响应中设置内容后调用returnChannel()):

public class NettyExample {

private static ChannelPool pool;

public static void main(String[] args) throws Exception {

    pool = new ChannelPool(
            new InetSocketAddress("localhost", 18080),
            new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    return Channels.pipeline(
                            new HttpRequestEncoder(),
                            new HttpResponseDecoder(),
                            new ResponseHandler());
                }
            });
    ChannelFactory factory =
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());
    ServerBootstrap sb = new ServerBootstrap(factory);

    sb.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            return Channels.pipeline(
                    new HttpRequestDecoder(),
                    new HttpResponseEncoder(),
                    new RequestHandler());
        }
    });

    sb.setOption("child.tcpNoDelay", true);
    sb.setOption("child.keepAlive", true);

    sb.bind(new InetSocketAddress(2080));
}

private static class ResponseHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpResponse proxyResponse = (HttpResponse) e.getMessage();
        final Channel proxyChannel = e.getChannel();
        Channel clientChannel = pool.getClientChannel(proxyChannel);
        HttpResponse clientResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        clientResponse.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
        HttpHeaders.setContentLength(clientResponse, proxyResponse.getContent().readableBytes());
        clientResponse.setContent(proxyResponse.getContent());
        pool.returnChannel(proxyChannel);
        clientChannel.write(clientResponse);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }
}

private static class RequestHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpRequest request = (HttpRequest) e.getMessage();
        pool.executeWrite(request, e.getChannel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }
}
}

编辑:为了提供更多细节,我写了一个关于代理连接的跟踪。请注意,下面涉及由同步apache commons客户端执行的两个串行请求。第一个请求使用一个新的通道并很好地完成,第二个请求尝试重用相同的通道,该通道是开放的和可写的,但莫名其妙地失败了(除了注意到从工作线程抛出的异常之外,我无法截获任何问题)。显然,第二个请求在重试时成功完成。在两个请求完成许多秒后,两个连接最终都关闭(即,即使对等方关闭了连接,我截获的任何事件都不会反映这一点):

channel open: [id: 0x6e6fbedf]
channel connect requested: [id: 0x6e6fbedf]
channel open, writing: [id: 0x6e6fbedf, /127.0.0.1:47031 => localhost/127.0.0.1:18080]
channel connected: [id: 0x6e6fbedf, /127.0.0.1:47031 => localhost/127.0.0.1:18080]
trying to reuse channel: [id: 0x6e6fbedf, /127.0.0.1:47031 => localhost/127.0.0.1:18080] true
channel open: [id: 0x3999abd1]
channel connect requested: [id: 0x3999abd1]
channel open, writing: [id: 0x3999abd1, /127.0.0.1:47032 => localhost/127.0.0.1:18080]
channel connected: [id: 0x3999abd1, /127.0.0.1:47032 => localhost/127.0.0.1:18080]
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
    at sun.nio.ch.IOUtil.read(IOUtil.java:186)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:63)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:373)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:247)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

共 (1) 个答案

  1. # 1 楼答案

    终于,我明白了。有两个问题导致连接重置。首先,我没有从向代理发送请求的apache commons HttpClient调用releaseConnection()(请参见follow up question)。其次,executeWrite在重复使用连接的情况下,两次向代理服务器发出相同的调用。我需要在第一次写入之后返回,而不是继续执行while循环。这个双重代理调用的结果是,我向原始客户端发出了重复的响应,破坏了与客户端的连接