网络编程之AIO

AIO介绍

AIO是JDK1.7引入的,也称为NIO 2.0

NIO的同步非阻塞类似于轮询,让Selector去询问各个通道的状态。

而AIO则是通过Future实现真正的异步,让IO就绪时由系统来通知应用程序。

核心类:

  • AsyncServerSocketChannel
  • AsyncSocketChannel

当进行如下操作时,都是异步的操作
+ connect/accept
+ write
+ read

可以通过两种方案处理结果:
+ 通过返回的Future对象获取结果
+ 通过CompletionHandler接口实现回调

对于CompletionHandler接口,里面有completedfailed方法,分别代表成功后的回调和失败后的回调。

代码示例

实现一个客户端给服务端发送数据,服务端按原数据返回的程序。

服务端:

public class Server {
    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    AsynchronousServerSocketChannel serverSocketChannel;

    private void close(Closeable... closeable){
        for(Closeable c:closeable){
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try {
            // 绑定监听端口
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(LOCALHOST,DEFAULT_PORT));
            System.out.println("启动服务器,监听端口:"+DEFAULT_PORT);

            while (true) {
                serverSocketChannel.accept(null,new AcceptHandler());
                System.in.read();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(serverSocketChannel);
        }
    }
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            if(serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null,this);
            }

            AsynchronousSocketChannel clientChannel = result;
            if(clientChannel!=null&&clientChannel.isOpen()){
                ClientHandler handler = new ClientHandler(clientChannel);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                Map<String,Object> info = new HashMap<>();
                info.put("type","read");
                info.put("buffer",buffer);
                clientChannel.read(buffer,info,handler);

            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            // 处理错误
            exc.printStackTrace();
            if(serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null,this);
            }
        }
    }
    private class ClientHandler implements CompletionHandler<Integer,Object>{
        private AsynchronousSocketChannel clientChannel;
        public ClientHandler(AsynchronousSocketChannel channel){
            this.clientChannel = channel;
        }
        @Override
        public void completed(Integer result, Object attachment) {
            Map<String,Object> info = (Map<String, Object>) attachment;
            String type = (String) info.get("type");
            if(type.equals("read")){
                ByteBuffer buffer = (ByteBuffer) info.get("buffer");
                buffer.flip();
                info.put("type","write");
                clientChannel.write(buffer,info,this);
                buffer.clear();
            }else if(type.equals("write")){
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                info.put("type","read");
                info.put("buffer",buffer);
                clientChannel.read(buffer,info,this);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            //处理错误
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

客户端:

public class Client {
    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    AsynchronousSocketChannel clientSocketChannel;

    private void close(Closeable... closeable){
        for(Closeable c:closeable){
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        // 创建channel
        try {
            clientSocketChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientSocketChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            future.get();
            // 等待用户的输入
            BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
            while(true){
                String input = consoleReader.readLine();
                byte[] inputBytes = input.getBytes();
                ByteBuffer buffer = ByteBuffer.wrap(inputBytes);
                Future<Integer> writeResult = clientSocketChannel.write(buffer);
                writeResult.get();
                buffer.flip();
                Future<Integer> readResult = clientSocketChannel.read(buffer);
                readResult.get();
                String echo = new String(buffer.array());
                buffer.clear();
                System.out.println(echo);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            close(clientSocketChannel);
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.start();
    }
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e7%bd%91%e7%bb%9c%e7%bc%96%e7%a8%8b%e4%b9%8baio/

(0)
彭晨涛彭晨涛管理者
上一篇 2020年2月8日 21:47
下一篇 2020年2月9日

相关推荐

  • HashMap源码分析

    HashMap是java中非常常见的一个数据结构,在这篇文章里,我依然以Map中的操作为导向来循序渐进研究HashMap中的源码,阅读这篇文章需要的前置知识有: 弱平衡的二叉查找树…

    Java 2020年2月12日
    0240
  • 阻塞队列BlockingQueue详解

    阻塞队列是生产者消费者模式的经典体现。 我们在曾在Java线程池详解中自己实现过一个阻塞队列,这篇文章我们来研究一下JDK中的阻塞队列: BlockingQueue接口主要方法 抛…

    Java 2020年2月14日
    0600
  • JUC包下的线程协作计数CountDownLatch及CyclicBarrier

    CountDownLatch 概述 用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来…

    Java 2020年2月6日
    02070
  • Servlet4.0初识总结

    JavaEE8 JavaEE8,是自2013年6月Java企业版的首次更新。JAVAEE8提供了一些新的API,提供了对HTTP/2的新支持。 Servlet4.0 Servlet…

    Java 2019年11月28日
    02180
  • Java多线程共享模型之管程

    Monitor(锁) Monitor被翻译为监视器或管程。 Monitor是重量级锁的实现,是向操作系统申请的。 Monitor的结构:WaitSet、EntryList、Owne…

    2020年2月2日
    0930
  • PriorityQueue源码分析

    总结 总结放前面防止太长不看: PriorityQueue是个最小堆,如果要改变排序顺序只能重写比较器传入构造方法。 内部元素要么实现Comparable接口,要么传入比较器进行比…

    Java 2020年2月10日
    0270
  • 为什么说Java只有值传递?

    先说一下。。以后可能不会怎么写Java相关的博客了,因为找到了字节跳动的实习工作,用Go/Python开发后端,所以这几天在抓紧时间学Go,在学Go的时候,了解到Go语言只有值传递…

    Java 2020年6月26日
    03170
  • 深入理解java虚拟机第三版读书笔记09

    续深入理解java虚拟机第三版读书笔记08 类加载器 通过一个类的全限定名来获取描述该类的二进制字节流称为类加载器。类加载器可以用户自定义,是java语言流行的一项原因 类与类加载…

    2020年1月23日
    0130
  • Java基础查缺补漏02

    哈哈我其实没有想到这个系列真会有续集,上次写完01以后以为不会再写下去了,没想到最近牛客网刷题有些题目还是挺纠结的,这里补一补 构造器能带哪些修饰符 题目: Which of th…

    Java 2020年5月25日
    080
  • 快速失败(fail-fast)和安全失败(fail-safe)

    快速失败 在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改(增加、删除、修改),则会抛出 Concurrent Modification Exception。…

    Java 2020年2月23日
    0150

发表回复

登录后才能评论