赞
踩
本篇博文主要测试NIO服务端和客户端的通信,简单起见,不涉及数据库和其他模块,因此用户信息只采用集合保存,实际中,应该采用持久化存储。不同的客户端用简单的昵称进行区分就可以,因此,我们可以采用一个map保存,k为客户端ip,v为昵称。
客户端用两个线程,一个线程负责获取控制台消息输入与发送,另一个线程负责接受服务端消息与打印。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 10086)); socketChannel.configureBlocking(false); selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("客户端已经启动"); //开启线程,接受消息 receiveMsgThread(); //主线程发送消息 Scanner scanner = new Scanner(System.in); byteBuffer = ByteBuffer.allocate(1024); while (true) { String input = scanner.nextLine(); if (hasNickname) {//有昵称后才能进行聊天 //单聊格式:msg@对方昵称 if (input.contains("@")) { input = input.replace("@", SEPECTOR); } //发送消息,服务端根据分隔符将消息分割后,若为2部分,则是群聊消息;若为3部分,则是私发消息 writeMesage(nickname + SEPECTOR + input, socketChannel); } else {//设置昵称 writeMesage(input, socketChannel); } }
服务端采用一个线程监听多个客户端的连接,进行消息的处理,实现群发或私发.问题是服务端如何通过单线程实现对多个客户端的监听处理呢?答案当然是selector
/** * 循环不断的监听客户端的链接 * @throws IOException */ public void listen() throws IOException { while (true) { //阻塞,直至有事件就绪 selector.select(); //获取SelectionKey集合,使用迭代器读取SelectionKey Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //读到SelectionKey SelectionKey sk = iterator.next(); //移除已经处理的SelectionKey iterator.remove(); //处理SelectionKey handleSelectionKey(sk); } //处理完毕,清空SelectionKey集合 selector.selectedKeys().clear(); } }
/** * 处理SelectionKey * @param sk SelectionKey */ private void handleSelectionKey(SelectionKey sk) throws IOException { ServerSocketChannel serverSocketChannel; SocketChannel socketChannel; if (sk.isAcceptable()) {//可接收 //从SelectionKey中获取 ServerSocketChannel通道 serverSocketChannel = (ServerSocketChannel) sk.channel(); //接受来自客户端的连接 socketChannel = serverSocketChannel.accept(); //设置新连接的通道为非阻塞 socketChannel.configureBlocking(false); //将该通道注册到selectror上,并设置对读事件感兴趣 socketChannel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) {//读就绪 //获取通道 socketChannel = (SocketChannel) sk.channel(); //接收消息 String msg = readMsg(socketChannel); //处理消息,若为1部分,则是昵称消息;若为2部分,则是群聊消息;若为3部分,则是私聊消息; String[] msgArray = msg.split(SEPECTOR); if (msgArray.length == 1 && !"".equals(msg)) { //昵称处理 } else if (msgArray.length == 2) { //群发处理 } else if (msgArray.length == 3) { //私聊群发处理 } } }
问题来了,**服务端如何知道客户端到底是想要私聊呢还是群聊呢?**办法有很多种,一种简单的办法是根据客户端发送消息的类型加以区分。我们可以事先约定一下格式:
消息格式 | flag标识 | 类型 |
---|---|---|
msg | false | 设置昵称 |
msg | true | 群聊 |
msg@him | true | 私聊him |
当客户端连接成功后,服务段给出提示,让用户输入昵称进行设置,成功后flag置为true,否则,一直让用户进行昵称设置,知道设置成功,之后便可以进行消息发送了,好比进行注册才能使用。
在进行消息发送之前,我们可以对用户输入的特殊字符串进行替换,便于服务端进行加工处理,防止因为接受到特殊字符而发生歧义。比如用户想要在群聊中发送的实际的’@‘字符,但我们却事先约定了@字符表示私聊,造成奇混乱。因此我们可以对一些字符替换排除。如把’@‘替换为’####’,‘01010101’,‘0xFFFF’,当然客户端也可能发送####等(事实上,用户可以发送我们约定的任何字符,我们要进行转义处理,这里就不多赘述了,这里只是简单测试替换为####)。
因此服务端接受的消息大致有以下三种类型,接受后进行字符串分割,根据长度数组长度便可轻易区分要进行何种处理。
接收形式 | 切割长度 | 处理类型 |
---|---|---|
msg | 1 | 昵称处理 |
me####msg | 2 | 群发处理 |
me####msg####him | 3 | 私发处理 |
大致思路如上,下面给出完整代码:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.StandardCharsets; import java.util.*; /** * @author XiaoXin * @date 2020/2/5 下午5:13 */ public class MyServer { private Selector selector;//选择器 private ByteBuffer buffer = ByteBuffer.allocate(1024);//缓冲区 private Map<String, String> clientMap = new HashMap<>();//记录客户端信息 private List<SocketChannel> socketChannelList = new ArrayList<>(); private final String TAG = "welcome"; private final String SEPECTOR = "####"; /** * 创建服务端通道 * @param port * @throws IOException */ public MyServer(int port) throws IOException { //创建通道 ServerSocketChannel sschannel = null; sschannel = ServerSocketChannel.open(); //绑定端口 sschannel.bind(new InetSocketAddress(port)); //设置非阻塞模式 sschannel.configureBlocking(false); //获取selector selector = Selector.open(); //注册通道到selectror sschannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server is running at" + sschannel.getLocalAddress()); } /** * 循环不断的监听客户端的链接 * @throws IOException */ public void listen() throws IOException { while (true) { //阻塞,直至有事件就绪 selector.select(); //获取SelectionKey集合,使用迭代器读取SelectionKey Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //读到SelectionKey SelectionKey sk = iterator.next(); //移除已经处理的SelectionKey iterator.remove(); //处理SelectionKey handleSelectionKey(sk); } //处理完毕,清空SelectionKey集合 selector.selectedKeys().clear(); } } /** * 处理SelectionKey * @param sk SelectionKey */ private void handleSelectionKey(SelectionKey sk) throws IOException { ServerSocketChannel serverSocketChannel; SocketChannel socketChannel; if (sk.isAcceptable()) {//可接收 //从SelectionKey中获取 ServerSocketChannel通道 serverSocketChannel = (ServerSocketChannel) sk.channel(); //接受来自客户端的连接 socketChannel = serverSocketChannel.accept(); //设置新连接的通道为非阻塞 socketChannel.configureBlocking(false); //将该通道注册到selectror上,并设置对读事件感兴趣 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("[" + socketChannel.getRemoteAddress() + "] is already connected to the server!"); //告诉客户端输入昵称 writeMsg("请输入昵称,进行身份确认后聊天:", socketChannel); } else if (sk.isReadable()) {//读就绪 //获取通道 socketChannel = (SocketChannel) sk.channel(); //接收消息 String msg = readMsg(socketChannel); //处理消息,若为1部分,则是昵称消息;若为2部分,则是群聊消息;若为3部分,则是私聊消息; String[] msgArray = msg.split(SEPECTOR); //防止msg为空 if (msg == null || "".equals(msg)) { System.out.println(clientMap); String addr = socketChannel.getRemoteAddress().toString(); String nickname = clientMap.get(addr); System.out.println(nickname + "断开了连接"); clientMap.remove(addr); socketChannelList.remove(socketChannel); socketChannel.close();//关闭通道 broadcastMsg(nickname + "已经退出了群聊" + ",当前在线人数:" + clientMap.size()); return; } if (msgArray.length == 1 && !"".equals(msg)) { //昵称可能重复 if (clientMap.containsValue(msgArray[0])) { writeMsg("该昵称已经存在,请重新输入:", socketChannel); } else { clientMap.put(socketChannel.getRemoteAddress().toString(), msgArray[0]); socketChannelList.add(socketChannel); writeMsg(TAG + "," + msgArray[0], socketChannel); broadcastMsg2(msgArray[0] + "加入了群聊", socketChannel); } } else if (msgArray.length == 2) { System.out.println(msgArray[0] + " said to all:" + msgArray[1]); //广播消息 broadcastMsg(msgArray[0] + ":" + msgArray[1]); } else if (msgArray.length == 3) { System.out.println("["+msgArray[0]+"私聊"+msgArray[2]+":"+msgArray[1]+"]"); p2pChat(msgArray[0] + ":" + msgArray[1], msgArray[2], socketChannel); } } } /** * 接收消息 * @param socketChannel * @throws IOException * @return 消息 */ private String readMsg(SocketChannel socketChannel) throws IOException { //初始化缓冲区 buffer.clear(); int len = 0; StringBuilder stringBuilder = new StringBuilder(); while ((len = socketChannel.read(buffer)) > 0) { buffer.flip(); stringBuilder.append(new String(buffer.array(), 0, len, StandardCharsets.UTF_8)); } return stringBuilder.toString(); } /** * 发送消息 * @param str * @param socketChannel * @throws IOException */ private void writeMsg(String str, SocketChannel socketChannel) throws IOException { buffer.clear(); buffer.put(str.getBytes(StandardCharsets.UTF_8)); buffer.flip(); socketChannel.write(buffer); } /** * 群发消息,方法一 * @param msg * @throws IOException */ private void broadcastMsg(String msg) throws IOException { for (SelectionKey key : selector.keys()) { Channel target = key.channel(); if (target.isOpen() && target instanceof SocketChannel) { writeMsg(msg, (SocketChannel) target); } } } /** * 群发消息,方法二 * @param msg * @param socketChannel * @throws IOException */ private void broadcastMsg2(String msg, SocketChannel socketChannel) throws IOException { for (SocketChannel channel : socketChannelList) { if (channel.isOpen() && !channel.equals(socketChannel)) { buffer.clear(); buffer.put(msg.getBytes()); buffer.flip(); channel.write(buffer); } } } /** * 私发消息 * @param msg * @param hisname * @param sourcechannel --源通道 * @throws IOException */ private void p2pChat(String msg, String hisname, SocketChannel sourcechannel) throws IOException { boolean flag = false;//记录是否发成功发送给指定用户 for (SelectionKey sk : selector.keys()) { Channel target = sk.channel(); if (target.isOpen() && target instanceof SocketChannel) { SocketChannel socketChannel = (SocketChannel) target; String temname = clientMap.get(socketChannel.getRemoteAddress().toString()); if (temname.equals(hisname)) { writeMsg(msg, socketChannel); flag = true; break; } } } if (!flag) { writeMsg("该用户不存在", sourcechannel); }else { writeMsg(msg+"---------【私聊发送给:"+hisname+" 状态:成功】", sourcechannel); } } /** * 启动聊天服务器 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { MyServer myServer = new MyServer(10086); myServer.listen(); } }
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Scanner; /** * @author XiaoXin * @date 2020/2/5 */ public class MyClient { private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//缓冲区 private Selector selector = null;//选择器 private boolean hasNickname = false; private String nickname = "user"; //输入昵称的返回字符串标记,如:welocme,张三 private final String TAG = "welcome"; private final String SEPECTOR = "####"; /** * 启动通道和selector * @throws IOException */ public void start() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 10086)); socketChannel.configureBlocking(false); selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("客户端已经启动"); //开启线程,接受消息 receiveMsgThread(); //主线程发送消息 Scanner scanner = new Scanner(System.in); byteBuffer = ByteBuffer.allocate(1024); while (true) { String input = scanner.nextLine(); if (hasNickname) {//有昵称后才能进行聊天 //单聊格式:msg@对方昵称 if (input.contains("@")) { input = input.replace("@", SEPECTOR); } //发送消息,服务端根据分隔符将消息分割后,若为2部分,则是群聊消息;若为3部分,则是私发消息 writeMesage(nickname + SEPECTOR + input, socketChannel); } else {//设置昵称 writeMesage(input, socketChannel); } } } /** * 接受消息 * @param socketChannel * @return * @throws IOException */ private String readMessage(SocketChannel socketChannel) throws IOException { byteBuffer.clear(); int len = 0; StringBuilder builder = new StringBuilder(); while ((len = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); builder.append(new String(byteBuffer.array(), 0, len,"UTF-8")); } return builder.toString(); } /** * 发送消息 * @param str * @param socketChannel * @throws IOException */ private void writeMesage(String str, SocketChannel socketChannel) throws IOException { byteBuffer.clear(); byteBuffer.put(str.getBytes(StandardCharsets.UTF_8)); byteBuffer.flip(); socketChannel.write(byteBuffer); } /** * 接收消息的线程 */ private void receiveMsgThread() { new Thread(() -> { SocketChannel socketChannel = null; while (true) { try { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isReadable()) { socketChannel = (SocketChannel) selectionKey.channel(); String msgReciv = readMessage(socketChannel); System.out.println(msgReciv); //连接成功时,服务端要求客户端输入昵称,若返回约定的标志,说明昵称设置成功 if (msgReciv.contains(TAG)) { hasNickname = true; nickname = msgReciv.substring(TAG.length() + 1);//标记逗号后的字符串为昵称 } } } selector.selectedKeys().clear(); } catch (IOException e) { if (socketChannel != null) { try { socketChannel.close(); } catch (IOException ex) { ex.printStackTrace(); } } } } }).start(); } /** * 启动聊天客户端 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { new MyClient().start(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。