网络编程之AIO

AIO介绍

AIO是JDK1.7引入的,也称为NIO 2.0

NIO的同步非阻塞类似于轮询,让Selector去询问各个通道的状态。

而AIO则是通过Future实现真正的异步,让IO就绪时由系统来通知应用程序。

核心类:

  • AsyncServerSocketChannel
  • AsyncSocketChannel

当进行如下操作时,都是异步的操作
+ connect/accept
+ write
+ read

可以通过两种方案处理结果:
+ 通过返回的Future对象获取结果
+ 通过CompletionHandler接口实现回调

对于CompletionHandler接口,里面有completedfailed方法,分别代表成功后的回调和失败后的回调。

代码示例

实现一个客户端给服务端发送数据,服务端按原数据返回的程序。

服务端:

public class Server {
    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    AsynchronousServerSocketChannel serverSocketChannel;

    private void close(Closeable... closeable){
        for(Closeable c:closeable){
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try {
            // 绑定监听端口
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(LOCALHOST,DEFAULT_PORT));
            System.out.println("启动服务器,监听端口:"+DEFAULT_PORT);

            while (true) {
                serverSocketChannel.accept(null,new AcceptHandler());
                System.in.read();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(serverSocketChannel);
        }
    }
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            if(serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null,this);
            }

            AsynchronousSocketChannel clientChannel = result;
            if(clientChannel!=null&&clientChannel.isOpen()){
                ClientHandler handler = new ClientHandler(clientChannel);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                Map<String,Object> info = new HashMap<>();
                info.put("type","read");
                info.put("buffer",buffer);
                clientChannel.read(buffer,info,handler);

            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            // 处理错误
            exc.printStackTrace();
            if(serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null,this);
            }
        }
    }
    private class ClientHandler implements CompletionHandler<Integer,Object>{
        private AsynchronousSocketChannel clientChannel;
        public ClientHandler(AsynchronousSocketChannel channel){
            this.clientChannel = channel;
        }
        @Override
        public void completed(Integer result, Object attachment) {
            Map<String,Object> info = (Map<String, Object>) attachment;
            String type = (String) info.get("type");
            if(type.equals("read")){
                ByteBuffer buffer = (ByteBuffer) info.get("buffer");
                buffer.flip();
                info.put("type","write");
                clientChannel.write(buffer,info,this);
                buffer.clear();
            }else if(type.equals("write")){
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                info.put("type","read");
                info.put("buffer",buffer);
                clientChannel.read(buffer,info,this);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            //处理错误
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

客户端:

public class Client {
    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    AsynchronousSocketChannel clientSocketChannel;

    private void close(Closeable... closeable){
        for(Closeable c:closeable){
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        // 创建channel
        try {
            clientSocketChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientSocketChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            future.get();
            // 等待用户的输入
            BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
            while(true){
                String input = consoleReader.readLine();
                byte[] inputBytes = input.getBytes();
                ByteBuffer buffer = ByteBuffer.wrap(inputBytes);
                Future<Integer> writeResult = clientSocketChannel.write(buffer);
                writeResult.get();
                buffer.flip();
                Future<Integer> readResult = clientSocketChannel.read(buffer);
                readResult.get();
                String echo = new String(buffer.array());
                buffer.clear();
                System.out.println(echo);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            close(clientSocketChannel);
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.start();
    }
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e7%bd%91%e7%bb%9c%e7%bc%96%e7%a8%8b%e4%b9%8baio/