第一版和第二版见《基于BIO模型实现多人聊天室》。
NIO介绍见:
《NIO、BIO模型对比实现文件的复制》
《NIO网络编程之Selector介绍》
第三版:使用NIO模型改进
服务端实现
/**
 * Multi-user chat server built on non-blocking NIO.
 *
 * <p>A single thread multiplexes the listening channel and every client
 * channel through one {@link Selector}. Each text message received from a
 * client is forwarded to all other connected clients, prefixed with the
 * sender's name. A client that sends {@code "quit"} is disconnected.
 *
 * <p>Not thread-safe: the shared read buffer assumes all channel handling
 * happens on the thread that runs {@link #start()}.
 */
public class ChatServer {
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private ServerSocketChannel server;
    private Selector selector;
    /** Reused for every read; at most BUFFER bytes are delivered per message. */
    private final ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private final Charset charset = Charset.forName("UTF-8");
    private final int port;

    /** Creates a server that listens on the default port (8888). */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that listens on the given port.
     *
     * @param port TCP port to bind when {@link #start()} is called
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and runs the select loop on the calling
     * thread. Returns only after an I/O failure on the selector or the
     * listening channel; both are closed before returning.
     */
    public void start() {
        try {
            server = ServerSocketChannel.open();
            // A channel must be non-blocking before it can be registered with a selector.
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port));
            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("启动服务器,监听端口:"+port+"...");
            while (true) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    // Dispatch each triggered event.
                    handles(key);
                }
                // Selected keys are never removed automatically.
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(selector, server);
        }
    }

    /**
     * Dispatches one selected key to the accept or read handler.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if accepting or registering a new client fails
     */
    private void handles(SelectionKey key) throws IOException {
        // A key may have been cancelled earlier in this pass (for example after
        // a failed forward); querying its ready set would then throw.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            handleAccept(key);
        } else if (key.isReadable()) {
            handleRead(key);
        }
    }

    /** ACCEPT event: registers the newly connected client for reads. */
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel acceptor = (ServerSocketChannel) key.channel();
        SocketChannel client = acceptor.accept();
        if (client == null) {
            // Non-blocking accept returns null when no connection is pending.
            return;
        }
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println(getClientName(client) +"已连接");
    }

    /** READ event: receives a client message and forwards it or disconnects the client. */
    private void handleRead(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        String fwdMsg;
        try {
            fwdMsg = receive(client);
        } catch (IOException e) {
            // A failure on one connection must not take the whole server down.
            e.printStackTrace();
            disconnect(key);
            return;
        }
        if (fwdMsg == null) {
            // The peer closed the connection.
            disconnect(key);
            return;
        }
        if (fwdMsg.isEmpty()) {
            // Nothing was actually available to read; keep the connection.
            return;
        }
        String name = getClientName(client);
        System.out.println(name+":"+fwdMsg);
        if (readyToQuit(fwdMsg)) {
            disconnect(key);
            System.out.println(name+"已断开连接");
        } else {
            forwardMessage(client, fwdMsg);
        }
    }

    /**
     * Sends {@code fwdMsg}, prefixed with the sender's name, to every
     * connected client except the sender. A recipient whose write fails is
     * disconnected.
     *
     * <p>The write loop retries until the whole message has been handed to
     * the channel, so a recipient that stops reading can stall this thread.
     */
    private void forwardMessage(SocketChannel client, String fwdMsg) {
        String text = getClientName(client)+":"+fwdMsg;
        for (SelectionKey key : selector.keys()) {
            SelectableChannel target = key.channel();
            if (!(target instanceof SocketChannel) || !key.isValid() || client.equals(target)) {
                continue;
            }
            // Encode straight into a right-sized buffer so messages longer than
            // a fixed buffer cannot overflow it.
            ByteBuffer out = charset.encode(text);
            try {
                SocketChannel recipient = (SocketChannel) target;
                while (out.hasRemaining()) {
                    recipient.write(out);
                }
            } catch (IOException e) {
                // Catching outside the loop: retrying a broken channel would never finish.
                e.printStackTrace();
                disconnect(key);
            }
        }
    }

    /**
     * Reads the bytes currently available from {@code client}, up to the
     * capacity of the shared read buffer; anything beyond that stays in the
     * socket and arrives as a separate message on the next READ event.
     *
     * @return the decoded text (possibly empty), or {@code null} if the peer
     *         has closed the connection and no data was read
     * @throws IOException if the read fails
     */
    private String receive(SocketChannel client) throws IOException {
        rBuffer.clear();
        int count;
        do {
            count = client.read(rBuffer);
        } while (count > 0);
        rBuffer.flip();
        if (count < 0 && !rBuffer.hasRemaining()) {
            return null;
        }
        return String.valueOf(charset.decode(rBuffer));
    }

    private String getClientName(SocketChannel client) {
        return "客户端["+client.socket().getPort()+"]";
    }

    private boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Cancels the key and closes its channel so the socket is released. */
    private void disconnect(SelectionKey key) {
        key.cancel();
        close(key.channel());
    }

    /** Closes each non-null resource, logging (not propagating) close failures. */
    private void close(Closeable... closeable) {
        for (Closeable c : closeable) {
            if (c == null) {
                continue;
            }
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(7777);
        chatServer.start();
    }
}
客户端实现
/**
 * Chat client built on non-blocking NIO.
 *
 * <p>The thread that runs {@code start()} drives a {@link Selector} that
 * completes the connection and prints every message received from the
 * server. Console input is read on a separate {@code UserInputHandler}
 * thread, which calls {@link #send(String)}. Closing the selector is the
 * signal that ends the select loop.
 */
public class ChatClient {
    private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVER_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private String host;
    private int port;
    private SocketChannel client;
    /** Reused for every read; at most BUFFER bytes are printed per READ event. */
    private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private Selector selector;
    private Charset charset = Charset.forName("UTF-8");

    /** Creates a client for the default server address (127.0.0.1:8888). */
    public ChatClient() {
        this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
    }

    /**
     * Creates a client for the given server address.
     *
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * @param msg a message typed by the user; may be {@code null}
     * @return {@code true} if {@code msg} is exactly the quit command
     */
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Closes each non-null resource, logging (not propagating) close failures. */
    private void close(Closeable... closeable) {
        for (Closeable c : closeable) {
            if (c == null) {
                continue;
            }
            try {
                c.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Connects to the server and runs the select loop on the calling thread
     * until the selector is closed (after sending quit, on server disconnect,
     * or on a connection failure). The selector and the socket are closed
     * before returning.
     */
    private void start() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            selector = Selector.open();
            client.register(selector, SelectionKey.OP_CONNECT);
            client.connect(new InetSocketAddress(host, port));
            while (selector.isOpen()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    handles(key);
                    if (!selector.isOpen()) {
                        // Closing the selector empties its key sets; stop iterating.
                        break;
                    }
                }
                keys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            // Expected: the selector was closed (quit or disconnect) while this
            // thread was in, or about to enter, a selector call.
        } finally {
            close(selector, client);
        }
    }

    /** Handles one selected key: finishes the connection or prints a received message. */
    private void handles(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        // CONNECT event: the connection can now be completed.
        if (key.isConnectable()) {
            if (client.isConnectionPending()) {
                try {
                    client.finishConnect();
                    client.register(selector, SelectionKey.OP_READ);
                    // Read console input on its own thread. Daemon, so a thread
                    // blocked on the console cannot keep the JVM alive once the
                    // select loop has ended.
                    Thread inputThread = new Thread(new UserInputHandler(this));
                    inputThread.setDaemon(true);
                    inputThread.start();
                } catch (IOException e) {
                    e.printStackTrace();
                    // The connection failed; without this the loop would block forever.
                    close(selector);
                }
            }
        }
        // READ event: the server sent data.
        else if (key.isReadable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            String msg;
            try {
                msg = receive(channel);
            } catch (IOException e) {
                e.printStackTrace();
                close(selector);
                return;
            }
            if (msg == null) {
                // The server closed the connection.
                close(selector);
            } else if (!msg.isEmpty()) {
                System.out.println(msg);
            }
        }
    }

    /**
     * Sends a message to the server; an empty message is ignored. After the
     * quit command has been written, the selector is closed so the select
     * loop terminates.
     *
     * @param msg text to send; must not be {@code null}
     * @throws IOException if writing to the channel fails
     */
    public void send(String msg) throws IOException {
        if (msg.isEmpty()) {
            return;
        }
        // Encode straight into a right-sized buffer so input longer than a
        // fixed buffer cannot overflow it.
        ByteBuffer out = charset.encode(msg);
        while (out.hasRemaining()) {
            client.write(out);
        }
        if (readyToQuit(msg)) {
            close(selector);
        }
    }

    /**
     * Reads the bytes currently available from {@code channel}, up to the
     * capacity of the shared read buffer.
     *
     * @return the decoded text (possibly empty), or {@code null} if the peer
     *         has closed the connection and no data was read
     * @throws IOException if the read fails
     */
    private String receive(SocketChannel channel) throws IOException {
        rBuffer.clear();
        int count;
        do {
            count = channel.read(rBuffer);
        } while (count > 0);
        rBuffer.flip();
        if (count < 0 && !rBuffer.hasRemaining()) {
            return null;
        }
        return String.valueOf(charset.decode(rBuffer));
    }

    public static void main(String[] args) {
        ChatClient client = new ChatClient("127.0.0.1",7777);
        client.start();
    }
}
处理用户输入的线程:
/**
 * Reads lines from the console and sends each one to the server through the
 * owning {@link ChatClient}. Stops after the quit command has been sent, when
 * the console reaches end of input, or when an I/O error occurs.
 */
public class UserInputHandler implements Runnable {
    private final ChatClient chatClient;

    /**
     * @param chatClient client used to send the typed messages
     */
    public UserInputHandler(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    @Override
    public void run() {
        try {
            // Wait for the user to type messages.
            BufferedReader consoleReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            while (true) {
                String input = consoleReader.readLine();
                if (input == null) {
                    // End of console input (closed or redirected stdin): there is
                    // nothing more to send, and passing null on would fail.
                    break;
                }
                // Send the message to the server.
                chatClient.send(input);
                // Stop once the user has asked to quit.
                if (chatClient.readyToQuit(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e5%9f%ba%e4%ba%8enio%e6%a8%a1%e5%9e%8b%e6%94%b9%e8%bf%9b%e5%a4%9a%e4%ba%ba%e8%81%8a%e5%a4%a9%e5%ae%a4/