赞
踩
举个例子,当开发中遇到了这样的需求时
需求:需要定时监控多个设备的温度情况,并且需要设置开关控制监控任务
因为需要定时不断的给前端返回我们业务处理好的数据,这个时候HTTP请求显然不能很好的满足我们的需求,除非用定时器不断的发请求,这样会造成资源的浪费,而websocket是一种长连接,一旦连接服务端就就可以不断地向客户端推送数据,不用一直三次握手四次握手。
因此这里采用websocket的方式来实现业务
1. 配置WebSocket首先需要建立WebSocket的配置类
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- /**
- * websocket 配置
- *
- * @author
- */
- @Configuration
- public class WebSocketConfig
- {
- @Bean
- public ServerEndpointExporter serverEndpointExporter()
- {
- return new ServerEndpointExporter();
- }
- }

2.具体配置WebSocket
- import com.alibaba.fastjson2.JSON;
- import com.github.pagehelper.PageInfo;
- import com.ruoyi.common.constant.HttpStatus;
- import com.ruoyi.common.core.domain.AjaxResult;
- import com.ruoyi.common.core.page.TableDataInfo;
- import com.ruoyi.common.core.redis.RedisCache;
- import com.ruoyi.common.utils.StringUtils;
- import com.ruoyi.common.utils.spring.SpringUtils;
- import com.ruoyi.framework.websocket.SemaphoreUtils;
- import com.ruoyi.framework.websocket.WebSocketUsers;
- import com.ruoyi.system.domain.Equipment;
- import com.ruoyi.system.service.IEquipmentService;
- import constant.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Semaphore;
-
- /**
- * websocket 消息处理
- *
- * @author
- */
-
- //交给Spring来管理
- @Component
- //建立连接的端点
- @ServerEndpoint("/websocket/{id}")
- public class WebSocketServer {
- /**
- * WebSocketServer 日志控制器
- */
- private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
-
- /**
- * 默认最多允许同时在线人数100
- */
- public static int socketMaxOnlineCount = 100;
-
- /**
- * 信号量控制在线人数
- */
- private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
-
-
- /**
- * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
- */
- public static ConcurrentHashMap<Integer, WebSocketServer> webSocketSet = new ConcurrentHashMap<Integer, WebSocketServer>();
-
- /**
- * concurrent包的线程安全hashMap,用来存放每一个session里的参数信息
- */
- private static ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
-
- /**
- * 与某个客户端的连接会话,需要通过它来给客户端发送数据
- */
- private Session session;
-
- /**
- * 传过来的id 1:搜索设备,2:温度报警,3:查询设备是否断开连接
- */
- private Integer id = 0;
-
-
- /**
- * 连接建立成功调用的方法
- */
- @OnOpen
- public void onOpen(@PathParam(value = "id") Integer param, Session session) throws Exception {
- boolean semaphoreFlag = false;
- // 尝试获取信号量
- semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
- if (!semaphoreFlag) {
- // 未获取到信号量
- LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
- //WebSocketUsers.sendMessageToUserByText(session, "忙碌中,请稍后再试...");
- session.close();
- } else {
- // 添加用户
- //WebSocketUsers.put(session.getId(), session);
- //接收客户端编号并且加入到set集合当中
- id = param;
- this.session = session;
- webSocketSet.put(param, this);
-
- //获取session当中的参数
- Map<String, List<String>> map = session.getRequestParameterMap();
- if (!map.isEmpty()){
- //把取出来的参数存入hashmap当中
- //当连接为ws://127.0.0.1:8080/websocket/2?equipmentIds=1时
- hashMap.put(session.getId(),map.get("equipmentIds").get(0));
- }
- LOGGER.info("\n 建立连接 - {}", session);
- //LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
- }
- }
-
- /**
- * 连接关闭时处理
- */
- @OnClose
- public void onClose(Session session) {
- LOGGER.info("\n 关闭连接 - {}", session);
- // 移除用户
- WebSocketUsers.remove(session.getId());
- webSocketSet.remove(id);
- // 获取到信号量则需释放
- SemaphoreUtils.release(socketSemaphore);
- }
-
- /**
- * 抛出异常时处理
- */
- @OnError
- public void onError(Session session, Throwable exception) throws Exception {
- if (session.isOpen()) {
- // 关闭连接
- session.close();
- }
- String sessionId = session.getId();
- LOGGER.info("\n 连接异常 - {}", sessionId);
- LOGGER.info("\n 异常信息 - {}", exception);
- // 移出用户
- //WebSocketUsers.remove(sessionId);
- // 获取到信号量则需释放
- SemaphoreUtils.release(socketSemaphore);
- }
-
- public void sendMessage(String message) throws IOException {
- synchronized (session) {
- getSession().getBasicRemote().sendText(message);
- }
- }
-
- /**
- * 服务器接收到客户端消息时调用的方法
- */
- @OnMessage
- public void onMessage(String message, Session session) throws Exception {
- //业务
- }
-
-
- /**
- * 给指定的人发送消息
- *
- * @param message
- */
- public static void sendToMessageById(Integer id, String message) {
- try {
- if (webSocketSet.get(id) != null) {
- webSocketSet.get(id).sendMessage(message);
- } else {
- System.out.println("webSocketSet中没有此key,不推送消息");
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public Session getSession() {
- return session;
- }
-
- public static ConcurrentHashMap<String, String> getHashMap() {
- return hashMap;
- }
- }

3.信号量工具类(控制在线连接数)
- import java.util.concurrent.Semaphore;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
- * 信号量相关处理
- *
- * @author ruoyi
- */
- public class SemaphoreUtils
- {
- /**
- * SemaphoreUtils 日志控制器
- */
- private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
-
- /**
- * 获取信号量
- *
- * @param semaphore
- * @return
- */
- public static boolean tryAcquire(Semaphore semaphore)
- {
- boolean flag = false;
-
- try
- {
- flag = semaphore.tryAcquire();
- }
- catch (Exception e)
- {
- LOGGER.error("获取信号量异常", e);
- }
-
- return flag;
- }
-
- /**
- * 释放信号量
- *
- * @param semaphore
- */
- public static void release(Semaphore semaphore)
- {
-
- try
- {
- semaphore.release();
- }
- catch (Exception e)
- {
- LOGGER.error("释放信号量异常", e);
- }
- }
- }

4.控制层监控任务开关
- private ScheduledFuture<?> futureAlarmTempMonitor;
-
- /**
- * 用来存储温度配置信息
- */
- public static ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
-
- /**
- * 用来存储每一台设备的报警定时任务
- */
- public static ConcurrentHashMap<Long,ScheduledFuture<?>> equipmentTask=new ConcurrentHashMap<>();
-
- @Bean
- public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
- return new ThreadPoolTaskScheduler();
- }
-
- /**
- * 开启温度监控定时任务(整帧检测)
- *
- */
- @ApiOperation("开启温度监控定时任务")
- @Log(title = "开启温度监控定时任务", businessType = BusinessType.INSERT)
- @PostMapping("/startAlarmTempMonitor")
- public AjaxResult startAlarmTempMonitor(@RequestBody TempAlarmConfig tempAlarmConfig) {
- //将温度配置的参数存入hashMap当中
- hashMap.put(Constants.Temp_Alarm_Config +tempAlarmConfig.getEquipmentId(), JSON.toJSONString(tempAlarmConfig));
- //方法会创建一个定时计划ScheduledFuture,在这个方法需要添加两个参数,Runnable(线程接口类) 和CronTrigger(定时任务触发器)
- /* 该种方法可以使用cron表达式 最小单位为秒
- futureAlarmTempMonitor = threadPoolTaskScheduler.schedule(new TempAlarmRunnable(), new Trigger() {
- @Override
- public Date nextExecutionTime(TriggerContext triggerContext) {
- //周期为帧频执行一次
- return new CronTrigger(CronExpressionUtils.conversion(imageFramerate)).nextExecutionTime(triggerContext);
- }
- });*/
- //定时计划,最小单位为毫秒,在这里每五秒执行一次
- futureAlarmTempMonitor = threadPoolTaskScheduler.scheduleWithFixedDelay(new TempAlarmRunnable(), 5000);
- //把定时任务保存进hashMap当中
- equipmentTask.put(tempAlarmConfig.getEquipmentId(), futureAlarmTempMonitor);
- System.out.println("开启温度监控定时任务");
- //报警配置存到数据库当中
- //业务
-
- return AjaxResult.success();
- }
-
- @ApiOperation("关闭温度监控定时任务")
- @PostMapping("/stopAlarmTempMonitor/{equipmentId}")
- //这个id在TempAlarmConfig实体类当中
- public AjaxResult stopAlarmTempMonitor(@PathVariable Long equipmentId){
- if (!equipmentTask.isEmpty()){
- ScheduledFuture<?> scheduledFuture = equipmentTask.get(equipmentId);
- scheduledFuture.cancel(true);
- }
- System.out.println("关闭温度监控定时任务");
- return AjaxResult.success();
- }

5.实现Runnable接口
- import cn.hutool.core.collection.CollectionUtil;
- import com.alibaba.fastjson2.JSON;
- import com.ruoyi.common.constant.Constants;
- import com.ruoyi.common.utils.spring.SpringUtils;
- import com.ruoyi.system.domain.AlarmData;
- import com.ruoyi.system.domain.Equipment;
- import com.ruoyi.system.domain.TempAlarmConfig;
- import com.ruoyi.system.domain.TempData;
- import com.ruoyi.system.sdk.SdkManager;
- import com.ruoyi.system.service.IAlarmDataService;
- import com.ruoyi.system.service.IEquipmentService;
- import com.ruoyi.system.service.ITempAlarmConfigService;
- import com.ruoyi.web.controller.thermometry.AlarmDataController;
- import com.ruoyi.web.controller.websocket.WebSocketServer;
- import com.ruoyi.web.core.config.EquipmentContext;
- import org.springframework.beans.factory.annotation.Autowired;
-
- import javax.swing.*;
- import java.util.concurrent.ConcurrentHashMap;
-
-
- /**
- * 监控温度报警线程
- * sendToMessageById:2 表示监控温度报警的webSocket
- */
- public class TempAlarmRunnable implements Runnable {
-
- @Override
- public void run() {
- if (CollectionUtil.isNotEmpty(WebSocketServer.webSocketSet)) {
- //取出存入hashMap当中的值
- ConcurrentHashMap<String, String> hashMap = AlarmDataController.hashMap;
- //取出对应的设备ID 2表示对应的温度报警webSocket
- String sessionId = WebSocketServer.webSocketSet.get(2).getSession().getId();
- String equipmentId = WebSocketServer.getHashMap().get(sessionId);
- if (!hashMap.isEmpty()) {
- String s = hashMap.get(Constants.Temp_Alarm_Config + equipmentId);
- //获取温度报警配置参数
- TempAlarmConfig tempAlarmConfig = JSON.parseObject(s, TempAlarmConfig.class);
- selectAlarmData(tempAlarmConfig);
- }
- }
- }
-
- public void selectAlarmData(TempAlarmConfig tempAlarmConfig) {
-
- //给前台返回消息
- WebSocketServer.sendToMessageById(2,JSON.toJSONString(业务));
- }
- }

that's all
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。