当前位置:   article > 正文

SpringBoot(java)实现websocket实现实时通信_java springboot websocket

java springboot websocket

一、认识WebSocket

WebSockets是一种在Web应用程序中实现实时通信的技术。它允许客户端和服务器之间建立持久的、双向的通信通道,从而使得服务器可以实时向客户端推送数据,而不需要客户端不断地向服务器发起请求。这种实时通信的能力对于需要即时更新数据的应用程序非常有用,比如在线聊天应用、实时游戏、股票市场更新等。

在使用WebSockets时,通常需要以下步骤:

  1. 建立连接客户端向服务器发起WebSocket连接请求,服务器接受连接请求后,双方建立WebSocket连接。

  2. 通信:一旦建立了连接,客户端和服务器就可以通过该连接进行双向通信,可以发送和接收数据。

  3. 处理消息:客户端和服务器需要处理接收到的消息,并根据需要进行相应的操作。消息的格式和内容可以根据应用程序的需求来定义。

  4. 关闭连接:当通信结束时,可以通过发送关闭消息来关闭WebSocket连接,释放资源。

在实际开发中,可以使用各种现代Web框架和库来简化WebSocket的使用,例如:

  • 在前端,可以使用现代JavaScript框架如Vue.js、React.js或Angular来处理WebSocket连接和消息传递。
  • 在后端,常见的Web框架如Spring Boot(Java)、Express.js(Node.js)、Django(Python)等都提供了对WebSocket的支持。

要实现实时通信,你需要在客户端和服务器端分别实现WebSocket连接的建立和消息的处理逻辑。具体的实现方式会根据你选择的编程语言、框架和库而有所不同。

二、使用WebSocket(参照若依后端SpringBoot,前端Vue.js)

1、在pom.xml中添加websocket依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

2、配置匿名访问

.antMatchers("/websocket/**").permitAll()

3、若依的websocket实现实时通信代码

(1)SemaphoreUtils.java

  1. package com.ruoyi.framework.websocket;
  2. import java.util.concurrent.Semaphore;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. /**
  6. * 信号量相关处理
  7. *
  8. * @author ruoyi
  9. */
  10. public class SemaphoreUtils
  11. {
  12. /**
  13. * SemaphoreUtils 日志控制器
  14. */
  15. private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
  16. /**
  17. * 获取信号量
  18. *
  19. * @param semaphore
  20. * @return
  21. */
  22. public static boolean tryAcquire(Semaphore semaphore)
  23. {
  24. boolean flag = false;
  25. try
  26. {
  27. flag = semaphore.tryAcquire();
  28. }
  29. catch (Exception e)
  30. {
  31. LOGGER.error("获取信号量异常", e);
  32. }
  33. return flag;
  34. }
  35. /**
  36. * 释放信号量
  37. *
  38. * @param semaphore
  39. */
  40. public static void release(Semaphore semaphore)
  41. {
  42. try
  43. {
  44. semaphore.release();
  45. }
  46. catch (Exception e)
  47. {
  48. LOGGER.error("释放信号量异常", e);
  49. }
  50. }
  51. }

(2)WebSocketConfig.java

  1. package com.ruoyi.framework.websocket;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. /**
  6. * websocket 配置
  7. *
  8. * @author ruoyi
  9. */
  10. @Configuration
  11. public class WebSocketConfig
  12. {
  13. @Bean
  14. public ServerEndpointExporter serverEndpointExporter()
  15. {
  16. return new ServerEndpointExporter();
  17. }
  18. }

(3)WebSocketServer.java

  1. package com.ruoyi.framework.websocket;
  2. import java.util.concurrent.Semaphore;
  3. import javax.websocket.OnClose;
  4. import javax.websocket.OnError;
  5. import javax.websocket.OnMessage;
  6. import javax.websocket.OnOpen;
  7. import javax.websocket.Session;
  8. import javax.websocket.server.ServerEndpoint;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.stereotype.Component;
  12. /**
  13. * websocket 消息处理
  14. *
  15. * @author ruoyi
  16. */
  17. @Component
  18. @ServerEndpoint("/websocket/message")
  19. public class WebSocketServer
  20. {
  21. /**
  22. * WebSocketServer 日志控制器
  23. */
  24. private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
  25. /**
  26. * 默认最多允许同时在线人数100
  27. */
  28. public static int socketMaxOnlineCount = 100;
  29. private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
  30. /**
  31. * 连接建立成功调用的方法
  32. */
  33. @OnOpen
  34. public void onOpen(Session session) throws Exception
  35. {
  36. boolean semaphoreFlag = false;
  37. // 尝试获取信号量
  38. semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
  39. if (!semaphoreFlag)
  40. {
  41. // 未获取到信号量
  42. LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
  43. WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
  44. session.close();
  45. }
  46. else
  47. {
  48. // 添加用户
  49. WebSocketUsers.put(session.getId(), session);
  50. LOGGER.info("\n 建立连接 - {}", session);
  51. LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
  52. WebSocketUsers.sendMessageToUserByText(session, "连接成功");
  53. }
  54. }
  55. /**
  56. * 连接关闭时处理
  57. */
  58. @OnClose
  59. public void onClose(Session session)
  60. {
  61. LOGGER.info("\n 关闭连接 - {}", session);
  62. // 移除用户
  63. WebSocketUsers.remove(session.getId());
  64. // 获取到信号量则需释放
  65. SemaphoreUtils.release(socketSemaphore);
  66. }
  67. /**
  68. * 抛出异常时处理
  69. */
  70. @OnError
  71. public void onError(Session session, Throwable exception) throws Exception
  72. {
  73. if (session.isOpen())
  74. {
  75. // 关闭连接
  76. session.close();
  77. }
  78. String sessionId = session.getId();
  79. LOGGER.info("\n 连接异常 - {}", sessionId);
  80. LOGGER.info("\n 异常信息 - {}", exception);
  81. // 移出用户
  82. WebSocketUsers.remove(sessionId);
  83. // 获取到信号量则需释放
  84. SemaphoreUtils.release(socketSemaphore);
  85. }
  86. /**
  87. * 服务器接收到客户端消息时调用的方法
  88. */
  89. @OnMessage
  90. public void onMessage(String message, Session session)
  91. {
  92. String msg = message.replace("你", "我").replace("吗", "");
  93. WebSocketUsers.sendMessageToUserByText(session, msg);
  94. }
  95. }

(4)WebSocketUsers.java

  1. package com.ruoyi.framework.websocket;
  2. import java.io.IOException;
  3. import java.util.Collection;
  4. import java.util.Map;
  5. import java.util.Set;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import javax.websocket.Session;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. /**
  11. * websocket 客户端用户集
  12. *
  13. * @author ruoyi
  14. */
  15. public class WebSocketUsers
  16. {
  17. /**
  18. * WebSocketUsers 日志控制器
  19. */
  20. private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);
  21. /**
  22. * 用户集
  23. */
  24. private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();
  25. /**
  26. * 存储用户
  27. *
  28. * @param key 唯一键
  29. * @param session 用户信息
  30. */
  31. public static void put(String key, Session session)
  32. {
  33. USERS.put(key, session);
  34. }
  35. /**
  36. * 移除用户
  37. *
  38. * @param session 用户信息
  39. *
  40. * @return 移除结果
  41. */
  42. public static boolean remove(Session session)
  43. {
  44. String key = null;
  45. boolean flag = USERS.containsValue(session);
  46. if (flag)
  47. {
  48. Set<Map.Entry<String, Session>> entries = USERS.entrySet();
  49. for (Map.Entry<String, Session> entry : entries)
  50. {
  51. Session value = entry.getValue();
  52. if (value.equals(session))
  53. {
  54. key = entry.getKey();
  55. break;
  56. }
  57. }
  58. }
  59. else
  60. {
  61. return true;
  62. }
  63. return remove(key);
  64. }
  65. /**
  66. * 移出用户
  67. *
  68. * @param key 键
  69. */
  70. public static boolean remove(String key)
  71. {
  72. LOGGER.info("\n 正在移出用户 - {}", key);
  73. Session remove = USERS.remove(key);
  74. if (remove != null)
  75. {
  76. boolean containsValue = USERS.containsValue(remove);
  77. LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
  78. return containsValue;
  79. }
  80. else
  81. {
  82. return true;
  83. }
  84. }
  85. /**
  86. * 获取在线用户列表
  87. *
  88. * @return 返回用户集合
  89. */
  90. public static Map<String, Session> getUsers()
  91. {
  92. return USERS;
  93. }
  94. /**
  95. * 群发消息文本消息
  96. *
  97. * @param message 消息内容
  98. */
  99. public static void sendMessageToUsersByText(String message)
  100. {
  101. Collection<Session> values = USERS.values();
  102. for (Session value : values)
  103. {
  104. sendMessageToUserByText(value, message);
  105. }
  106. }
  107. /**
  108. * 发送文本消息
  109. *
  110. * @param userName 自己的用户名
  111. * @param message 消息内容
  112. */
  113. public static void sendMessageToUserByText(Session session, String message)
  114. {
  115. if (session != null)
  116. {
  117. try
  118. {
  119. session.getBasicRemote().sendText(message);
  120. }
  121. catch (IOException e)
  122. {
  123. LOGGER.error("\n[发送消息异常]", e);
  124. }
  125. }
  126. else
  127. {
  128. LOGGER.info("\n[你已离线]");
  129. }
  130. }
  131. }

(5)webSocket.js

  1. /**
  2. * 参数说明:
  3. * webSocketURL:String webSocket服务地址 eg: ws://127.0.0.1:8088/websocket (后端接口若为restful风格可以带参数)
  4. * callback:为带一个参数的回调函数
  5. * message:String 要传递的参数值(不是一个必要的参数)
  6. */
  7. export default{
  8. // 初始化webSocket
  9. webSocketInit(webSocketURL){ // ws://127.0.0.1:8088/websocket
  10. this.webSocket = new WebSocket(webSocketURL);
  11. this.webSocket.onopen = this.onOpenwellback;
  12. this.webSocket.onmessage = this.onMessageCallback;
  13. this.webSocket.onerror = this.onErrorCallback;
  14. this.webSocket.onclose = this.onCloseCallback;
  15. },
  16. // 自定义回调函数
  17. setOpenCallback(callback){ // 与服务端连接打开回调函数
  18. this.webSocket.onopen = callback;
  19. },
  20. setMessageCallback(callback){ // 与服务端发送消息回调函数
  21. this.webSocket.onmessage = callback;
  22. },
  23. setErrorCallback(callback){ // 与服务端连接异常回调函数
  24. this.webSocket.onerror = callback;
  25. },
  26. setCloseCallback(callback){ // 与服务端连接关闭回调函数
  27. this.webSocket.onclose = callback;
  28. },
  29. close(){ // 关闭连接
  30. this.webSocket.close();
  31. },
  32. sendMessage(message){ // 发送消息函数
  33. this.webSocket.send(message);
  34. },
  35. }

(6)index.vue

  1. <template>
  2. <div class="contanier" >
  3. </div>
  4. </template>
  5. <script>
  6. import webSocket from '@/utils/webSocket'
  7. import Cookie from 'js-cookie'
  8. export default {
  9. name: "WebSocketTest",
  10. data() {
  11. return {
  12. id: null,
  13. infoList: {
  14. },
  15. webSocketObject: null,
  16. }
  17. },
  18. created() {
  19. this.createWebSocket();
  20. },
  21. methods: {
  22. sendMessage() {
  23. // 数据发生改变时给WebSocket发送消息,让其进行广播操作
  24. webSocket.sendMessage("11111");
  25. },
  26. // 与websocket服务器创建连接
  27. createWebSocket() {
  28. if (typeof (webSocket) === "undefined") {
  29. alert("您的浏览器不支持socket")
  30. }
  31. const TokenKey = Cookie.get("Admin-Token");
  32. webSocket.webSocketInit('ws://127.0.0.1:8080/websocket/message' ) //初始化webSocket
  33. // 按需进行绑定回调函数
  34. webSocket.setOpenCallback(res => {
  35. console.log("连接建立成功", res);
  36. })
  37. webSocket.setMessageCallback(res => {
  38. if (res.data === "连接成功") {
  39. console.log("连接成功");
  40. return;
  41. }
  42. // 在此处进行数据刷新操作即可实现数据发生改变时实时更新数据
  43. console.log("接收到回信", res);
  44. console.log("接收到回信", JSON.parse(res.data));
  45. this.infoList = JSON.parse(res.data);
  46. })
  47. webSocket.setErrorCallback(res => {
  48. console.log("连接异常", res);
  49. })
  50. webSocket.setCloseCallback(res => {
  51. console.log("连接关闭", res);
  52. })
  53. }
  54. }
  55. }
  56. </script>
  57. <style scoped>
  58. </style>

三、实现

前后端同时打开:

关闭后端:

 关闭前端:

后端向前端发送信息(群发):

 WebSocketUsers.sendMessageToUsersByText(JSONObject.toJSONString(list));

 后端向前端发送信息(单发):

 WebSocketUsers.sendMessageToUserByText(value,JSONObject.toJSONString(list));

参考:

插件集成 | RuoYi

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/574857
推荐阅读
相关标签
  

闽ICP备14008679号