基于BIO模型实现多人聊天室

第一版

服务端实现:

/**
 * Blocking-I/O chat server.
 *
 * <p>Listens on a fixed port, starts one {@link ChatHandler} thread per accepted
 * connection, and relays each message to every other connected client.
 *
 * <p>All access to the client registry goes through {@code synchronized} methods
 * of this instance, because handlers call in from their own threads.
 */
public class ChatServer {
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";

    private ServerSocket serverSocket;
    // Keyed by the Socket itself (identity) rather than by remote port: two clients on
    // different hosts may use the same port number and would overwrite each other.
    private final Map<Socket, Writer> connectedClients;

    public ChatServer() {
        connectedClients = new HashMap<>();
    }

    /**
     * Registers a newly connected client so that it receives forwarded messages.
     *
     * @param socket the client's connection; {@code null} is ignored
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public synchronized void addClient(Socket socket) throws IOException {
        if (socket == null) {
            return;
        }
        int port = socket.getPort();
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(socket.getOutputStream())
        );
        connectedClients.put(socket, writer);
        System.out.println("客户端["+port+"]已连接到服务器");
    }

    /**
     * Unregisters a client and releases its connection.
     *
     * <p>The socket is closed even when the client was never registered (for example
     * because {@link #addClient} failed), so a handler can always call this from a
     * {@code finally} block without leaking the connection.
     *
     * @param socket the client's connection; {@code null} is ignored
     * @throws IOException if closing the writer or the socket fails
     */
    public synchronized void removeClient(Socket socket) throws IOException {
        if (socket == null) {
            return;
        }
        int port = socket.getPort();
        // Drop the entry first so a failing close() cannot leave a dead writer registered.
        Writer writer = connectedClients.remove(socket);
        try {
            if (writer != null) {
                writer.close();
            }
        } finally {
            socket.close();
        }
        System.out.println("客户端["+port+"]已断开连接");
    }

    /**
     * Sends a message to every registered client except the sender.
     *
     * <p>A write failure for one recipient is reported and does not stop delivery
     * to the remaining recipients.
     *
     * @param socket the sender's connection, which is skipped
     * @param fwdMsg the text to write, including any line terminator
     */
    public synchronized void forwardMessage(Socket socket, String fwdMsg) {
        connectedClients.forEach((client, writer) -> {
            if (client != socket) {
                try {
                    writer.write(fwdMsg);
                    writer.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /** Closes the listening socket if it was opened. */
    public synchronized void close() {
        if (serverSocket != null) {
            try {
                serverSocket.close();
                System.out.println("关闭ServerSocket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Tells whether a message is the quit command.
     *
     * @param msg the received line; may be {@code null}
     * @return {@code true} only if {@code msg} equals the quit keyword
     */
    public boolean readyToQuit(String msg) {
        // Constant-first comparison: a null message is simply "not quit" instead of an NPE.
        return QUIT.equals(msg);
    }

    /**
     * Binds the listening port and accepts clients until accepting fails,
     * then closes the listening socket.
     */
    public void start() {
        try {
            // Bind the listening port.
            serverSocket = new ServerSocket(DEFAULT_PORT);
            System.out.println("启动服务器,监听端口:"+DEFAULT_PORT+"...");
            while (true) {
                // Block until a client connects.
                Socket socket = serverSocket.accept();
                // Serve this client on its own thread.
                new Thread(new ChatHandler(this, socket)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}

服务端为每个已连接的客户端分配一个线程进行处理:

/**
 * Per-connection task: registers the client with the server, relays every line
 * the client sends to the other clients, and unregisters the client when the
 * conversation ends.
 */
public class ChatHandler implements Runnable {
    private ChatServer server;
    private Socket socket;

    public ChatHandler(ChatServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    /**
     * Serves the client until it sends the quit command, its stream ends, or an
     * I/O error occurs. The client is always removed from the server afterwards.
     */
    @Override
    public void run() {
        try {
            relayMessages();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            unregister();
        }
    }

    /** Registers the client, then forwards each received line until quit or end of stream. */
    private void relayMessages() throws IOException {
        // Make the new client visible to the other participants.
        server.addClient(socket);

        BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream())
        );
        for (String line = in.readLine(); line != null; line = in.readLine()) {
            String tagged = "客户端["+socket.getPort()+"]:"+line;
            System.out.println(tagged);
            if (server.readyToQuit(line)) {
                // The quit command is logged above but not forwarded.
                return;
            }
            // Broadcast to everyone else in the room.
            server.forwardMessage(socket, tagged+"\n");
        }
    }

    /** Removes the client from the server, reporting (not propagating) any I/O failure. */
    private void unregister() {
        try {
            server.removeClient(socket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端:

/**
 * Console chat client.
 *
 * <p>{@link #start()} connects to the server, starts a {@link UserInputHandler}
 * thread that sends what the user types, and prints every line received from the
 * server on the calling thread.
 */
public class ChatClient {
    private final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private final int DEFAULT_SERVER_PORT = 8888;
    private final String QUIT = "quit";

    private Socket socket;
    private BufferedReader reader;
    private BufferedWriter writer;

    /**
     * Sends one line to the server. Nothing is sent once the socket's output
     * side has been shut down.
     *
     * @param msg the text to send; a line terminator is appended
     * @throws IOException if writing to the connection fails
     */
    public void send(String msg) throws IOException {
        if (!socket.isOutputShutdown()) {
            writer.write(msg+"\n");
            writer.flush();
        }
    }

    /**
     * Reads one line from the server, blocking until it arrives.
     *
     * <p>The method name keeps its original spelling because other classes call it.
     *
     * @return the received line, or {@code null} if the stream has ended or the
     *         socket's input side has been shut down
     * @throws IOException if reading from the connection fails
     */
    public String recieve() throws IOException {
        String msg = null;
        if (!socket.isInputShutdown()) {
            msg = reader.readLine();
        }
        return msg;
    }

    /**
     * Tells whether the user's input is the quit command.
     *
     * @param msg the text typed by the user; may be {@code null}
     * @return {@code true} only if {@code msg} equals the quit keyword
     */
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /**
     * Releases the connection. Does nothing if no connection was ever established.
     *
     * <p>The socket is closed in addition to the writer, so the connection is also
     * released when stream creation failed after the socket had been opened.
     */
    public void close() {
        if (socket == null) {
            return;
        }
        System.out.println("关闭socket");
        try {
            if (writer != null) {
                writer.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Connects to the server and prints incoming messages until the stream ends
     * or an I/O error occurs, then closes the connection.
     */
    public void start() {
        try {
            // Open the connection.
            socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
            // Wrap the socket streams for line-based text I/O.
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            // Console input is handled on a separate thread so that reading from the
            // server below is not held up by waiting for the user.
            new Thread(new UserInputHandler(this)).start();
            // Print everything the server forwards.
            String msg = null;
            while ((msg = recieve()) != null) {
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatClient client = new ChatClient();
        client.start();
    }
}

客户端使用独立线程处理用户的控制台输入:

/**
 * Reads lines typed on the console and sends each one to the server through the
 * owning {@link ChatClient}.
 */
public class UserInputHandler implements Runnable {
    private ChatClient chatClient;

    public UserInputHandler(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    /**
     * Forwards console input until the user enters the quit command, standard
     * input reaches end of stream, or sending fails.
     */
    @Override
    public void run() {
        try {
            BufferedReader consoleReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            String input;
            // readLine() returns null once standard input is exhausted (EOF or a closed
            // pipe). Stop there; otherwise the loop would spin forever sending the text
            // "null" to the server.
            while ((input = consoleReader.readLine()) != null) {
                // The quit command itself is sent too, so the server sees it.
                chatClient.send(input);
                if (chatClient.readyToQuit(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

第二版:使用线程池改进

服务端改用线程池来执行 ChatHandler 任务,而不是为每个连接新建一个线程:

/**
 * Blocking-I/O chat server backed by a thread pool.
 *
 * <p>Listens on a fixed port, submits one {@link ChatHandler} task per accepted
 * connection to a fixed-size pool, and relays each message to every other
 * connected client.
 *
 * <p>All access to the client registry goes through {@code synchronized} methods
 * of this instance, because handlers call in from pool threads.
 */
public class ChatServer {
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";

    private final ExecutorService executorService;
    private ServerSocket serverSocket;
    // Keyed by the Socket itself (identity) rather than by remote port: two clients on
    // different hosts may use the same port number and would overwrite each other.
    private final Map<Socket, Writer> connectedClients;

    public ChatServer() {
        // NOTE(review): each handler occupies a pool thread for the whole lifetime of its
        // connection, so with 10 threads an 11th client is accepted but its handler waits
        // in the pool's queue until another client leaves. Size the pool accordingly.
        executorService = Executors.newFixedThreadPool(10);
        connectedClients = new HashMap<>();
    }

    /**
     * Registers a newly connected client so that it receives forwarded messages.
     *
     * @param socket the client's connection; {@code null} is ignored
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public synchronized void addClient(Socket socket) throws IOException {
        if (socket == null) {
            return;
        }
        int port = socket.getPort();
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(socket.getOutputStream())
        );
        connectedClients.put(socket, writer);
        System.out.println("客户端["+port+"]已连接到服务器");
    }

    /**
     * Unregisters a client and releases its connection.
     *
     * <p>The socket is closed even when the client was never registered (for example
     * because {@link #addClient} failed), so a handler can always call this from a
     * {@code finally} block without leaking the connection.
     *
     * @param socket the client's connection; {@code null} is ignored
     * @throws IOException if closing the writer or the socket fails
     */
    public synchronized void removeClient(Socket socket) throws IOException {
        if (socket == null) {
            return;
        }
        int port = socket.getPort();
        // Drop the entry first so a failing close() cannot leave a dead writer registered.
        Writer writer = connectedClients.remove(socket);
        try {
            if (writer != null) {
                writer.close();
            }
        } finally {
            socket.close();
        }
        System.out.println("客户端["+port+"]已断开连接");
    }

    /**
     * Sends a message to every registered client except the sender.
     *
     * <p>A write failure for one recipient is reported and does not stop delivery
     * to the remaining recipients.
     *
     * @param socket the sender's connection, which is skipped
     * @param fwdMsg the text to write, including any line terminator
     */
    public synchronized void forwardMessage(Socket socket, String fwdMsg) {
        connectedClients.forEach((client, writer) -> {
            if (client != socket) {
                try {
                    writer.write(fwdMsg);
                    writer.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Closes the listening socket if it was opened and stops the pool from taking
     * new work. Handlers that are already running continue until their clients
     * disconnect.
     */
    public synchronized void close() {
        if (serverSocket != null) {
            try {
                serverSocket.close();
                System.out.println("关闭ServerSocket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // Without this the pool's idle worker threads would keep the JVM alive forever.
        executorService.shutdown();
    }

    /**
     * Tells whether a message is the quit command.
     *
     * @param msg the received line; may be {@code null}
     * @return {@code true} only if {@code msg} equals the quit keyword
     */
    public boolean readyToQuit(String msg) {
        // Constant-first comparison: a null message is simply "not quit" instead of an NPE.
        return QUIT.equals(msg);
    }

    /**
     * Binds the listening port and accepts clients until accepting fails,
     * then releases the server's resources.
     */
    public void start() {
        try {
            // Bind the listening port.
            serverSocket = new ServerSocket(DEFAULT_PORT);
            System.out.println("启动服务器,监听端口:"+DEFAULT_PORT+"...");
            while (true) {
                // Block until a client connects.
                Socket socket = serverSocket.accept();
                // Hand the connection to a pooled worker instead of a new thread.
                executorService.execute(new ChatHandler(this, socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e5%9f%ba%e4%ba%8ebio%e6%a8%a1%e5%9e%8b%e5%ae%9e%e7%8e%b0%e5%a4%9a%e4%ba%ba%e8%81%8a%e5%a4%a9%e5%ae%a4/