当前位置:   article > 正文

websocket定时推送数据

websocket定时推送数据

示例代码

1、添加pom.xml依赖

  1. <dependency>
  2. <groupId>org.springframework</groupId>
  3. <artifactId>spring-websocket</artifactId>
  4. </dependency>

 2、创建websocket配置类

  1. package com.success.socket;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. /**
  6. * WebSocket配置类
  7. */
  8. @Configuration
  9. public class WebSocketConfig {
  10. @Bean
  11. public ServerEndpointExporter serverEndpointExporter() {
  12. return new ServerEndpointExporter();
  13. }
  14. }

3、创建WebSokcet工具类 

  1. package com.success.socket;
  2. import cn.hutool.json.JSONUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.example.dataservice.models.common.ResultPO;
  6. import com.success.service.HomePageService;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springblade.util.SpringUtils;
  11. import org.springframework.stereotype.Component;
  12. import javax.websocket.*;
  13. import javax.websocket.server.PathParam;
  14. import javax.websocket.server.ServerEndpoint;
  15. import java.io.IOException;
  16. import java.text.SimpleDateFormat;
  17. import java.util.*;
  18. import java.util.concurrent.ConcurrentHashMap;
  19. import java.util.concurrent.atomic.AtomicInteger;
  20. @Component
  21. @ServerEndpoint(value = "/ws/{userId}")
  22. public class SocketServer {
  23. private final static Logger log = LoggerFactory.getLogger(SocketServer.class);
  24. //记录当前在线连接数
  25. private static final AtomicInteger onlineCount = new AtomicInteger(0);
  26. //存放所有在线的客户端
  27. private static final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();
  28. //注入homePageService ,实际的业务在这里实现
  29. private static HomePageService homePageService = SpringUtils.getApplicationContext().getBean(HomePageService.class);
  30. /**
  31. * 连接建立成功调用的方法
  32. * 连接成功返回 {"msg":"ok","event":"open"}
  33. */
  34. @OnOpen
  35. public void onOpen(Session session, @PathParam("userId") String userId) {
  36. onlineCount.incrementAndGet(); // 在线数加1
  37. clients.put(session.getId(), session);
  38. JSONObject json = new JSONObject();
  39. json.put("event", "open");
  40. json.put("msg", "ok");
  41. this.sendMessage(json.toJSONString());
  42. log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
  43. }
  44. /**
  45. * 连接关闭调用的方法
  46. */
  47. @OnClose
  48. public void onClose(Session session) {
  49. onlineCount.decrementAndGet(); // 在线数减1
  50. clients.remove(session.getId());
  51. log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
  52. }
  53. /**
  54. * 收到客户端消息后调用的方法
  55. *
  56. * @param message 客户端发送过来的消息 {"event":"getCountGeneral"}
  57. * 当业务改动数据时,可以主动发消息(不需要客户端主动请求)
  58. */
  59. @OnMessage
  60. public void onMessage(String message, Session session) {
  61. log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
  62. if (StringUtils.isBlank(message)) {
  63. JSONObject jsonRes = new JSONObject();
  64. jsonRes.put("msg", "event无法处理");
  65. this.sendMessage(jsonRes.toJSONString());
  66. }
  67. //解析发送的报文
  68. JSONObject jsonObject = JSON.parseObject(message);
  69. //24小时实时数据 前端发送{"event":"getCountGeneral"}
  70. if ("getCountGeneral".equals(jsonObject.getString("event"))) {
  71. this.sendMessage(getCountGeneral());
  72. }
  73. }
  74. @OnError
  75. public void onError(Session session, Throwable error) {
  76. log.error("链接发生错误:{},原因:{}", session.getId(), error.getMessage());
  77. error.printStackTrace();
  78. }
  79. /**
  80. * 获取24小时实时数据
  81. *
  82. * @return
  83. */
  84. public String getCountGeneral() {
  85. Map<String,Object> map = new HashMap<>();
  86. //业务1
  87. map.put("interfaceCallTop",homePageService.getInterfaceCallTop(params));
  88. //业务2
  89. map.put("countGeneral",homePageService.getCountGeneral1());
  90. return JSONObject.toJSONString(map);
  91. }
  92. /**
  93. * 群发消息
  94. *
  95. * @param message 消息内容
  96. */
  97. public void sendMessage(String message) {
  98. if (clients.isEmpty()) {
  99. log.info("********* 当前无客户端链接 *********");
  100. }
  101. for (Map.Entry<String, Session> sessionEntry : clients.entrySet()) {
  102. Session toSession = sessionEntry.getValue();
  103. toSession.getAsyncRemote().sendText(message);
  104. log.info("服务端发送消息给客户端:{}", toSession.getId());
  105. }
  106. }
  107. /**
  108. * 启动群发任务,用于定时任务
  109. */
  110. public void run() {
  111. //首页24小时实时数据 当日接口访问排行 当日单位调用排行 当日应用调用排行
  112. this.sendMessage(getCountGeneral());
  113. }
  114. }

4、定时任务

有时候需要后端定时推送数据给前端,需要增加以下代码:

  1. package com.success.utils;
  2. import com.success.socket.SocketServer;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.scheduling.annotation.Scheduled;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @Component
  10. @RestController
  11. public class DataPushScheduledTask {
  12. private final static Logger log = LoggerFactory.getLogger(DataPushScheduledTask.class);
  13. /**
  14. * 30秒一次
  15. * 定时场景数据推送
  16. */
  17. @Scheduled(cron = "0/30 * * * * ? ")
  18. @GetMapping("/test")
  19. public void executeDataPush() {
  20. log.info("********* 定时任务执行 *********");
  21. new SocketServer().run();
  22. log.info("********* 定时任务完成 *********");
  23. }
  24. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/445790
推荐阅读
  

闽ICP备14008679号