当前位置:   article > 正文

用Java原生Socket(BIO)做一个聊天室 实现单聊和群聊(附完整源码)_socketio单聊 java

socketio单聊 java

目录

 

思路

源码分析

目录结构

消息体

服务端

客户端

分析

源码地址


思路

要实现单聊和群聊需要考虑哪些方面?

1、需要有客户端和服务端,客户端给用户提供发送消息、接收消息的途径。服务端用于接受客户端的连接,并进行消息接收、消息处理和消息转发。

2、需要统一消息的格式,我这里仅仅是定义一个Message对象,通过type字段区分消息的类型。

如果做的精细点,可以考虑自己设计一个基于tcp的通讯协议,具体思路可以看我上一篇博客的方案二

3、单聊:服务器把收到的消息转发给目标用户。群聊:服务器收到消息后,转发给指定小组的所有人。

源码地址在最下方,需要的自取。

源码分析

目录结构

效果

单聊

群聊

消息体

Message对象

  1. /**
  2. * 消息
  3. * @author dayrain
  4. */
  5. @Data
  6. public class Message implements Serializable {
  7. private static final long serialVersionUID = 7753923870637862141L;
  8. /**
  9. * 发送人
  10. */
  11. private String sendFrom;
  12. /**
  13. * 接收人
  14. */
  15. private String sendTo;
  16. /**
  17. * 消息内容
  18. */
  19. private String content;
  20. /**
  21. * 消息类型
  22. */
  23. private Integer type;
  24. /**
  25. * 发送时间
  26. */
  27. private String sendTime;
  28. /**
  29. * 创建登录消息
  30. * @param sendFrom 登录人
  31. * @return 消息体
  32. */
  33. public static Message createLoginMessage(String sendFrom) {
  34. Message message = new Message();
  35. message.setSendFrom(sendFrom);
  36. message.setType(Constants.MessageType.LOGIN);
  37. message.setSendTime(TimeUtils.getNowTime());
  38. return message;
  39. }
  40. /**
  41. * 单聊时创建的消息
  42. * @param userId 发送人
  43. * @param sendTo 接收人
  44. * @param msg 消息
  45. * @return 消息体
  46. */
  47. public static Message createPrivateMessage(String userId, String sendTo, String msg) {
  48. Message message = new Message();
  49. message.setSendFrom(userId);
  50. message.setSendTo(sendTo);
  51. message.setContent(msg);
  52. message.setType(Constants.MessageType.PRIVATE_MESSAGE);
  53. message.setSendTime(TimeUtils.getNowTime());
  54. return message;
  55. }
  56. /**
  57. * 创建退出消息
  58. * @param userId 需要退出的用户名
  59. * @return 消息体
  60. */
  61. public static Message createExitMessage(String userId) {
  62. Message message = new Message();
  63. message.setType(Constants.MessageType.EXIT);
  64. message.setSendFrom(userId);
  65. message.setSendTime(TimeUtils.getNowTime());
  66. return message;
  67. }
  68. /**
  69. * 创建群组消息
  70. * @param userId 发送人
  71. * @param groupId 组号
  72. * @param content 内容
  73. * @return 消息体
  74. */
  75. public static Message createGroupMessage(String userId, String groupId, String content) {
  76. Message message = new Message();
  77. message.setType(Constants.MessageType.GROUP_MESSAGE);
  78. message.setSendFrom(userId);
  79. message.setSendTo(groupId);
  80. message.setContent(content);
  81. message.setSendTime(TimeUtils.getNowTime());
  82. return message;
  83. }
  84. }

消息具体的类型大概有下面的几种

  1. public interface Constants {
  2. /**
  3. * 消息类型
  4. */
  5. interface MessageType{
  6. /**
  7. * 单聊
  8. */
  9. Integer PRIVATE_MESSAGE = 1;
  10. /**
  11. * 群聊
  12. */
  13. Integer GROUP_MESSAGE = 2;
  14. /**
  15. * 系统消息,系统推送时用到
  16. */
  17. Integer SYSTEM_MESSAGE = 3;
  18. /**
  19. * 用户登录
  20. */
  21. Integer LOGIN = 4;
  22. /**
  23. * 用户退出
  24. */
  25. Integer EXIT = 5;
  26. }
  27. /**
  28. * 用户循环输入消息时的结束标志
  29. */
  30. String EXIT_FLAG = "exit";
  31. }

Group对象,群发的时候用到

  1. /**
  2. * 群组
  3. * @author dayrain
  4. */
  5. @Data
  6. @AllArgsConstructor
  7. @NoArgsConstructor
  8. public class Group {
  9. /**
  10. * 组号
  11. */
  12. private String groupId;
  13. /**
  14. * 成员
  15. */
  16. private List<String> userIds;
  17. }

服务端

首先服务端肯定是要常驻内存,随时接受来自客户端的连接。

  1. /**
  2. * tcp服务端
  3. * @author dayrain
  4. */
  5. public class TcpServer {
  6. public TcpServer() throws IOException, ClassNotFoundException {
  7. ServerSocket serverSocket = new ServerSocket(8081);
  8. while (true) {
  9. Socket socket = serverSocket.accept();
  10. TcpServerThread tcpServerThread = new TcpServerThread(socket);
  11. tcpServerThread.start();
  12. }
  13. }
  14. public static void main(String[] args) throws IOException, ClassNotFoundException {
  15. new TcpServer();
  16. }
  17. }

Socket socket = serverSocket.accept();  系统用启动后会阻塞在这里,直到有客户端连接上。

当有客户端连接时,会把socket注入到一个线程里,并启动了这个线程。下面是这个线程的源码:

  1. /**
  2. * 服务线程(有一个客户端连接,就有一条线程)
  3. * @author dayrain
  4. */
  5. public class TcpServerThread extends Thread{
  6. private final Socket socket;
  7. private InputStream inputStream;
  8. private OutputStream outputStream;
  9. public TcpServerThread(Socket socket) {
  10. this.socket = socket;
  11. try {
  12. inputStream = socket.getInputStream();
  13. outputStream = socket.getOutputStream();
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. @Override
  19. public void run() {
  20. while (true) {
  21. try {
  22. Message message = MessageUtils.readMessageFromInputStream(inputStream);
  23. if(Constants.MessageType.LOGIN.equals(message.getType())) {
  24. //登录
  25. TcpServerThreadGroup.addServerThread(message.getSendFrom(), this);
  26. System.out.println(message.getSendFrom() + "登录成功!");
  27. continue;
  28. }
  29. if(Constants.MessageType.PRIVATE_MESSAGE.equals(message.getType())) {
  30. //私聊, 发送给指定的用户
  31. String sendTo = message.getSendTo();
  32. TcpServerThreadGroup.getServerThread(sendTo).sendMessageToThisOne(message);
  33. }else if(Constants.MessageType.GROUP_MESSAGE.equals(message.getType())) {
  34. //群组消息,转发给同组的所有人(除了自己)
  35. String groupId = message.getSendTo();
  36. Group group = new GroupService().getGroupByGroupId(groupId);
  37. for (String userId : group.getUserIds()) {
  38. if(!userId.equals(message.getSendFrom())) {
  39. TcpServerThread serverThread = TcpServerThreadGroup.getServerThread(userId);
  40. if(serverThread != null) {
  41. TcpServerThreadGroup.getServerThread(userId).sendMessageToThisOne(message);
  42. }
  43. }
  44. }
  45. }else if(Constants.MessageType.EXIT.equals(message.getType())) {
  46. //退出
  47. ResourceCloseUtils.closeSocketResource(null, socket);
  48. ResourceCloseUtils.closeStreamResource(inputStream, outputStream);
  49. break;
  50. }
  51. } catch (IOException | ClassNotFoundException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. /**
  57. * 把消息发送给当前用户
  58. * @param message 消息
  59. * @throws IOException 异常
  60. */
  61. public void sendMessageToThisOne(Message message) throws IOException {
  62. MessageUtils.writeMessageFromOutputStream(this.outputStream, message);
  63. }
  64. }

正如”思路“中描述的,服务端充当一个中介的作用,只负责转发。

run方法里面是个死循环,保持和客户端的连接,循环处理客户端发过来的消息。

登录成功后,我们把当前线程加入到一个threadGroup中,threadGroup是一个主控,实际上就是一个map,以键值对的形式存储与客户端的连接。

当服务端需要把消息转发给某个人时,就去这个group中拿。具体实现如下:

  1. /**
  2. * 线程组,服务器管理所有连接的线程
  3. * @author dayrain
  4. */
  5. public class TcpServerThreadGroup {
  6. private static final ConcurrentHashMap<String, TcpServerThread> tcpServerGroup = new ConcurrentHashMap<>();
  7. public static void addServerThread(String userId, TcpServerThread tcpServerThread) {
  8. tcpServerGroup.put(userId, tcpServerThread);
  9. }
  10. public static TcpServerThread getServerThread(String userId) {
  11. return tcpServerGroup.get(userId);
  12. }
  13. }

群聊的时候,需要获取群组信息,我这里是写死的,也可以从数据库中取。

  1. /**
  2. * 组服务
  3. * @author dayrain
  4. */
  5. public class GroupService {
  6. private static HashMap <String, Group> groupDao = null;
  7. static {
  8. groupDao = new HashMap<>();
  9. groupDao.put("1", new Group("1", Arrays.asList("张三", "李四", "王二")));
  10. groupDao.put("2", new Group("2", Arrays.asList("李四", "王二", "赵六")));
  11. }
  12. Group getGroupByGroupId(String groupId) {
  13. return groupDao.get(groupId);
  14. }
  15. }

消息处理工具类

  1. /**
  2. * 消息发送、接收的工具类
  3. * @author dayrain
  4. */
  5. public class MessageUtils {
  6. /**
  7. * 从输入流中获取对方发送的消息
  8. * @param inputStream 输入流
  9. * @return 消息
  10. * @throws IOException io异常
  11. * @throws ClassNotFoundException 消息格式不正确
  12. */
  13. public static Message readMessageFromInputStream(InputStream inputStream) throws IOException, ClassNotFoundException {
  14. if(inputStream == null) {
  15. return null;
  16. }
  17. ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
  18. return (Message) objectInputStream.readObject();
  19. }
  20. /**
  21. * 向输出流中写信息
  22. * @param outputStream 输出流
  23. * @param message 消息
  24. * @throws IOException 异常
  25. */
  26. public static void writeMessageFromOutputStream(OutputStream outputStream, Message message) throws IOException {
  27. if(outputStream == null) {
  28. return;
  29. }
  30. ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
  31. objectOutputStream.writeObject(message);
  32. }
  33. }

客户端

客户端和服务器建立连接后,会得到一个Socket对象。下面是客户端的一段代码:

  1. /**
  2. * 客户端
  3. * @author dayrain
  4. */
  5. public class TcpClient {
  6. private static String userId = null;
  7. public static void main(String[] args) throws IOException, InterruptedException {
  8. Socket socket = new Socket(InetAddress.getLocalHost(), 8081);
  9. System.out.println("========请先登录========");
  10. userId = login(socket);
  11. ReadThread readThread = new ReadThread(socket);
  12. readThread.start();
  13. while (true) {
  14. System.out.println("========1.单聊========");
  15. System.out.println("========2.群聊========");
  16. System.out.println("========3.退出========");
  17. String choice = ScannerUtils.readMessage();
  18. switch (choice){
  19. case "1":
  20. //私聊
  21. privateChat(socket);
  22. break;
  23. case "2":
  24. //群聊
  25. groupChat(socket);
  26. break;
  27. default:
  28. //退出系统
  29. exitChat(socket);
  30. System.out.println("已退出登录");
  31. System.exit(0);
  32. return;
  33. }
  34. }
  35. }
  36. /**
  37. * 退出聊天
  38. * @param socket
  39. * @throws IOException
  40. */
  41. private static void exitChat(Socket socket) throws IOException {
  42. Message message = Message.createExitMessage(userId);
  43. MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), message);
  44. }
  45. /**
  46. * 群聊
  47. * @param socket
  48. * @throws IOException
  49. */
  50. private static void groupChat(Socket socket) throws IOException {
  51. System.out.println("----------输入群组号----------");
  52. String groupId = ScannerUtils.readMessage();
  53. while (true) {
  54. System.out.println("输入你要发送的消息(exit 退出)");
  55. String content = ScannerUtils.readMessage();
  56. if(Constants.EXIT_FLAG.equals(content)) {
  57. break;
  58. }
  59. Message groupMessage = Message.createGroupMessage(userId, groupId, content);
  60. MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), groupMessage);
  61. }
  62. }
  63. /**
  64. * 单聊
  65. * @param socket
  66. * @throws IOException
  67. */
  68. private static void privateChat(Socket socket) throws IOException {
  69. System.out.println("----------输入对方用户名----------");
  70. String sendTo = ScannerUtils.readMessage();
  71. while (true) {
  72. System.out.println("输入你要发送的消息(exit 退出)");
  73. String message = ScannerUtils.readMessage();
  74. if(Constants.EXIT_FLAG.equals(message)) {
  75. break;
  76. }
  77. Message privateMessage = Message.createPrivateMessage(userId, sendTo, message);
  78. MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), privateMessage);
  79. }
  80. }
  81. /**
  82. * 登录
  83. * @param socket
  84. * @return 当前用户的id
  85. * @throws IOException
  86. */
  87. private static String login(Socket socket) throws IOException {
  88. System.out.println("输入登录名");
  89. String userId = ScannerUtils.readMessage();
  90. Message loginMessage = Message.createLoginMessage(userId);
  91. MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), loginMessage);
  92. return userId;
  93. }
  94. }

上面的代码可以把用户的各种需求推送给服务器,但却没有接收消息相关的代码?那是因为我把输入和输出分开了。

Java BIO中,有两个阻塞的地方:

serverSocket.accept()   //服务端等待客户端连接。

socket.getInputStream().read()  //服务器或者客户端,阻塞等待对方发送的消息。

如果输入和输出不分开,放在一个线程里,会出现阻塞的情况。就算处理好,聊天的时候也会出现只能一问一答的尴尬情况。

为了不影响用户体验,我再给read操作单独开一个线程。

ReadThread类

  1. /**
  2. * 每一个客户端都另开一个线程,循环读取客户端发过来的消息
  3. * @author dayrain
  4. */
  5. public class ReadThread extends Thread {
  6. private final Socket socket;
  7. public ReadThread(Socket socket) {
  8. this.socket = socket;
  9. }
  10. @Override
  11. public void run() {
  12. try {
  13. while (true) {
  14. if(!socket.isClosed()) {
  15. Message message = MessageUtils.readMessageFromInputStream(socket.getInputStream());
  16. if(Constants.MessageType.PRIVATE_MESSAGE.equals(message.getType())) {
  17. System.out.println("【悄悄话】【"+message.getSendTime()+"】收到了来自【" + message.getSendFrom() + "】的一条消息,内容为: 【" + message.getContent() + "】");
  18. }
  19. if(Constants.MessageType.GROUP_MESSAGE.equals(message.getType())) {
  20. System.out.println("【群消息】【"+message.getSendTime()+"】收到了来自群【" + message.getSendTo() + "】,用户名为【" + message.getSendFrom() + "】发来了一条消息,内容为: 【" + message.getContent() + "】");
  21. }
  22. }
  23. }
  24. } catch (IOException | ClassNotFoundException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }

分析

当然上面的代码只是简单的demo,还有很多问题需要解决。

比如:

1、用户认证问题。上述代码未实现登录功能,包括消息体中的sendFrom依赖客户端,容易被篡改。

其实我们可以仿造传统的cookie+session模式,登录成功后返回token,客户端每次访问接口,都带上token。

2、持久化问题。如果想实现消息存储的功能,服务端接收到消息后,在转发的同时,也要进行持久化操作。

3、序列化问题。这个demo中,因为只有一个Message类型,客户端和服务端都是Java写的,所以就用自带的ObjectOutputStream、ObjectInputStream来序列化和反序列化。

之前做的一个项目用websocket实现后台推送的,前后端交互的是json字符串,那样的话就需要自己处理了。

源码地址

其他没贴出来的代码,都是简化操作的工具类。

需要完整代码的小伙伴,可自取:

https://gitee.com/DayCloud/dayrain-demo/tree/master/java-net/tcp-chatroom

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/178632
推荐阅读
相关标签
  

闽ICP备14008679号