Netty解决TCP粘包拆包-四种自带解码器

本文参考资源:

LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园

在TCP协议中,一个完整的包可能会被拆分为多个包进行发送,也有可能把多个小的包封装成一个大的数据包进行发送,这就是所谓的TCP粘包和拆包问题。

粘包/拆包发生的原因

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
  2. 进行MSS大小的TCP分段(最大报文长度)
  3. 以太网帧的payload大于MTU进行IP分片(链路层的最大传输单元)

发生粘包问题的时间服务器案例

服务端

public class NettyServer {
    public void bind(int port) throws Exception{
        //配置服务器的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture future = serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new NettyServer().bind(8080);
    }
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        String body = buf.toString(CharsetUtil.UTF_8);
        System.out.println("The time server receive order: "+ body + "; the counter is: " + ++counter);
        //如果收到的请求是QUERY TIME ORDER就返回当前时间
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(
                System.currentTimeMillis()
                ).toString():"BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

public class NettyClient {
    public void connect(int port,String host) throws Exception{
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new NettyClient().connect(port, "127.0.0.1");
    }
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());

    private int counter;
    private byte[] req;

    public NettyClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //连续发送一百条QUERY TIME ORDER消息
        for(int i = 0;i<100;i++){
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        String body = buf.toString(CharsetUtil.UTF_8);
        System.out.println("Now is:"+body+"; the counter is: "+ ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warning("Unexpected exception from downstream: "+cause.getMessage());
        ctx.close();
    }
}

分别运行服务端和客户端,预期结果应该是服务端响应100次时间,然而服务端打印:

The time server receive order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
---省略部分,此处共57条QUERY TIME ORDER----
QUERY TIME ORDER
QUERY TIME ORDER; the counter is: 1
The time server receive order: 
QUERY TIME ORDER
---省略部分,此处共43条QUERY TIME ORDER----
QUERY TIME ORDER
QUERY TIME ORDER
; the counter is: 2

可见发生了粘包问题,自然客户端也没有收到正确的响应:

Now is:BAD ORDER
BAD ORDER
; the counter is: 1

解决策略

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
  2. 在包尾增加回车换行符进行分割,例如FTP协议;
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
  4. 更复杂的应用层协议。

使用LineBasedFrameDecoder解决TCP粘包问题

为了解决TCP粘包/拆包导致的半包读写问题,Netty 默认提供了多种编解码器用于处理半包,LineBasedFrameDecoder是其中的一种,通过换行符来分包。

使用LineBasedFrameDecoder改写后的案例

需要改写的有四个地方:

在NettyServer中pipeline添加处理器

private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new NettyServerHandler());
    }
}

在NettyServerHandler中把消息当字符串处理

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //转String
    String body = (String) msg;

    System.out.println("The time server receive order: "+ body + "; the counter is: " + ++counter);
    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(
            System.currentTimeMillis()
            ).toString():"BAD ORDER";
    currentTime = currentTime + System.getProperty("line.separator");
    ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
    ctx.writeAndFlush(resp);
}

在NettyClient中pipeline添加处理器

new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new NettyClientHandler());
    }
}

在NettyClientHandler中把消息当字符串处理

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    System.out.println("Now is:"+body+"; the counter is: "+ ++counter);
}

再次分别运行服务端、客户端,发现问题解决:

The time server receive order: QUERY TIME ORDER; the counter is: 1
The time server receive order: QUERY TIME ORDER; the counter is: 2
The time server receive order: QUERY TIME ORDER; the counter is: 3
-----------------------省略---------------------------
The time server receive order: QUERY TIME ORDER; the counter is: 98
The time server receive order: QUERY TIME ORDER; the counter is: 99
The time server receive order: QUERY TIME ORDER; the counter is: 100
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 1
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 2
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 3
-----------------------省略-------------------------
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 98
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 99
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 100

LineBasedFrameDecoder工作原理

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n, 如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。 LineBasedFrameDecoder + StringDecoder 组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

如果发送的消息不是以换行符结束的,或者没有回车换行符,希望根据消息头中的长度字段来分包,Netty提供了多种支持TCP粘包/拆包的解码器,用来满足用户的不同诉求。

其他解码器

DelimiterBasedFrameDecoder

这是一个根据自定义分隔符分包的解码器

使用:

private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
    protected void initChannel(SocketChannel ch) throws Exception {
        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
        //第一个参数代表单条消息的最大长度,当达到该长度仍没有查到分隔符抛出异常
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new NettyServerHandler());
    }
}

发消息的时候也要在消息末尾增加"$_",和LineBasedFrameDecoder加换行符差不多的做法。

FixedLengthFrameDecoder

这是一个按固定长度分包的解码器

使用:

private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new NettyServerHandler());
    }
}

无论一次接收到多少数据报,它都会按照构造函数中设置的固定长度进行解码。适用于数据长度固定的情况,否则在handler中手动填充长度也行。

LengthFieldBasedFrameDecoder

这个就是在在数据包中,加了一个长度字段(长度域),保存上层包的长度。解码的时候,会按照这个长度,进行上层ByteBuf应用包的提取。

使用:

private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new NettyServerHandler());
    }
}

其中五个参数:

(1)maxFrameLength - 发送的数据包最大长度;

(2)lengthFieldOffset - 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;

(3)lengthFieldLength - 长度域的自己的字节数长度。

(4)lengthAdjustment – 长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。

(5)initialBytesToStrip – 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有4个节点的长度域,则它的值为4。

LengthFieldBasedFrameDecoder(1024,0,4,0,4)的意思就是,数据包最大长度为1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)

在使用的时候handler写ByteBuf数据的时候应该在首部增添一个四个字节的长度域,代表数据长度。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/netty%e8%a7%a3%e5%86%b3tcp%e7%b2%98%e5%8c%85%e6%8b%86%e5%8c%85-%e5%9b%9b%e7%a7%8d%e8%87%aa%e5%b8%a6%e8%a7%a3%e7%a0%81%e5%99%a8/

发表评论

电子邮件地址不会被公开。