Netty使用入门案例

maven依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>

服务端

//写服务端的pipeline管道处理器,如果worker group中的NioEventLoop监听到了channel的read/write事件就会触发
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    /**
     * 读取数据事件,可以读取客户端发送的消息
     * ChannelHandlerContext上下文对象,含有管道pipeline,通道channel,地址
     * Object 客户端发送的数据
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx = "+ctx);
        //将msg转成一个ByteBuf
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());

    }
    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //write+flush,将数据写入到缓存,并刷新
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
    }
    //处理异常,一般是要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //创建BossGroup和WorkerGroup
        //1. bossGroup只处理连接请求
        //2. workerGroup处理业务请求
        //两个中都有无限循环的NioEventLoop
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程配置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
                        //给pipeline设置处理器
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });//给workerGroup的EventLoop对应的管道设置处理器

            System.out.println("----------服务器 is ready-----------");
            //sync同步阻塞等待绑定完成,返回一个ChannelFuture用于异步操作的通知回调
            //启动服务器
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            //关闭循环线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

客户端

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    //通道就绪事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8));
    }
    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端只需要一个事件循环组
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建客户端的启动对象
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());//加入处理器
                        }
                    });
            System.out.println("-----------客户端 ok------------");
            //启动客户端去连接服务器
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }

    }
}

异步模型

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。

```public interface ChannelFuture extends Future ```

调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果

Netty使用入门案例

异步任务的支持

TaskQueue

Handler中可以通过使用NioEventLoopGroup中的TaskQueue来实现异步任务,解决Handler中可能会出现的长时间阻塞问题。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().eventLoop().execute(new Runnable() {
        public void run() {
            try {
                Thread.sleep(10000);
                System.out.println("handling...");
                System.out.println("server ctx = "+ctx);
                //将msg转成一个ByteBuf
                ByteBuf buf = (ByteBuf) msg;
                System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
                System.out.println("客户端地址:"+ctx.channel().remoteAddress());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

ScheduleTaskQueue

可以使用ScheduleTaskQueue定义一个定时任务

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    // 在指定延迟后执行任务
    ctx.channel().eventLoop().schedule(new Runnable() {
        public void run() {
            System.out.println("这是一个定时任务");
        }
    }, 5, TimeUnit.SECONDS);

    System.out.println("server ctx = "+ctx);
    //将msg转成一个ByteBuf
    ByteBuf buf = (ByteBuf) msg;
    System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
    System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/netty%e4%bd%bf%e7%94%a8%e5%85%a5%e9%97%a8%e6%a1%88%e4%be%8b/

发表评论

电子邮件地址不会被公开。