maven依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
服务端
//写服务端的pipeline管道处理器,如果worker group中的NioEventLoop监听到了channel的read/write事件就会触发
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
/**
* 读取数据事件,可以读取客户端发送的消息
* ChannelHandlerContext上下文对象,含有管道pipeline,通道channel,地址
* Object 客户端发送的数据
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = "+ctx);
//将msg转成一个ByteBuf
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write+flush,将数据写入到缓存,并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
//处理异常,一般是要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup和WorkerGroup
//1. bossGroup只处理连接请求
//2. workerGroup处理业务请求
//两个中都有无限循环的NioEventLoop
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动类
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程配置
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
//给pipeline设置处理器
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("----------服务器 is ready-----------");
//sync同步阻塞等待绑定完成,返回一个ChannelFuture用于异步操作的通知回调
//启动服务器
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
//关闭循环线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//通道就绪事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8));
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端只需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端的启动对象
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());//加入处理器
}
});
System.out.println("-----------客户端 ok------------");
//启动客户端去连接服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
异步模型
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
```public interface ChannelFuture extends Future
调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果

异步任务的支持
TaskQueue
Handler中可以通过使用NioEventLoopGroup中的TaskQueue来实现异步任务,解决Handler中可能会出现的长时间阻塞问题。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
public void run() {
try {
Thread.sleep(10000);
System.out.println("handling...");
System.out.println("server ctx = "+ctx);
//将msg转成一个ByteBuf
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
ScheduleTaskQueue
可以使用ScheduleTaskQueue定义一个定时任务
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 在指定延迟后执行任务
ctx.channel().eventLoop().schedule(new Runnable() {
public void run() {
System.out.println("这是一个定时任务");
}
}, 5, TimeUnit.SECONDS);
System.out.println("server ctx = "+ctx);
//将msg转成一个ByteBuf
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/netty%e4%bd%bf%e7%94%a8%e5%85%a5%e9%97%a8%e6%a1%88%e4%be%8b/