本文参考资源:
LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园
在TCP协议中,一个完整的包可能会被拆分为多个包进行发送,也有可能把多个小的包封装成一个大的数据包进行发送,这就是所谓的TCP粘包和拆包问题。
粘包/拆包发生的原因
- 应用程序write写入的字节大小大于套接口发送缓冲区大小
- 进行MSS大小的TCP分段(最大报文长度)
- 以太网帧的payload大于MTU进行IP分片(链路层的最大传输单元)
发生粘包问题的时间服务器案例
服务端
public class NettyServer {
public void bind(int port) throws Exception{
//配置服务器的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = serverBootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new NettyServer().bind(8080);
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String body = buf.toString(CharsetUtil.UTF_8);
System.out.println("The time server receive order: "+ body + "; the counter is: " + ++counter);
//如果收到的请求是QUERY TIME ORDER就返回当前时间
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(
System.currentTimeMillis()
).toString():"BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class NettyClient {
public void connect(int port,String host) throws Exception{
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new NettyClient().connect(port, "127.0.0.1");
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
private int counter;
private byte[] req;
public NettyClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//连续发送一百条QUERY TIME ORDER消息
for(int i = 0;i<100;i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String body = buf.toString(CharsetUtil.UTF_8);
System.out.println("Now is:"+body+"; the counter is: "+ ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warning("Unexpected exception from downstream: "+cause.getMessage());
ctx.close();
}
}
分别运行服务端和客户端,预期结果应该是服务端响应100次时间,然而服务端打印:
The time server receive order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
---省略部分,此处共57条QUERY TIME ORDER----
QUERY TIME ORDER
QUERY TIME ORDER; the counter is: 1
The time server receive order:
QUERY TIME ORDER
---省略部分,此处共43条QUERY TIME ORDER----
QUERY TIME ORDER
QUERY TIME ORDER
; the counter is: 2
可见发生了粘包问题,自然客户端也没有收到正确的响应:
Now is:BAD ORDER
BAD ORDER
; the counter is: 1
解决策略
- 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
- 在包尾增加回车换行符进行分割,例如FTP协议;
- 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
- 更复杂的应用层协议。
使用LineBasedFrameDecoder解决TCP粘包问题
为了解决TCP粘包/拆包导致的半包读写问题,Netty 默认提供了多种编解码器用于处理半包,LineBasedFrameDecoder是其中的一种,通过换行符来分包。
使用LineBasedFrameDecoder改写后的案例
需要改写的有四个地方:
在NettyServer中pipeline添加处理器
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyServerHandler());
}
}
在NettyServerHandler中把消息当字符串处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//转String
String body = (String) msg;
System.out.println("The time server receive order: "+ body + "; the counter is: " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(
System.currentTimeMillis()
).toString():"BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
在NettyClient中pipeline添加处理器
new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyClientHandler());
}
}
在NettyClientHandler中把消息当字符串处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now is:"+body+"; the counter is: "+ ++counter);
}
再次分别运行服务端、客户端,发现问题解决:
The time server receive order: QUERY TIME ORDER; the counter is: 1
The time server receive order: QUERY TIME ORDER; the counter is: 2
The time server receive order: QUERY TIME ORDER; the counter is: 3
-----------------------省略---------------------------
The time server receive order: QUERY TIME ORDER; the counter is: 98
The time server receive order: QUERY TIME ORDER; the counter is: 99
The time server receive order: QUERY TIME ORDER; the counter is: 100
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 1
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 2
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 3
-----------------------省略-------------------------
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 98
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 99
Now is:Sun Mar 15 16:39:58 CST 2020; the counter is: 100
LineBasedFrameDecoder工作原理
LineBasedFrameDecoder
的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n
”或者“\r\n
”, 如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。
StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。 LineBasedFrameDecoder + StringDecoder
组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。
如果发送的消息不是以换行符结束的,或者没有回车换行符,希望根据消息头中的长度字段来分包,Netty提供了多种支持TCP粘包/拆包的解码器,用来满足用户的不同诉求。
其他解码器
DelimiterBasedFrameDecoder
这是一个根据自定义分隔符分包的解码器
使用:
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//第一个参数代表单条消息的最大长度,当达到该长度仍没有查到分隔符抛出异常
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyServerHandler());
}
}
发消息的时候也要在消息末尾增加"$_
",和LineBasedFrameDecoder加换行符差不多的做法。
FixedLengthFrameDecoder
这是一个按固定长度分包的解码器
使用:
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyServerHandler());
}
}
无论一次接收到多少数据报,它都会按照构造函数中设置的固定长度进行解码。适用于数据长度固定的情况,否则在handler中手动填充长度也行。
LengthFieldBasedFrameDecoder
这个就是在在数据包中,加了一个长度字段(长度域),保存上层包的长度。解码的时候,会按照这个长度,进行上层ByteBuf应用包的提取。
使用:
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyServerHandler());
}
}
其中五个参数:
(1)maxFrameLength
- 发送的数据包最大长度;
(2)lengthFieldOffset
- 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;
(3)lengthFieldLength
- 长度域的自己的字节数长度。
(4)lengthAdjustment
– 长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。
(5)initialBytesToStrip
– 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有4个节点的长度域,则它的值为4。
LengthFieldBasedFrameDecoder(1024,0,4,0,4)
的意思就是,数据包最大长度为1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)
在使用的时候handler写ByteBuf数据的时候应该在首部增添一个四个字节的长度域,代表数据长度。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/netty%e8%a7%a3%e5%86%b3tcp%e7%b2%98%e5%8c%85%e6%8b%86%e5%8c%85-%e5%9b%9b%e7%a7%8d%e8%87%aa%e5%b8%a6%e8%a7%a3%e7%a0%81%e5%99%a8/