Java: io.netty.util.IllegalReferenceCountException: refCnt: 0 in Netty

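I am trying to build a proxy in Netty that also passes all traffic on to the other handlers in the pipeline. I know I should manage the references to the ByteBuf, but I don't know how. Below are my sample code and the exception.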

Initializer:

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    private final String remoteHost;

    private final int remotePort;

    public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.sslCtx = sslCtx;
    }

    public HexDumpProxyInitializer(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.sslCtx = null;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }

        p.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
        p.addLast(new InboundPrinterHandler());
    }
}

HexDumpProxyFrontendHandler

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    private Channel outboundChannel;

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(inboundChannel))
         .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(remoteHost, remotePort);
        outboundChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

InboundPrinterHandler

public class InboundPrinterHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf bb = null;
        bb = (ByteBuf) msg;
        System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset()));
        System.out.println("\n\n\n");
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }


}

Exception

io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331)
    at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614)
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213)
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208)
    at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

1 answer

  1. # Answer 1

        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            // Snip
        }
        ctx.fireChannelRead(msg);
    

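    Once you pass the ByteBuf to another channel, that channel becomes responsible for decrementing its reference count. Because the other channel has now released the buffer (refCnt is 0), the buffer is no longer usable when ctx.fireChannelRead(msg) hands it to the next handler.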

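    The best way to fix this is to manually increment the reference count with retain() before passing the traffic to the other channel. Because msg is declared as Object in channelRead, call it through ReferenceCountUtil.retain(msg) (or cast msg to ByteBuf first):

    outboundChannel.writeAndFlush(ReferenceCountUtil.retain(msg)).addListener(new ChannelFutureListener() {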
    // Your remaining code
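
The counting behind this fix can be sketched without Netty at all. The toy model below is only an illustration under stated assumptions: ToyBuf, write, forwardWithoutRetain and forwardWithRetain are hypothetical names invented for this sketch, not Netty classes or methods, and the model throws IllegalStateException where Netty would throw IllegalReferenceCountException. It assumes, as the answer describes, that writing to the outbound channel consumes one reference, and that the last handler to read the buffer releases its own reference.

```java
import java.nio.charset.StandardCharsets;

// Toy model only: ToyBuf is a hypothetical stand-in, not Netty's ByteBuf.
final class RefCountSketch {

    /** Minimal reference-counted buffer: starts at 1, unusable once it hits 0. */
    static final class ToyBuf {
        private final byte[] data;
        private int refCnt = 1;

        ToyBuf(String s) { this.data = s.getBytes(StandardCharsets.UTF_8); }

        int refCnt() { return refCnt; }

        ToyBuf retain() { ensureAccessible(); refCnt++; return this; }

        boolean release() { ensureAccessible(); return --refCnt == 0; }

        String text() { ensureAccessible(); return new String(data, StandardCharsets.UTF_8); }

        private void ensureAccessible() {
            if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        }
    }

    /** Stands in for the outbound channel: a completed write consumes one reference. */
    static void write(ToyBuf buf) { buf.release(); }

    /** Forwards without retain(): the next reader sees an already released buffer. */
    static String forwardWithoutRetain(ToyBuf buf) {
        write(buf);          // refCnt 1 -> 0
        return buf.text();   // throws IllegalStateException("refCnt: 0")
    }

    /** Forwards with retain(): one reference for the write, one for the next reader. */
    static String forwardWithRetain(ToyBuf buf) {
        write(buf.retain()); // refCnt 1 -> 2 -> 1
        try {
            return buf.text();
        } finally {
            buf.release();   // the last reader releases its reference: 1 -> 0
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardWithRetain(new ToyBuf("hello")));
        try {
            forwardWithoutRetain(new ToyBuf("hello"));
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In the real pipeline the same bookkeeping would suggest that, once the frontend handler retains the message, InboundPrinterHandler should release it after printing (for example with ReferenceCountUtil.release(msg)), since it is the last handler to touch the buffer.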