当前位置:   article > 正文

Websocket定时监控+开关+参数传递_socket定时器

socket定时器

举个例子,当开发中遇到了这样的需求时

需求:需要定时监控多个设备的温度情况,并且需要设置开关控制监控任务

       因为需要定时不断的给前端返回我们业务处理好的数据,这个时候HTTP请求显然不能很好的满足我们的需求,除非用定时器不断的发请求,这样会造成资源的浪费,而websocket是一种长连接,一旦连接服务端就就可以不断地向客户端推送数据,不用一直三次握手四次握手。

       因此这里采用websocket的方式来实现业务


1. 配置WebSocket首先需要建立WebSocket的配置类

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  4. /**
  5. * websocket 配置
  6. *
  7. * @author
  8. */
  9. @Configuration
  10. public class WebSocketConfig
  11. {
  12. @Bean
  13. public ServerEndpointExporter serverEndpointExporter()
  14. {
  15. return new ServerEndpointExporter();
  16. }
  17. }

 2.具体配置WebSocket

  1. import com.alibaba.fastjson2.JSON;
  2. import com.github.pagehelper.PageInfo;
  3. import com.ruoyi.common.constant.HttpStatus;
  4. import com.ruoyi.common.core.domain.AjaxResult;
  5. import com.ruoyi.common.core.page.TableDataInfo;
  6. import com.ruoyi.common.core.redis.RedisCache;
  7. import com.ruoyi.common.utils.StringUtils;
  8. import com.ruoyi.common.utils.spring.SpringUtils;
  9. import com.ruoyi.framework.websocket.SemaphoreUtils;
  10. import com.ruoyi.framework.websocket.WebSocketUsers;
  11. import com.ruoyi.system.domain.Equipment;
  12. import com.ruoyi.system.service.IEquipmentService;
  13. import constant.Constants;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.stereotype.Component;
  17. import javax.websocket.*;
  18. import javax.websocket.server.PathParam;
  19. import javax.websocket.server.ServerEndpoint;
  20. import java.io.IOException;
  21. import java.util.List;
  22. import java.util.Map;
  23. import java.util.concurrent.ConcurrentHashMap;
  24. import java.util.concurrent.Semaphore;
  25. /**
  26. * websocket 消息处理
  27. *
  28. * @author
  29. */
  30. //交给Spring来管理
  31. @Component
  32. //建立连接的端点
  33. @ServerEndpoint("/websocket/{id}")
  34. public class WebSocketServer {
  35. /**
  36. * WebSocketServer 日志控制器
  37. */
  38. private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
  39. /**
  40. * 默认最多允许同时在线人数100
  41. */
  42. public static int socketMaxOnlineCount = 100;
  43. /**
  44. * 信号量控制在线人数
  45. */
  46. private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
  47. /**
  48. * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  49. */
  50. public static ConcurrentHashMap<Integer, WebSocketServer> webSocketSet = new ConcurrentHashMap<Integer, WebSocketServer>();
  51. /**
  52. * concurrent包的线程安全hashMap,用来存放每一个session里的参数信息
  53. */
  54. private static ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
  55. /**
  56. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  57. */
  58. private Session session;
  59. /**
  60. * 传过来的id 1:搜索设备,2:温度报警,3:查询设备是否断开连接
  61. */
  62. private Integer id = 0;
  63. /**
  64. * 连接建立成功调用的方法
  65. */
  66. @OnOpen
  67. public void onOpen(@PathParam(value = "id") Integer param, Session session) throws Exception {
  68. boolean semaphoreFlag = false;
  69. // 尝试获取信号量
  70. semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
  71. if (!semaphoreFlag) {
  72. // 未获取到信号量
  73. LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
  74. //WebSocketUsers.sendMessageToUserByText(session, "忙碌中,请稍后再试...");
  75. session.close();
  76. } else {
  77. // 添加用户
  78. //WebSocketUsers.put(session.getId(), session);
  79. //接收客户端编号并且加入到set集合当中
  80. id = param;
  81. this.session = session;
  82. webSocketSet.put(param, this);
  83. //获取session当中的参数
  84. Map<String, List<String>> map = session.getRequestParameterMap();
  85. if (!map.isEmpty()){
  86. //把取出来的参数存入hashmap当中
  87. //当连接为ws://127.0.0.1:8080/websocket/2?equipmentIds=1时
  88. hashMap.put(session.getId(),map.get("equipmentIds").get(0));
  89. }
  90. LOGGER.info("\n 建立连接 - {}", session);
  91. //LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
  92. }
  93. }
  94. /**
  95. * 连接关闭时处理
  96. */
  97. @OnClose
  98. public void onClose(Session session) {
  99. LOGGER.info("\n 关闭连接 - {}", session);
  100. // 移除用户
  101. WebSocketUsers.remove(session.getId());
  102. webSocketSet.remove(id);
  103. // 获取到信号量则需释放
  104. SemaphoreUtils.release(socketSemaphore);
  105. }
  106. /**
  107. * 抛出异常时处理
  108. */
  109. @OnError
  110. public void onError(Session session, Throwable exception) throws Exception {
  111. if (session.isOpen()) {
  112. // 关闭连接
  113. session.close();
  114. }
  115. String sessionId = session.getId();
  116. LOGGER.info("\n 连接异常 - {}", sessionId);
  117. LOGGER.info("\n 异常信息 - {}", exception);
  118. // 移出用户
  119. //WebSocketUsers.remove(sessionId);
  120. // 获取到信号量则需释放
  121. SemaphoreUtils.release(socketSemaphore);
  122. }
  123. public void sendMessage(String message) throws IOException {
  124. synchronized (session) {
  125. getSession().getBasicRemote().sendText(message);
  126. }
  127. }
  128. /**
  129. * 服务器接收到客户端消息时调用的方法
  130. */
  131. @OnMessage
  132. public void onMessage(String message, Session session) throws Exception {
  133. //业务
  134. }
  135. /**
  136. * 给指定的人发送消息
  137. *
  138. * @param message
  139. */
  140. public static void sendToMessageById(Integer id, String message) {
  141. try {
  142. if (webSocketSet.get(id) != null) {
  143. webSocketSet.get(id).sendMessage(message);
  144. } else {
  145. System.out.println("webSocketSet中没有此key,不推送消息");
  146. }
  147. } catch (IOException e) {
  148. e.printStackTrace();
  149. }
  150. }
  151. public Session getSession() {
  152. return session;
  153. }
  154. public static ConcurrentHashMap<String, String> getHashMap() {
  155. return hashMap;
  156. }
  157. }

3.信号量工具类(控制在线连接数)

  1. import java.util.concurrent.Semaphore;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. /**
  5. * 信号量相关处理
  6. *
  7. * @author ruoyi
  8. */
  9. public class SemaphoreUtils
  10. {
  11. /**
  12. * SemaphoreUtils 日志控制器
  13. */
  14. private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
  15. /**
  16. * 获取信号量
  17. *
  18. * @param semaphore
  19. * @return
  20. */
  21. public static boolean tryAcquire(Semaphore semaphore)
  22. {
  23. boolean flag = false;
  24. try
  25. {
  26. flag = semaphore.tryAcquire();
  27. }
  28. catch (Exception e)
  29. {
  30. LOGGER.error("获取信号量异常", e);
  31. }
  32. return flag;
  33. }
  34. /**
  35. * 释放信号量
  36. *
  37. * @param semaphore
  38. */
  39. public static void release(Semaphore semaphore)
  40. {
  41. try
  42. {
  43. semaphore.release();
  44. }
  45. catch (Exception e)
  46. {
  47. LOGGER.error("释放信号量异常", e);
  48. }
  49. }
  50. }

4.控制层监控任务开关

  1. private ScheduledFuture<?> futureAlarmTempMonitor;
  2. /**
  3. * 用来存储温度配置信息
  4. */
  5. public static ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
  6. /**
  7. * 用来存储每一台设备的报警定时任务
  8. */
  9. public static ConcurrentHashMap<Long,ScheduledFuture<?>> equipmentTask=new ConcurrentHashMap<>();
  10. @Bean
  11. public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
  12. return new ThreadPoolTaskScheduler();
  13. }
  14. /**
  15. * 开启温度监控定时任务(整帧检测)
  16. *
  17. */
  18. @ApiOperation("开启温度监控定时任务")
  19. @Log(title = "开启温度监控定时任务", businessType = BusinessType.INSERT)
  20. @PostMapping("/startAlarmTempMonitor")
  21. public AjaxResult startAlarmTempMonitor(@RequestBody TempAlarmConfig tempAlarmConfig) {
  22. //将温度配置的参数存入hashMap当中
  23. hashMap.put(Constants.Temp_Alarm_Config +tempAlarmConfig.getEquipmentId(), JSON.toJSONString(tempAlarmConfig));
  24. //方法会创建一个定时计划ScheduledFuture,在这个方法需要添加两个参数,Runnable(线程接口类) 和CronTrigger(定时任务触发器)
  25. /* 该种方法可以使用cron表达式 最小单位为秒
  26. futureAlarmTempMonitor = threadPoolTaskScheduler.schedule(new TempAlarmRunnable(), new Trigger() {
  27. @Override
  28. public Date nextExecutionTime(TriggerContext triggerContext) {
  29. //周期为帧频执行一次
  30. return new CronTrigger(CronExpressionUtils.conversion(imageFramerate)).nextExecutionTime(triggerContext);
  31. }
  32. });*/
  33. //定时计划,最小单位为毫秒,在这里每五秒执行一次
  34. futureAlarmTempMonitor = threadPoolTaskScheduler.scheduleWithFixedDelay(new TempAlarmRunnable(), 5000);
  35. //把定时任务保存进hashMap当中
  36. equipmentTask.put(tempAlarmConfig.getEquipmentId(), futureAlarmTempMonitor);
  37. System.out.println("开启温度监控定时任务");
  38. //报警配置存到数据库当中
  39. //业务
  40. return AjaxResult.success();
  41. }
  42. @ApiOperation("关闭温度监控定时任务")
  43. @PostMapping("/stopAlarmTempMonitor/{equipmentId}")
  44. //这个id在TempAlarmConfig实体类当中
  45. public AjaxResult stopAlarmTempMonitor(@PathVariable Long equipmentId){
  46. if (!equipmentTask.isEmpty()){
  47. ScheduledFuture<?> scheduledFuture = equipmentTask.get(equipmentId);
  48. scheduledFuture.cancel(true);
  49. }
  50. System.out.println("关闭温度监控定时任务");
  51. return AjaxResult.success();
  52. }

5.实现Runnable接口

  1. import cn.hutool.core.collection.CollectionUtil;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.ruoyi.common.constant.Constants;
  4. import com.ruoyi.common.utils.spring.SpringUtils;
  5. import com.ruoyi.system.domain.AlarmData;
  6. import com.ruoyi.system.domain.Equipment;
  7. import com.ruoyi.system.domain.TempAlarmConfig;
  8. import com.ruoyi.system.domain.TempData;
  9. import com.ruoyi.system.sdk.SdkManager;
  10. import com.ruoyi.system.service.IAlarmDataService;
  11. import com.ruoyi.system.service.IEquipmentService;
  12. import com.ruoyi.system.service.ITempAlarmConfigService;
  13. import com.ruoyi.web.controller.thermometry.AlarmDataController;
  14. import com.ruoyi.web.controller.websocket.WebSocketServer;
  15. import com.ruoyi.web.core.config.EquipmentContext;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import javax.swing.*;
  18. import java.util.concurrent.ConcurrentHashMap;
  19. /**
  20. * 监控温度报警线程
  21. * sendToMessageById:2 表示监控温度报警的webSocket
  22. */
  23. public class TempAlarmRunnable implements Runnable {
  24. @Override
  25. public void run() {
  26. if (CollectionUtil.isNotEmpty(WebSocketServer.webSocketSet)) {
  27. //取出存入hashMap当中的值
  28. ConcurrentHashMap<String, String> hashMap = AlarmDataController.hashMap;
  29. //取出对应的设备ID 2表示对应的温度报警webSocket
  30. String sessionId = WebSocketServer.webSocketSet.get(2).getSession().getId();
  31. String equipmentId = WebSocketServer.getHashMap().get(sessionId);
  32. if (!hashMap.isEmpty()) {
  33. String s = hashMap.get(Constants.Temp_Alarm_Config + equipmentId);
  34. //获取温度报警配置参数
  35. TempAlarmConfig tempAlarmConfig = JSON.parseObject(s, TempAlarmConfig.class);
  36. selectAlarmData(tempAlarmConfig);
  37. }
  38. }
  39. }
  40. public void selectAlarmData(TempAlarmConfig tempAlarmConfig) {
  41. //给前台返回消息
  42. WebSocketServer.sendToMessageById(2,JSON.toJSONString(业务));
  43. }
  44. }

that's all

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/853461
推荐阅读
相关标签
  

闽ICP备14008679号