基于NIO模型改进多人聊天室

第一版和第二版见《基于BIO模型实现多人聊天室》。

NIO介绍见:

- 《NIO、BIO模型对比实现文件的复制》
- 《NIO网络编程之Selector介绍》

第三版:使用NIO模型改进

服务端实现

/**
 * Multi-user chat server built on non-blocking NIO.
 *
 * <p>A single thread multiplexes the listening channel and every client
 * channel through one {@link Selector}. Each message received from a client
 * is forwarded to all other connected clients, prefixed with the sender's
 * name. A client leaves by sending the literal message {@code quit} or by
 * closing its connection.
 *
 * <p>Instances are not thread-safe; {@link #start()} runs the event loop on
 * the calling thread and only returns after an I/O failure.
 */
public class ChatServer {
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private ServerSocketChannel server;
    private Selector selector;
    // Reused for every read; safe because only the selector thread touches it.
    private final ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private final Charset charset = Charset.forName("UTF-8");
    private final int port;

    /** Creates a server listening on the default port (8888). */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server listening on the given port.
     *
     * @param port TCP port to bind when {@link #start()} is called
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and runs the selector loop on the calling
     * thread. I/O failures of the listening channel or the selector are
     * printed and end the loop; all channels are closed before returning.
     */
    public void start() {
        try {
            server = ServerSocketChannel.open();
            // The channel must be non-blocking before it can be registered with a selector.
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port));

            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("启动服务器,监听端口:" + port + "...");
            while (true) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    // Handle the event that was triggered on this key.
                    handles(key);
                }
                // The selector never removes keys from the selected set itself.
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /**
     * Dispatches a single ready key: accepts a new client or reads and
     * forwards a client message.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if accepting or configuring a new client fails
     */
    private void handles(SelectionKey key) throws IOException {
        // A key may have been cancelled earlier in the same pass (for example a
        // broken receiver dropped while forwarding); querying its ready set
        // would throw CancelledKeyException.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            // ACCEPT event - a client has connected.
            ServerSocketChannel acceptor = (ServerSocketChannel) key.channel();
            SocketChannel client = acceptor.accept();
            if (client == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println(getClientName(client) + "已连接");
        } else if (key.isReadable()) {
            // READ event - a client sent a message.
            SocketChannel client = (SocketChannel) key.channel();
            // Resolve the name first so it can still be logged after the channel is closed.
            String clientName = getClientName(client);
            String fwdMsg;
            try {
                fwdMsg = receive(client);
            } catch (IOException e) {
                // A failure on one connection must not bring down the whole server.
                e.printStackTrace();
                disconnect(key);
                return;
            }
            if (fwdMsg.isEmpty()) {
                // Nothing readable: the client closed or the connection broke.
                disconnect(key);
                return;
            }
            System.out.println(clientName + ":" + fwdMsg);
            // Check whether the user asked to leave.
            if (readyToQuit(fwdMsg)) {
                disconnect(key);
                System.out.println(clientName + "已断开连接");
            } else {
                forwardMessage(client, fwdMsg);
            }
        }
    }

    /**
     * Sends {@code fwdMsg}, prefixed with the sender's name, to every
     * connected client except the sender. A receiver whose write fails is
     * disconnected and the remaining receivers are still served.
     *
     * @param client the channel the message came from
     * @param fwdMsg the message text to forward
     */
    private void forwardMessage(SocketChannel client, String fwdMsg) {
        // Encode once and write straight from the encoded buffer, so the
        // message length is not limited by a fixed-size staging buffer.
        ByteBuffer payload = charset.encode(getClientName(client) + ":" + fwdMsg);
        for (SelectionKey key : selector.keys()) {
            SelectableChannel target = key.channel();
            if (target instanceof ServerSocketChannel || !key.isValid() || client.equals(target)) {
                continue;
            }
            payload.rewind();
            try {
                // NOTE: spins while the receiver's socket buffer is full; a
                // production server would queue the data and wait for OP_WRITE.
                while (payload.hasRemaining()) {
                    ((SocketChannel) target).write(payload);
                }
            } catch (IOException e) {
                // The catch sits outside the write loop so a failed write
                // cannot retry forever on the same broken channel.
                e.printStackTrace();
                disconnect(key);
            }
        }
    }

    /**
     * Reads whatever is currently available on {@code client}, up to the
     * buffer capacity, and decodes it as UTF-8.
     *
     * @param client the channel to read from
     * @return the decoded text; empty when nothing was read (including end of stream)
     * @throws IOException if the read fails
     */
    private String receive(SocketChannel client) throws IOException {
        rBuffer.clear();
        // Stops on 0 (no more data, or buffer full) and on -1 (end of stream).
        while (client.read(rBuffer) > 0) {
            // keep draining
        }
        rBuffer.flip();
        return String.valueOf(charset.decode(rBuffer));
    }

    /** Builds the display name of a client from its remote port. */
    private String getClientName(SocketChannel client) {
        return "客户端[" + client.socket().getPort() + "]";
    }

    /** Returns whether {@code msg} is the quit command. */
    private boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Cancels the key and closes its channel so the socket is released. */
    private void disconnect(SelectionKey key) {
        key.cancel();
        close(key.channel());
    }

    /** Closes every registered channel, then the selector and the listening channel. */
    private void shutdown() {
        if (selector != null && selector.isOpen()) {
            // Closing a selector does not close the channels registered with it.
            for (SelectionKey key : selector.keys()) {
                close(key.channel());
            }
        }
        close(selector, server);
    }

    /** Closes each non-null resource, printing (not propagating) any failure. */
    private void close(Closeable... closeable) {
        for (Closeable c : closeable) {
            if (c == null) {
                // Start-up may have failed before this resource was created.
                continue;
            }
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(7777);
        chatServer.start();
    }
}

客户端实现

/**
 * Chat client built on non-blocking NIO.
 *
 * <p>The thread that calls {@code start()} runs a selector loop that
 * completes the connection and prints messages from the server. Console
 * input is read on a separate thread ({@code UserInputHandler}), which calls
 * {@link #send(String)}. Sending the literal message {@code quit} closes the
 * selector and thereby ends the loop.
 */
public class ChatClient {
    private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVER_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private final String host;
    private final int port;
    private SocketChannel client;
    // Only used by the selector thread.
    private final ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private Selector selector;
    private final Charset charset = Charset.forName("UTF-8");

    /** Creates a client for the default server address (127.0.0.1:8888). */
    public ChatClient() {
        this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
    }

    /**
     * Creates a client for the given server address.
     *
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Returns whether {@code msg} is the quit command.
     *
     * @param msg the message to test; may be null
     * @return true if {@code msg} equals {@code quit}
     */
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Closes each non-null resource, printing (not propagating) any failure. */
    private void close(Closeable... closeable) {
        for (Closeable c : closeable) {
            if (c == null) {
                // Setup may have failed before this resource was created.
                continue;
            }
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Connects to the server and runs the selector loop until the selector
     * is closed (user quit, connection failure, or server gone). The
     * selector and the socket channel are closed before returning.
     */
    private void start() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);

            selector = Selector.open();
            client.register(selector, SelectionKey.OP_CONNECT);
            client.connect(new InetSocketAddress(host, port));
            while (selector.isOpen()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    handles(key);
                    if (!selector.isOpen()) {
                        // handles() closed the selector; stop touching its key set.
                        break;
                    }
                }
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException ignored) {
            // Expected: send() closed the selector from the input thread
            // while this thread was using it. Treated as a normal exit.
        } finally {
            // Closing the selector does not close the channel; release both.
            close(selector, client);
        }
    }

    /**
     * Dispatches a single ready key: finishes the pending connection or
     * prints a message received from the server.
     *
     * @param key a key taken from the selector's selected-key set
     */
    private void handles(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        if (key.isConnectable()) {
            // CONNECT event - the connection is ready to be completed.
            if (client.isConnectionPending()) {
                try {
                    if (!client.finishConnect()) {
                        // Not established yet; wait for the next CONNECT event.
                        return;
                    }
                    client.register(selector, SelectionKey.OP_READ);
                    // Read user input on its own thread so the selector loop never blocks on the console.
                    new Thread(new UserInputHandler(this)).start();
                } catch (IOException e) {
                    // A failed finishConnect closes the channel; without
                    // closing the selector the loop would block forever.
                    e.printStackTrace();
                    close(selector);
                }
            }
        } else if (key.isReadable()) {
            // READ event - the server sent data.
            SocketChannel channel = (SocketChannel) key.channel();
            String msg;
            try {
                msg = receive(channel);
            } catch (IOException e) {
                e.printStackTrace();
                // Treat a failed read like a closed connection.
                msg = "";
            }
            if (msg.isEmpty()) {
                // Server closed the connection or failed.
                close(selector);
            } else {
                System.out.println(msg);
            }
        }
    }

    /**
     * Sends a message to the server. Null and empty messages are ignored.
     * Sending the quit command also closes the selector, which ends the
     * receive loop.
     *
     * @param msg the text to send
     * @throws IOException if writing to the server fails
     */
    public void send(String msg) throws IOException {
        if (msg == null || msg.isEmpty()) {
            return;
        }
        // Write straight from the encoded buffer so long messages cannot
        // overflow a fixed-size staging buffer.
        ByteBuffer payload = charset.encode(msg);
        while (payload.hasRemaining()) {
            client.write(payload);
        }
        if (readyToQuit(msg)) {
            close(selector);
        }
    }

    /**
     * Reads whatever is currently available on {@code client}, up to the
     * buffer capacity, and decodes it as UTF-8.
     *
     * @param client the channel to read from
     * @return the decoded text; empty when nothing was read (including end of stream)
     * @throws IOException if the read fails
     */
    private String receive(SocketChannel client) throws IOException {
        rBuffer.clear();
        // Stops on 0 (no more data, or buffer full) and on -1 (end of stream).
        while (client.read(rBuffer) > 0) {
            // keep draining
        }
        rBuffer.flip();
        return String.valueOf(charset.decode(rBuffer));
    }

    public static void main(String[] args) {
        ChatClient client = new ChatClient("127.0.0.1", 7777);
        client.start();
    }
}

处理用户输入的线程:

/**
 * Reads lines from standard input and sends each one to the server through
 * the owning {@code ChatClient}. Intended to run on its own thread so that
 * blocking console reads do not stall the client's selector loop.
 */
public class UserInputHandler implements Runnable {
    private final ChatClient chatClient;

    /**
     * @param chatClient the client used to send the lines the user types
     */
    public UserInputHandler(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    /**
     * Forwards console lines until the user enters the quit command, standard
     * input reaches end of stream, or sending fails.
     */
    @Override
    public void run() {
        try {
            // Wait for the user to type messages.
            BufferedReader consoleReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            while (true) {
                String input = consoleReader.readLine();
                if (input == null) {
                    // readLine() returns null at end of stream (Ctrl-D or
                    // closed/redirected stdin); there is nothing more to send.
                    break;
                }
                // Send the message to the server.
                chatClient.send(input);
                // Stop once the user has asked to quit.
                if (chatClient.readyToQuit(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e5%9f%ba%e4%ba%8enio%e6%a8%a1%e5%9e%8b%e6%94%b9%e8%bf%9b%e5%a4%9a%e4%ba%ba%e8%81%8a%e5%a4%a9%e5%ae%a4/