有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java netty ChannelInboundHandlerAdapter以约1500字节的速度剪切帧

我已经使用netty框架实现了一个服务器应用程序,它使用ChannelInblundHandlerAdapter读取传入字节

如标题所示,我的问题是,我不定期地从客户端获取内容,我想是的,这些内容在约1.500字节后被剪切。例如:在这种情况下,我应该收到一个大的JSON数组。因为它被切割了,我无法解析它

在使用消息之前,我尝试使用管道中的附加ByteToMessageDecoder通道对消息进行解码。但这并不能解决问题。我在JSON中没有分隔符,我可以检查并再次将两个(或更多)部分粘在一起

以下是我的管道配置:

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new IdleStateHandler(45,0,0));
                        ch.pipeline().addLast(new MyByteToMessageDecoder());
                        ch.pipeline().addLast(new GatewayCommunicationHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_RCVBUF, 8192)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(8192))
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        initRestServer();

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(Config.gatewayPort).sync();
        f.channel().closeFuture().sync();

这就是我的ByteToMessageDecoder:(我知道它很乱,但我不知道在我的情况下如何处理它)

public class MyByteToMessageDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    byte[] receivedBytes = new byte[in.readableBytes()];
    in.getBytes(in.readerIndex(), receivedBytes);


    if (receivedBytes[in.readableBytes()-1] != (byte) 0) {
        out.add(receivedBytes);
        return;
    }

    int lenForOutBytes = 0;
    for (Object o : out) {
        byte[] bytes = (byte[]) o;
        lenForOutBytes += bytes.length;
    }

    byte[] outBytes = new byte[lenForOutBytes];

    for (Object o : out) {
        byte[] bytes = (byte[]) o;

        if (out.size() == 1) {
            outBytes = (byte[]) out.get(0);
        }
        else {
            int i = 0;

            for (int j = 0; j < bytes.length; j++) {
                outBytes[i + j] = bytes[j];
            }
            i += bytes.length;
        }
    }

    ctx.fireChannelRead(outBytes);
    in.resetReaderIndex();
}
...

还有其他人有这样的问题吗

谢谢你的回复

比尔·乔


共 (1) 个答案

  1. # 1 楼答案

    I have seen that this issue happens frequently, so I'm purposely a bit broader than I usually do

    之所以会出现这个问题,是因为TCP是基于流的,而不是基于数据包的

    这基本上发生了:

    1. [客户端]想要发送10k字节的数据
    2. [client]将数据发送到TCP层
    3. [client]TCP层将数据包拆分,它知道最大数据包大小为1500(这是几乎所有网络使用的默认MTU
    4. [客户端]客户端向服务器发送数据包,其中包含40字节作为头,1460字节作为数据
    5. [server]Netty接收第一个数据包,并直接调用函数,第一个数据包包含1460字节的数据
    6. [服务器]在您的功能需要处理剩余数据(初始数据-1260)的时间内

    所以解决这个问题有多种方法

    用长度预先结束邮件:

    虽然这通常是解决数据包问题的最简单方法,但在同时处理大小消息时,它也是效率最低的方法。这也需要改变协议

    其基本思想是在发送数据包之前预先设定长度,这样可以正确分割消息

    优势

    • 无需在数据上循环以过滤掉字符,或阻止禁止的字符
    • 如果你的网络中有中继系统,他们不必对消息边界进行任何硬解析

    缺点

    • 消息的长度必须是已知的,在长消息中,这是内存扩展

    怎么做

    如果使用标准整型字段,这非常简单,因为Netty为其内置了类:

    这在管道中以以下方式使用

    // int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip
    pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 4, 0, 2, 0, 2));
    // int lengthFieldLength, int lengthAdjustment
    pipeline.addLast(new LengthFieldPrepender(2, 0));
    

    这基本上是按照如下方式对数据包进行帧处理:

    您发送:

    DATA: 12B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
    +    +                        -+        +
    

    ^{}将其转换为:

    DATA: 14B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 00 0c 48 65 6c 6c 6f 20 57 6f 72 6c 64 21       |..Hello World!  |
    +    +                        -+        +
    

    然后,当您收到消息时,^{}将其解码为:

    DATA: 12B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
    +    +                        -+        +
    

    在简单分隔符上拆分消息

    有些协议采用不同的方法,而不是按固定长度拆分,而是按分隔符拆分。快速查看的方法是,Java中的字符串以"结尾,文本文件中的行以换行符结尾,自然文本中的段落以双换行符结尾

    优势

    • 如果您知道某个数据不包含字符,则生成相对容易,例如JSON通常不包含空格,因此按空格分隔消息很容易
    • 易于通过脚本语言实现,因为不需要状态

    缺点

    • 与框架字符的冲突可能会增大消息大小
    • 长度事先不知道,所以要么在代码中设置硬编码限制,要么继续读取,直到内存耗尽或数据结束
    • 即使你对数据包不感兴趣,也需要阅读每个字符

    怎么做

    当从Netty发送消息时,需要手动将分隔符添加到消息本身,当接收到消息时,可以使用^{}将传入流解码为消息

    管道示例:

    这在管道中以以下方式使用

    // int maxFrameLength, ByteBuf... delimiters
    pipeline.addLast(1024 * 4, DelimiterBasedFrameDecoder(Delimiters.lineDelimiter()));
    

    发送消息时,需要手动添加分隔符:

    DATA: 14B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21 0d 0a       |Hello World!..  |
    +    +                        -+        +
    

    接收消息时,^{}将消息转换为帧:

    DATA: 12B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
    +    +                        -+        +
    

    基于复杂业务分隔符的拆分

    并不是所有的框架都很容易,如果避免的话,有些解决方案实际上是最好的,但有时,你真的需要做一些肮脏的工作

    优势

    • 几乎可以处理所有现有的数据结构
    • 不需要修改协议

    缺点

    • 通常你必须检查每个字节
    • 代码可能很难理解
    • 快速的解决方案可能会带来奇怪的错误放在它认为格式错误的地方

    这分为两类:

    • 基于现有解码器
    • 模式检测

    基于现有解码器

    有了这些解决方案,您基本上可以使用来自其他框架的现有解码器来解析数据包,并检测数据包处理中的故障

    GSON^{}为例:

    public class GSONDecoder
        extends ReplayingDecoder<Void> {
    
        Gson gson = new GsonBuilder().create();
    
        protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) 
            throws Exception {
    
            out.add(gson.fromJson(new ByteBufInputStream(buf, false), Object.class));
        }
    }
    

    模式检测

    如果要使用模式检测方法,需要了解协议。让我们为JSON制作一个模式检测解码器

    基于JSON的结构,让我们做以下假设:

    1. JSON基于{}以及[]的匹配对
    2. "之间应忽略{}的匹配对
    3. ^当前面有一个\时,{}应该被忽略
    4. 从左到右解析时,如果前面有\,则应忽略\

    基于这些属性,让我们根据以下假设进行^{}

    public static class JSONDecoder extends ByteToMessageDecoder {
    
        // Notice, this class is designed for JSON without a charset definition at the start, adding this is hard as we basicly have to call differend
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            in.markReaderIndex();
    
            int fromIndex = in.readerIndex();
    
            int unclosedCurlyBracketsSeen = 0;
            boolean inQuotedSection = false;
            boolean nonWhitespaceSeen = false;
            boolean slashSeen = false;
    
            while (in.isReadable()) {
                boolean newSlashSeenState = false;
                byte character = in.readByte();
                if (character == '{' && !inQuotedSection) {
                    unclosedCurlyBracketsSeen++;
                }
                if (character == '}' && !inQuotedSection) {
                    unclosedCurlyBracketsSeen ;
                }
                if (character == '[' && !inQuotedSection) {
                    unclosedCurlyBracketsSeen++;
                }
                if (character == ']' && !inQuotedSection) {
                    unclosedCurlyBracketsSeen ;
                }
                if (character == '"' && !slashSeen) {
                    inQuotedSection = !inQuotedSection;
                }
                if (character == '\\' && !slashSeen) {
                    newSlashSeenState = true;
                }
                if (!Character.isWhitespace(character)) {
                    nonWhitespaceSeen = true;
                }
                slashSeen = newSlashSeenState;
                if(unclosedCurlyBracketsSeen == 0 && nonWhitespaceSeen) {
                    int targetIndex = in.readerIndex();
                    out.add(in.slice(fromIndex, targetIndex - fromIndex).retain());
                    return;
                }
            }
    
            // End of stream reached, but our JSON is not complete, reset our progress!
            in.resetReaderIndex();
        }
    
    }
    

    接收消息时,它是这样工作的:

    DATA: 35B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 7b 22 68 69 21 22 2c 22 53 74 72 69 6e 67 3a 20 |{"hi!","String: |
    |00000010| 5c 22 48 69 5c 22 22 7d 20 20 7b 22 73 6c 61 73 |\"Hi\""}  {"slas|
    |00000020| 68 22 3a                                        |h":             |
    +    +                        -+        +
    
    DATA: 34B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 22 5c 5c 22 7d 7b 22 4e 65 73 74 65 64 3a 22 3a |"\\"}{"Nested:":|
    |00000010| 7b 22 64 65 65 70 65 72 22 3a 7b 22 6f 6b 22 7d |{"deeper":{"ok"}|
    |00000020| 7d 7d                                           |}}              |
    +    +                        -+        +
    

    如您所见,我们收到了2条消息,其中1条甚至在2个“虚拟TCP”数据包之间被分割,这被我们的“JSON解码器”转换为以下字节码数据包:

    DATA: 24B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 7b 22 68 69 21 22 2c 22 53 74 72 69 6e 67 3a 20 |{"hi!","String: |
    |00000010| 5c 22 48 69 5c 22 22 7d                         |\"Hi\""}        |
    +    +                        -+        +
    
    DATA: 16B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 20 20 7b 22 73 6c 61 73 68 22 3a 22 5c 5c 22 7d |  {"slash":"\\"}|
    +    +                        -+        +
    
    DATA: 29B
             +                        -+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +    +                        -+        +
    |00000000| 7b 22 4e 65 73 74 65 64 3a 22 3a 7b 22 64 65 65 |{"Nested:":{"dee|
    |00000010| 70 65 72 22 3a 7b 22 6f 6b 22 7d 7d 7d          |per":{"ok"}}}   |
    +    +                        -+        +