当前位置:   article > 正文

SpringBoot+Vue整合WebSocket实现实时通讯_vue中使用websocket通信接收后台数据、实现消息实时通讯

vue中使用websocket通信接收后台数据、实现消息实时通讯

        在开发过程中,我们经常遇到需要对前台的列表数据,实现实时展示最新的几条数据,或者是调度的任务进度条实现实时的刷新......,而对于这种需求,无状态的http协议显然无法满足我们的需求,于是websocket协议应运而生。websocket协议本质上是一个基于tcp的协议,是双向通讯协议,实现了浏览器和客户端的实时通讯,接收端和发送端可以互相发送或接收消息。

        本文整合websocket方式采用后台自定义Endpoint,前端使用内置的WebSocket。

一、SpringBoot配置

1、开发环境

  1. SpringBoot:2.5.13
  2. JDK:1.8

2、引入pom文件

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

3、新建WebSocketConfig配置类

        在配置类中手动注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket端点。@ServerEndpoint注解与@Controller注解类似,都是用来配置请求的uri。

  1. @Configuration
  2. public class WebSocketConfig {
  3. /**
  4. * ServerEndpointExporter类的作用是,会扫描所有的服务器端点,
  5. * 把带有@ServerEndpoint 注解的所有类都添加进来
  6. *
  7. */
  8. @Bean
  9. public ServerEndpointExporter serverEndpointExporter(){
  10. return new ServerEndpointExporter();
  11. }
  12. }

4、新建WebSocketServer服务类

        server类用来对与前端建立的websocket连接做出相应的响应,同时通过该类我们可以主动向前台推送消息。在该类中我们无法使用通过@Resource和@Autowired注入spring容器中的bean,由于spring容器管理的bean都是以单例的形式存在的,而websocket服务类则可以对应多个客户端。 项目初始化启动时,会初始化websocket服务类,此时还没有用户连接,spring会为其注入service, 所以该对象的service不是null,而当新用户建立websocket连接时,系统会新建一个websocket服务类对象, 但不会注入service,导致后续用户连接websocket服务类中的service都是null。

  1. @Slf4j
  2. @Component
  3. @ServerEndpoint("/notice/{userId}")
  4. public class WebSocketServer {
  5. /**
  6. * 解决无法注入bean:定义静态service对象,通过@Autowired在系统启动时为静态变量赋值
  7. * @Autowired 注解作用在方法上,如果方法没有参数,spring容器会在类加载完后执行一次这个方法,
  8. * 如果方法中有参数的话,还会从容器中自动注入这个方法的参数,然后执行一次这个方法。
  9. */
  10. public static XxService xxService;
  11. @Autowired
  12. public void setXxService(XxService xxService){
  13. WebSocketServer.xxService = xxService;
  14. }
  15. //存储客户端session信息
  16. public static Map<String, Session> clients = new ConcurrentHashMap<>();
  17. //存储把不同用户的客户端session信息集合
  18. public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();
  19. //会话id
  20. private String sid = null;
  21. //建立连接的用户id
  22. private String userId;
  23. /**
  24. * @description: 当与前端的websocket连接成功时,执行该方法
  25. * @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解
  26. **/
  27. @OnOpen
  28. public void onOpen(Session session, @PathParam("userId") String userId){
  29. this.sid = UUID.randomUUID().toString();
  30. this.userId = userId;
  31. clients.put(this.sid,session);
  32. //判断该用户是否存在会话信息,不存在则添加
  33. Set<String> clientSet = connection.get(userId);
  34. if (clientSet == null){
  35. clientSet = new HashSet<>();
  36. connection.put(userId,clientSet);
  37. }
  38. clientSet.add(this.sid);
  39. log.info(this.userId + "用户建立连接," + this.sid+"连接开启!");
  40. }
  41. /**
  42. * @description: 当连接失败时,执行该方法
  43. **/
  44. @OnClose
  45. public void onClose(){
  46. clients.remove(this.sid);
  47. log.info(this.sid+"连接断开");
  48. }
  49. /**
  50. * @description: 当收到前台发送的消息时,执行该方法
  51. **/
  52. @OnMessage
  53. public void onMessage(String message,Session session) {
  54. log.info("收到来自用户:" + this.userId + "的信息 " + message);
  55. //自定义消息实体
  56. ViewQueryInfoDTO viewQueryInfoDTO = JSON.parseObject(message, ViewQueryInfoDTO.class);
  57. viewQueryInfoDTO.setUserId(this.userId);
  58. //判断该次请求的消息类型是心跳检测还是获取信息
  59. if (viewQueryInfoDTO.getType().equals("heartbeat")){
  60. //立刻向前台发送消息,代表后台正常运行
  61. sendMessageByUserId(this.userId,new MessageInfo("heartbeat","ok"));
  62. }
  63. if (viewQueryInfoDTO.getType().equals("message")){
  64. //执行业务逻辑
  65. MessageInfo messageInfo = xxService.list(viewQueryInfoDTO);
  66. sendMessageByUserId(this.userId,messageInfo);
  67. }
  68. }
  69. /**
  70. * @description: 当连接发生错误时,执行该方法
  71. **/
  72. @OnError
  73. public void onError(Throwable error){
  74. log.info("系统错误");
  75. error.printStackTrace();
  76. }
  77. /**
  78. * @description: 通过userId向用户发送信息
  79. * 该类定义成静态可以配合定时任务实现定时推送
  80. **/
  81. public static void sendMessageByUserId(String userId, MessageInfo message){
  82. if (!StringUtils.isEmpty(userId)) {
  83. Set<String> clientSet = connection.get(userId);
  84. //用户是否存在客户端连接
  85. if (Objects.nonNull(clientSet)) {
  86. Iterator<String> iterator = clientSet.iterator();
  87. while (iterator.hasNext()) {
  88. String sid = iterator.next();
  89. Session session = clients.get(sid);
  90. //向每个会话发送消息
  91. if (Objects.nonNull(session)){
  92. try {
  93. String jsonString = JSON.toJSONString(message);
  94. //同步发送数据,需要等上一个sendText发送完成才执行下一个发送
  95. session.getBasicRemote().sendText(jsonString);
  96. } catch (IOException e) {
  97. e.printStackTrace();
  98. }
  99. }
  100. }
  101. }
  102. }
  103. }
  104. }

5、修改SecurityConfig配置类

        系统中使用权限框架时,需要对端点进行放行。

  1. @Override
  2. protected void configure(HttpSecurity httpSecurity) throws Exception
  3. {
  4. httpSecurity.authorizeRequests().antMatchers("/notice/**").anonymous();
  5. }

        如果请求还是被拦截,则再加入下面的配置。

  1. //忽略websocket拦截
  2. @Override
  3. public void configure(WebSecurity webSecurity){
  4. webSecurity.ignoring().antMatchers("/notice/**");
  5. }

6、使用连接工具测试

        websocket在线调试网址:websocket/ws/wss在线调试测试工具 (jackxiang.com)

 模拟前台:       后台信息:

二、Vue配置

1、编写websocket.js文件

  1. //暴露自定义websocket对象
  2. export const socket = {
  3. //后台请求路径
  4. url: "",
  5. //websocket对象
  6. websocket: null,
  7. //websocket状态
  8. websocketState: false,
  9. //重新连接次数
  10. reconnectNum: 0,
  11. //重连锁状态,保证重连按顺序执行
  12. lockReconnect: false,
  13. //定时器信息
  14. timeout: null,
  15. clientTimeout: null,
  16. serverTimeout: null,
  17. //初始化方法,根据url创建websocket对象封装基本连接方法,并重置心跳检测
  18. initWebSocket(newUrl) {
  19. socket.url = newUrl;
  20. socket.websocket = new WebSocket(socket.url);
  21. socket.websocket.onopen = socket.websocketOnOpen;
  22. socket.websocket.onerror = socket.websocketOnError;
  23. socket.websocket.onclose = socket.websocketOnClose;
  24. this.resetHeartbeat()
  25. },
  26. reconnect() {
  27. //判断连接状态
  28. if (socket.lockReconnect) return;
  29. socket.reconnectNum += 1;
  30. //重新连接三次还未成功调用连接关闭方法
  31. if (socket.reconnectNum === 3) {
  32. socket.reconnectNum = 0;
  33. socket.websocket.onclose()
  34. return;
  35. }
  36. //等待本次重连完成后再进行下一次
  37. socket.lockReconnect = true;
  38. //5s后进行重新连接
  39. socket.timeout = setTimeout(() => {
  40. socket.initWebSocket(socket.url);
  41. socket.lockReconnect = false;
  42. }, 5000);
  43. },
  44. //重置心跳检测
  45. resetHeartbeat() {
  46. socket.heartbeat();
  47. },
  48. //心跳检测
  49. heartbeat() {
  50. socket.clientTimeout = setTimeout(() => {
  51. if (socket.websocket) {
  52. //向后台发送消息进行心跳检测
  53. socket.websocket.send(JSON.stringify({ type: "heartbeat" }));
  54. socket.websocketState = false;
  55. //一分钟内服务器不响应则关闭连接
  56. socket.serverTimeout = setTimeout(() => {
  57. if (!socket.websocketState) {
  58. socket.websocket.onclose()
  59. } else {
  60. this.resetHeartbeat()
  61. }
  62. }, 60 * 1000);
  63. }
  64. }, 3 * 1000);
  65. },
  66. //发送消息
  67. sendMsg(message) {
  68. socket.websocket.send(message);
  69. },
  70. websocketOnOpen(event) {
  71. //连接开启后向后台发送消息进行一次心跳检测
  72. socket.sendMsg(JSON.stringify({ type: "heartbeat" }));
  73. },
  74. websocketOnError(error) {
  75. console.log(error);
  76. socket.reconnect();
  77. },
  78. websocketOnClose() {
  79. socket.websocket.close();
  80. },
  81. };

2、组件中使用websocket

  1. <script>
  2. //引入socket对象
  3. import { socket } from "@/utils/websocket";
  4. export default {
  5. data() {
  6. return {
  7. loading: true,
  8. websocketCount: -1,
  9. //查询条件
  10. queryCondition: {
  11. type: "message",
  12. },
  13. }
  14. },
  15. created() {
  16. //初始化websocket对象
  17. //window.location.host获取ip和端口,
  18. //process.env.VUE_APP_WEBSOCKET_BASE_API获取请求前缀
  19. socket.initWebSocket(
  20. `ws:${window.location.host}${process.env.VUE_APP_WEBSOCKET_BASE_API}/notice/` +
  21. userId
  22. );
  23. //绑定接收消息方法
  24. socket.websocket.onmessage = this.websocketOnMessage;
  25. },
  26. methods: {
  27. init() {
  28. this.queryCondition.type = "message";
  29. socket.sendMsg(JSON.stringify(this.queryCondition));
  30. },
  31. websocketOnMessage(event) {
  32. //初始化界面时,主动向后台发送一次消息,获取数据
  33. this.websocketCount += 1;
  34. if (this.websocketCount === 0) {
  35. this.init();
  36. }
  37. let info = JSON.parse(event.data);
  38. switch (info.type) {
  39. case "heartbeat":
  40. socket.websocketState = true;
  41. break;
  42. case "message":
  43. this.loading = true;
  44. this.$nextTick(() => {
  45. this.consumeMessage(info);
  46. })
  47. break;
  48. case "error":
  49. this.loading = false;
  50. break;
  51. }
  52. },
  53. consumeMessage(info) {
  54. //拿到最新数据重新渲染界面
  55. },
  56. }
  57. }
  58. </script>

三、前后台连接测试

        前后端可以正常通信响应数据,至此整合websocket完毕。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/576599
推荐阅读
相关标签
  

闽ICP备14008679号