基于NIO模型改进多人聊天室

第一版和第二版见基于BIO模型实现多人聊天室

NIO介绍见:

NIO、BIO模型对比实现文件的复制
NIO网络编程之Selector介绍

第三版:使用NIO模型改进

服务端实现

public class ChatServer {
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int Buffer = 1024;

    private ServerSocketChannel server;
    private Selector selector;
    private ByteBuffer rBuffer = ByteBuffer.allocate(Buffer);
    private ByteBuffer wBuffer = ByteBuffer.allocate(Buffer);
    private Charset charset = Charset.forName("UTF-8");
    private int port;

    public ChatServer() {
    }

    public ChatServer(int port) {
        this.port = port;
    }

    public void start(){
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);//!!!
            server.socket().bind(new InetSocketAddress(port));

            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("启动服务器,监听端口:"+port+"...");
            while (true) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                for(SelectionKey key:keys){
                    //处理被触发的事件
                    handles(key);
                }
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(selector);
        }
    }

    private void handles(SelectionKey key) throws IOException {
        // ACCEPT事件 - 和客户端建立了连接
        if(key.isAcceptable()){
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(selector,SelectionKey.OP_READ);
            System.out.println(getClientName(client) +"已连接");
        }
        // READ事件 - 客户端发送了消息
        else if(key.isReadable()){
            SocketChannel client = (SocketChannel) key.channel();
            String fwdMsg = receive(client);
            if(fwdMsg.isEmpty()){
                //客户端异常
                key.cancel();
                selector.wakeup();
            }else{
                System.out.println(getClientName(client)+":"+fwdMsg);
                // 检查用户是否退出
                if(readyToQuit(fwdMsg)){
                    key.cancel();
                    selector.wakeup();
                    System.out.println(getClientName(client)+"已断开连接");
                }else{
                    forwardMessage(client,fwdMsg);
                }
            }
        }
    }

    private void forwardMessage(SocketChannel client, String fwdMsg) {
        selector.keys().forEach(key->{
            SelectableChannel connectedClient = key.channel();
            if(connectedClient instanceof ServerSocketChannel) return;
            if(key.isValid() && !client.equals(connectedClient)){
                wBuffer.clear();
                wBuffer.put(charset.encode(getClientName(client)+":"+fwdMsg));
                wBuffer.flip();
                while (wBuffer.hasRemaining()){
                    try {
                        ((SocketChannel)connectedClient).write(wBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    private String receive(SocketChannel client) throws IOException {
        rBuffer.clear();
        while(client.read(rBuffer)>0);
        rBuffer.flip();
        return String.valueOf(charset.decode(rBuffer));
    }

    private String getClientName(SocketChannel client){
        return "客户端["+client.socket().getPort()+"]";
    }

    private boolean readyToQuit(String msg){
        return QUIT.equals(msg);
    }
    private void close(Closeable... closeable){
        for(Closeable c:closeable){
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(7777);
        chatServer.start();
    }
}

客户端实现

public class ChatClient {
    private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVER_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private String host;
    private int port;
    private SocketChannel client;
    private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
    private Selector selector;
    private Charset charset = Charset.forName("UTF-8");

    public ChatClient() {
    }

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public boolean readyToQuit(String msg){
        return QUIT.equals(msg);
    }
    private void close(Closeable... closeable){
        for(Closeable c:closeable){
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void start(){
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);

            selector = Selector.open();
            client.register(selector, SelectionKey.OP_CONNECT);
            client.connect(new InetSocketAddress(host,port));
            while (selector.isOpen()){
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                keys.forEach(key->{
                    handles(key);
                });
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e){

        } finally {
            close(selector);
        }
    }

    private void handles(SelectionKey key) {
        // CONNECT连接就绪事件
        if (key.isConnectable()){
            SocketChannel channel = (SocketChannel) key.channel();
            if(client.isConnectionPending()){
                try {
                    client.finishConnect();
                    //处理用户输入
                    new Thread(new UserInputHandler(this)).start();
                    client.register(selector,SelectionKey.OP_READ);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // READ事件
        else if(key.isReadable()){
            SocketChannel client = (SocketChannel) key.channel();
            String msg = null;
            try {
                msg = recieve(client);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if(msg.isEmpty()){
                // 服务器异常
                close(selector);
            }else{
                System.out.println(msg);
            }
        }
    }

    public void send(String msg) throws IOException {
        if (msg.isEmpty()) return;
        wBuffer.clear();
        wBuffer.put(charset.encode(msg));
        wBuffer.flip();
        while (wBuffer.hasRemaining()){
            client.write(wBuffer);
        }
        if(readyToQuit(msg)){
            close(selector);
        }
    }

    private String recieve(SocketChannel client) throws IOException {
        rBuffer.clear();
        while(client.read(rBuffer)>0);
        rBuffer.flip();
        return String.valueOf(charset.decode(rBuffer));
    }

    public static void main(String[] args) {
        ChatClient client = new ChatClient("127.0.0.1",7777);
        client.start();
    }
}

处理用户输入的线程:

public class UserInputHandler implements Runnable{
    private ChatClient chatClient;

    public UserInputHandler(ChatClient chatClient){
        this.chatClient = chatClient;
    }

    @Override
    public void run() {
        try {
            // 等待用户输入消息
            BufferedReader consoleReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            while (true){
                String input = consoleReader.readLine();
                // 向服务器发送消息
                chatClient.send(input);
                // 检查用户是否准备退出
                if(chatClient.readyToQuit(input)){
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e5%9f%ba%e4%ba%8enio%e6%a8%a1%e5%9e%8b%e6%94%b9%e8%bf%9b%e5%a4%9a%e4%ba%ba%e8%81%8a%e5%a4%a9%e5%ae%a4/

(0)
彭晨涛彭晨涛管理者
上一篇 2020年2月7日
下一篇 2020年2月8日

相关推荐

  • AbstractCollection默认集合类

    AbstractCollection用于实现基本的Collection结构,提供给普通用户继承使用。也是JDK集合类的父类,部分方法是没有被重载的。 相比Collection接口并…

    Java 2019年11月18日
    0100
  • JDK8u20字符串去重

    优点:节省大量内存 缺点:略微多占用cpu时间,新生代回收时间略微增加 -XX:+UseStringDeduplication String s1 = new String("he…

    Java 2020年1月15日
    0710
  • Java字节码实例探究

    深入理解java虚拟机第三版读书笔记06中介绍了class文件结构,这里我们动手实践,编译一个类查看一下它的字节码。 java源码: public class Main { pri…

    2020年1月23日
    0220
  • Java基础查缺补漏02

    哈哈我其实没有想到这个系列真会有续集,上次写完01以后以为不会再写下去了,没想到最近牛客网刷题有些题目还是挺纠结的,这里补一补 构造器能带哪些修饰符 题目: Which of th…

    Java 2020年5月25日
    080
  • NIO零拷贝与其系统函数调用

    本文参考资源: Java NIO学习笔记四(零拷贝详解)Java拿笔小星的博客-CSDN博客 关于Buffer和Channel的注意事项和细节 ByteBuffer不止可以存取by…

    2020年3月14日
    0450
  • JDK8-Stream流库详解

    流提供了一种让我们可以在比集合更高的概念级别上指定计算的数据视图。通过使用流,我们可以说明想要完成什么任务,而不是说明如何去实现它。 流的创建 Collection.stream(…

    Java 2020年2月11日
    0140
  • 详解Java中的四种引用及其应用

    本文参考资源: 深入理解Java中的引用(一)——Reference - 简书 深入理解Java中的引用(二)——强软弱虚引用 - 简书 深入理解Java中的引用(三)——Dire…

    2020年2月14日
    0170
  • Java之UDP编程

    DatagramSocket概述 上次在Java网络套接字Socket编程那篇博客里只写了Socket和ServerSocket,即TCP通信,这次来补充一下UDP通信。 和Soc…

    Java 2020年3月11日
    0800
  • Java自动装箱缓存机制

    尝试运行这段代码: 相似的两段代码,得到的结果却完全不相同。 首先要知道在java中==比较的是对象的引用,从直觉出发,无论是integer1、integer2还是integer3…

    Java 2019年12月5日
    0130
  • 阻塞队列BlockingQueue详解

    阻塞队列是生产者消费者模式的经典体现。 我们在曾在Java线程池详解中自己实现过一个阻塞队列,这篇文章我们来研究一下JDK中的阻塞队列: BlockingQueue接口主要方法 抛…

    Java 2020年2月14日
    0570

发表回复

登录后才能评论