当前位置:   article > 正文

WebSocket零基础极速上手开发指南_websocket开发

websocket开发

一、WebSocket后台配置

1、添加依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

2、WebSocket配置类 WebSocketConfig.java

  1. package com.service.websocket.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.web.servlet.config.annotation.CorsRegistry;
  6. import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
  7. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  8. /**
  9. * Websocket 配置
  10. */
  11. @Configuration
  12. @Slf4j
  13. public class WebsocketConfig implements WebMvcConfigurer {
  14. //服务器支持跨域
  15. @Override
  16. public void addCorsMappings(CorsRegistry registry) {
  17. registry.addMapping("/**").allowedOrigins("*")
  18. .allowedMethods("GET", "POST","OPTIONS").allowedHeaders("*")
  19. .exposedHeaders("Access-Control-Allow-Headers",
  20. "Access-Control-Allow-Methods",
  21. "Access-Control-Allow-Origin",
  22. "Access-Control-Max-Age",
  23. "X-Frame-Options")
  24. .allowCredentials(false).maxAge(3600);
  25. }
  26. /** 注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint 。
  27. * 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,
  28. * 因为它将由容器自己提供和管理。在Spring中可以直接使用Java WebSocket API来提供服务,如果使用内置的web容器,需要做的仅仅是需要在下面添加
  29. * */
  30. @Bean
  31. public ServerEndpointExporter serverEndpointExporter() {
  32. return new ServerEndpointExporter();
  33. }
  34. }

3、WebSocket服务 Server

  1. package com.service.websocket.server;
  2. import cn.gooday.jsh.service.common.dto.RestControllerResult;
  3. import java.io.IOException;
  4. import java.nio.ByteBuffer;
  5. import java.text.DateFormat;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. import javax.websocket.EndpointConfig;
  12. import javax.websocket.OnClose;
  13. import javax.websocket.OnError;
  14. import javax.websocket.OnMessage;
  15. import javax.websocket.OnOpen;
  16. import javax.websocket.Session;
  17. import javax.websocket.server.PathParam;
  18. import javax.websocket.server.ServerEndpoint;
  19. import jsh.mg.msg.service.websocket.dto.WebsocketParamsDto;
  20. import jsh.mg.msg.service.websocket.util.HttpUtils;
  21. import jsh.mg.msg.service.websocket.util.RedisUtil;
  22. import lombok.extern.slf4j.Slf4j;
  23. import org.apache.commons.lang3.StringUtils;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.stereotype.Component;
  28. /**
  29. * websocket服务器
  30. */
  31. @Component
  32. @ServerEndpoint("/socket/{serialNumber}")
  33. @Slf4j
  34. public class WebsocketServer {
  35. private static final Logger logger = LoggerFactory.getLogger(WebsocketServer.class);
  36. //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  37. private static int onlineCount = 0;
  38. //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
  39. public static ConcurrentHashMap<String, WebsocketServer> webSocketMap =
  40. new ConcurrentHashMap<String, WebsocketServer>();
  41. //与某个客户端的连接会话,需要通过它来给客户端发送数据
  42. private Session session;
  43. private String callBackUrl;
  44. private static String PREX_STRING = "STRING_";
  45. private static String PREX_BINARY = "BINARY_";
  46. private static long EXPIRE_TIME = 604800; //过期时间7天
  47. @Autowired
  48. private RedisUtil redisUtil;
  49. /**
  50. * 连接成功后调用的方法
  51. * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
  52. */
  53. @OnOpen
  54. public void onOpen(@PathParam(value = "serialNumber") String serialNumber, Session session, EndpointConfig config) {
  55. this.session = session;
  56. String queryString = session.getQueryString();
  57. log.info("session.getQueryString()" + queryString);
  58. if (StringUtils.isNotBlank(queryString) && queryString.contains("url=")) {
  59. this.callBackUrl = queryString.replace("url=", "");
  60. }
  61. webSocketMap.put(serialNumber, this) ;
  62. //在线数加1
  63. addOnlineCount();
  64. log.info("当前有连接" + serialNumber + "加入!当前在线人数为" + getOnlineCount());
  65. }
  66. /**
  67. * 连接关闭调用方法
  68. */
  69. @OnClose
  70. public void onClose(@PathParam(value = "serialNumber") String serialNumber) {
  71. if (StringUtils.isNotBlank(serialNumber)) {
  72. log.info("WebsocketServer.onClose() is begin, serialNumber is " + serialNumber);
  73. webSocketMap.remove(serialNumber);
  74. //在线数减1
  75. subOnlineCount();
  76. }
  77. }
  78. /**
  79. * 连接异常
  80. */
  81. @OnError
  82. public void onError(@PathParam(value = "serialNumber") String serialNumber, Throwable error) {
  83. log.info("连接异常: serialNumber is " + serialNumber + ",error is " + error.getMessage());
  84. }
  85. /**
  86. * 接收到客户短消息
  87. */
  88. @OnMessage
  89. public void onMessage(@PathParam(value = "serialNumber") String serialNumber, String message) {
  90. try {
  91. if (!isOnline(serialNumber)) {
  92. return ;
  93. }
  94. WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;
  95. HttpUtils.sendPost(websocketServer.callBackUrl, message);
  96. } catch (Exception e) {
  97. log.info(e.getMessage());
  98. return ;
  99. }
  100. return ;
  101. }
  102. public RestControllerResult<Boolean> checkExist(String serialNumber) {
  103. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  104. if (!isExist(serialNumber)) {
  105. result.setData(false);
  106. result.setErrorMsg("请求的链接:" + serialNumber + "不在该服务器上");
  107. return result;
  108. }
  109. result.setData(true);
  110. return result;
  111. }
  112. public RestControllerResult<Boolean> checkOnline(String serialNumber) {
  113. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  114. if (!isOnline(serialNumber)) {
  115. result.setData(false);
  116. result.setErrorMsg("请求的链接:" + serialNumber + "不在线");
  117. return result;
  118. }
  119. result.setData(true);
  120. return result;
  121. }
  122. public RestControllerResult<Boolean> checkOfflineMessage(String serialNumber) {
  123. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  124. if (!isOnline(serialNumber)) {
  125. result.setData(false);
  126. result.setErrorMsg("请求的链接:" + serialNumber + "不在线");
  127. return result;
  128. }
  129. List<String> messageList = (List<String>)redisUtil.get(PREX_STRING + serialNumber);
  130. List<ByteBuffer> binaryList = (List<ByteBuffer>)redisUtil.get(PREX_BINARY + serialNumber);
  131. if ((null == messageList || messageList.size() < 1) && (null == binaryList || binaryList.size() < 1)) {
  132. result.setData(false);
  133. } else {
  134. result.setData(true);
  135. }
  136. return result;
  137. }
  138. public RestControllerResult<Boolean> pullOfflineMessage(String serialNumber, Boolean isAsync, Boolean flag) {
  139. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  140. try {
  141. if (!isOnline(serialNumber)) {
  142. result.setData(false);
  143. result.setErrorMsg("请求的链接:" + serialNumber + "不在线");
  144. return result;
  145. }
  146. WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;
  147. List<String> messageList = (List<String>)redisUtil.get(PREX_STRING + serialNumber);
  148. if (null != messageList && messageList.size() > 0 ) {
  149. Boolean isSuccess = false;
  150. if (!isAsync) {
  151. isSuccess = sendBasicMessage(websocketServer.session, messageList, flag);
  152. } else {
  153. isSuccess = sendAsyncMessage(websocketServer.session, messageList);
  154. }
  155. if (isSuccess) {
  156. redisUtil.delete(PREX_STRING + serialNumber);
  157. }
  158. }
  159. List<ByteBuffer> binaryList = (List<ByteBuffer>)redisUtil.get(PREX_BINARY + serialNumber);
  160. if (null != binaryList && binaryList.size() > 0 ) {
  161. Boolean isSuccess = false;
  162. if (!isAsync) {
  163. isSuccess = sendBasicBinaryMessage(websocketServer.session, binaryList, flag);
  164. } else {
  165. isSuccess = sendAsyncBinaryMessage(websocketServer.session, binaryList);
  166. }
  167. if (isSuccess) {
  168. redisUtil.delete(PREX_BINARY + serialNumber);
  169. }
  170. }
  171. } catch (IOException e) {
  172. log.info(e.getMessage());
  173. result.setData(false);
  174. result.setErrorMsg(e.getMessage());
  175. return result;
  176. }
  177. return result;
  178. }
  179. /**
  180. * 给所有人发消息
  181. */
  182. public RestControllerResult<Boolean> sendToAll(WebsocketParamsDto paramsDto) {
  183. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  184. //遍历HashMap
  185. for (String serialNumber : webSocketMap.keySet()) {
  186. try {
  187. if (paramsDto.getIsBinary()) {
  188. sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());
  189. } else {
  190. sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());
  191. }
  192. } catch (IOException e) {
  193. log.info(e.getMessage());
  194. if (paramsDto.getIsBinary()) {
  195. redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
  196. } else {
  197. redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  198. }
  199. }
  200. }
  201. result.setData(true);
  202. return result;
  203. }
  204. /**
  205. * 给指定的终端发送消息
  206. */
  207. public RestControllerResult<Boolean> sendToTerminal(WebsocketParamsDto paramsDto) {
  208. log.info("WebsocketServer.sendToSingle() is begin, serialNumber is " + paramsDto.getSerialNumber());
  209. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  210. String toSerialNumber = paramsDto.getSerialNumber();
  211. try {
  212. if(!isOnline(toSerialNumber)) {
  213. result.setData(false);
  214. result.setErrorMsg("请求的链接:" + toSerialNumber + "不在该服务器上");
  215. log.info("请求的链接:" + toSerialNumber + "不在该服务器上");
  216. return result;
  217. }
  218. WebsocketServer websocketServer = webSocketMap.get(toSerialNumber) ;
  219. if (paramsDto.getIsBinary()) {
  220. sendBasicBinaryMessage(websocketServer.session, paramsDto.getBinaryData(), paramsDto.getFlag());
  221. } else {
  222. sendBasicMessage(websocketServer.session, paramsDto.getMessage(), paramsDto.getFlag());
  223. }
  224. result.setData(true);
  225. } catch (IOException e) {
  226. log.info(e.getMessage());
  227. if (paramsDto.getIsBinary()) {
  228. redisUtil.addToListRightExpire(PREX_STRING + toSerialNumber, EXPIRE_TIME, paramsDto.getMessage());
  229. } else {
  230. redisUtil.addToListRightExpire(PREX_BINARY + toSerialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  231. }
  232. result.setData(false);
  233. result.setErrorMsg("推送消息至链接:" + toSerialNumber + "时系统异常:" + e.getMessage());
  234. return result;
  235. }
  236. return result;
  237. }
  238. /**
  239. * 给所有人发消息
  240. */
  241. public RestControllerResult<Boolean> sendToTerminalList(WebsocketParamsDto paramsDto) {
  242. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  243. List<String> terminalList = paramsDto.getSerialList();
  244. //遍历HashMap
  245. for (String serialNumber : terminalList) {
  246. try {
  247. if (paramsDto.getIsBinary()) {
  248. sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());
  249. } else {
  250. sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());
  251. }
  252. } catch (IOException e) {
  253. log.info(e.getMessage());
  254. if (paramsDto.getIsBinary()) {
  255. redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
  256. } else {
  257. redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  258. }
  259. }
  260. }
  261. result.setData(true);
  262. return result;
  263. }
  264. /**
  265. * 给指定的人发送消息
  266. */
  267. public RestControllerResult<Boolean> sendToTerminalAsync(WebsocketParamsDto paramsDto) {
  268. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  269. String toSerialNumber = paramsDto.getSerialNumber();
  270. try {
  271. if(!isOnline(toSerialNumber)) {
  272. result.setData(false);
  273. result.setErrorMsg("请求的链接:" + toSerialNumber + "不在该服务器上");
  274. log.info("请求的链接:" + toSerialNumber + "不在该服务器上");
  275. return result;
  276. }
  277. WebsocketServer websocketServer = webSocketMap.get(toSerialNumber) ;
  278. if (paramsDto.getIsBinary()) {
  279. sendAsyncBinaryMessage(websocketServer.session, paramsDto.getBinaryData());
  280. } else {
  281. sendAsyncMessage(websocketServer.session, paramsDto.getMessage());
  282. }
  283. result.setData(true);
  284. } catch (IOException e) {
  285. log.info(e.getMessage());
  286. if (paramsDto.getIsBinary()) {
  287. redisUtil.addToListRightExpire(PREX_STRING + toSerialNumber, EXPIRE_TIME, paramsDto.getMessage());
  288. } else {
  289. redisUtil.addToListRightExpire(PREX_BINARY + toSerialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  290. }
  291. result.setData(false);
  292. result.setErrorMsg("推送消息至链接:" + toSerialNumber + "时系统异常:" + e.getMessage());
  293. return result;
  294. }
  295. return result;
  296. }
  297. /**
  298. * 给所有人发消息
  299. */
  300. public RestControllerResult<Boolean> sendToTerminalListAsync(WebsocketParamsDto paramsDto) {
  301. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  302. List<String> terminalList = paramsDto.getSerialList();
  303. for (String serialNumber : terminalList) {
  304. try {
  305. if (paramsDto.getIsBinary()) {
  306. sendAsyncBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData());
  307. } else {
  308. sendAsyncMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage());
  309. }
  310. } catch (IOException e) {
  311. log.info(e.getMessage());
  312. if (paramsDto.getIsBinary()) {
  313. redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
  314. } else {
  315. redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  316. }
  317. }
  318. }
  319. result.setData(true);
  320. return result;
  321. }
  322. /**
  323. * 给所有人发消息
  324. */
  325. public RestControllerResult<Boolean> sendToAllAsync(WebsocketParamsDto paramsDto) {
  326. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  327. for (String serialNumber : webSocketMap.keySet()) {
  328. try {
  329. if (paramsDto.getIsBinary()) {
  330. sendAsyncBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData());
  331. } else {
  332. sendAsyncMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage());
  333. }
  334. } catch (IOException e) {
  335. log.info(e.getMessage());
  336. if (paramsDto.getIsBinary()) {
  337. redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
  338. } else {
  339. redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  340. }
  341. }
  342. }
  343. result.setData(true);
  344. logger.info("给所有人发消息结果: {}", result);
  345. return result;
  346. }
  347. public static Map<String, WebsocketServer> getWebSocketMap() {
  348. return webSocketMap ;
  349. }
  350. /**
  351. * 同步发送消息模式。
  352. * message: 待发送的消息
  353. * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
  354. */
  355. public void sendBasicMessage(Session session, String message, Boolean flag) throws IOException {
  356. log.info("WebsocketServer.sendBasicMessage() is begin, flag is " + flag + " ,message is " + message);
  357. session.getBasicRemote().sendText(message, flag);
  358. log.info("WebsocketServer.sendBasicMessage() is end. ");
  359. }
  360. /**
  361. * 同步发送消息模式。
  362. * message: 待发送的消息
  363. * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
  364. */
  365. public void sendBasicBinaryMessage(Session session, ByteBuffer message, Boolean flag) throws IOException {
  366. log.info("WebsocketServer.sendBasicMessage() is begin, flag is " + flag + " ,message is " + message);
  367. session.getBasicRemote().sendBinary(message, flag);
  368. log.info("WebsocketServer.sendBasicMessage() is end. ");
  369. }
  370. /**
  371. * 异步发送消息模式。
  372. * message: 待发送的消息
  373. * flag: 是否支持发送部分消息
  374. */
  375. public void sendAsyncMessage(Session session, String message) throws IOException {
  376. log.info("WebsocketServer.sendAsyncMessage() is begin, message is " + message);
  377. session.getAsyncRemote().sendText(message);
  378. log.info("WebsocketServer.sendAsyncMessage() is end. ");
  379. }
  380. /**
  381. * 异步发送消息模式。
  382. * message: 待发送的消息
  383. * flag: 是否支持发送部分消息
  384. */
  385. public void sendAsyncBinaryMessage(Session session, ByteBuffer message) throws IOException {
  386. log.info("WebsocketServer.sendAsyncMessage() is begin, message is " + message);
  387. session.getAsyncRemote().sendBinary(message);
  388. log.info("WebsocketServer.sendAsyncMessage() is end. ");
  389. }
  390. /**
  391. * 同步发送消息模式。
  392. * message: 待发送的消息
  393. * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
  394. */
  395. public Boolean sendBasicMessage(Session session, List<String> messageList, Boolean flag) throws IOException {
  396. if (null == messageList || messageList.size() < 1) {
  397. return true;
  398. }
  399. for (int i = 0; i < messageList.size(); i++) {
  400. session.getBasicRemote().sendText(messageList.get(i), flag);
  401. }
  402. return true;
  403. }
  404. /**
  405. * 异步发送消息模式。
  406. * message: 待发送的消息
  407. * flag: 是否支持发送部分消息
  408. */
  409. public Boolean sendAsyncMessage(Session session, List<String> messageList) throws IOException {
  410. if (null == messageList || messageList.size() < 1) {
  411. return true;
  412. }
  413. for (int i = 0; i < messageList.size(); i++) {
  414. session.getAsyncRemote().sendText(messageList.get(i));
  415. }
  416. return true;
  417. }
  418. /**
  419. * 同步发送消息模式。
  420. * message: 待发送的消息
  421. * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
  422. */
  423. public Boolean sendBasicBinaryMessage(Session session, List<ByteBuffer> messageList, Boolean flag) throws IOException {
  424. if (null == messageList || messageList.size() < 1) {
  425. return true;
  426. }
  427. for (int i = 0; i < messageList.size(); i++) {
  428. session.getBasicRemote().sendBinary(messageList.get(i), flag);
  429. }
  430. return true;
  431. }
  432. /**
  433. * 异步发送消息模式。
  434. * message: 待发送的消息
  435. * flag: 是否支持发送部分消息
  436. */
  437. public Boolean sendAsyncBinaryMessage(Session session, List<ByteBuffer> messageList) throws IOException {
  438. if (null == messageList || messageList.size() < 1) {
  439. return true;
  440. }
  441. for (int i = 0; i < messageList.size(); i++) {
  442. session.getAsyncRemote().sendBinary(messageList.get(i));
  443. }
  444. return true;
  445. }
  446. /**
  447. * 获取当前时间
  448. *
  449. * @return
  450. */
  451. private String getNowTime() {
  452. Date date = new Date();
  453. DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  454. String time = format.format(date);
  455. return time;
  456. }
  457. public static synchronized int getOnlineCount() {
  458. return onlineCount;
  459. }
  460. public static synchronized void addOnlineCount() {
  461. WebsocketServer.onlineCount++;
  462. }
  463. public static synchronized void subOnlineCount() {
  464. WebsocketServer.onlineCount--;
  465. }
  466. public Boolean isExist(String serialNumber) {
  467. if(StringUtils.isBlank(serialNumber) || !webSocketMap.containsKey(serialNumber)){
  468. return false;
  469. }
  470. return true;
  471. }
  472. public Boolean isOnline(String serialNumber) {
  473. try {
  474. if(StringUtils.isBlank(serialNumber) || !webSocketMap.containsKey(serialNumber)){
  475. return false;
  476. }
  477. WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;
  478. if (websocketServer == null ) {
  479. return false;
  480. }
  481. } catch (Exception e){
  482. log.error(e.getMessage());
  483. return false;
  484. }
  485. return true;
  486. }
  487. }

4、入参对象配置

  1. package com.service.websocket.dto;
  2. import io.swagger.annotations.ApiModel;
  3. import io.swagger.annotations.ApiModelProperty;
  4. import java.nio.ByteBuffer;
  5. import java.util.List;
  6. import lombok.Data;
  7. import org.w3c.dom.Text;
  8. /**
  9. * Websocket参数 DTO.
  10. *
  11. * @author ****
  12. */
  13. @Data
  14. @ApiModel(value = "WebsocketParamsDto", description = "test")
  15. public class WebsocketParamsDto {
  16. @ApiModelProperty(value = "序列号", name = "serialNumber", example = "key.12345")
  17. private String serialNumber;
  18. @ApiModelProperty(value = "键值", name = "serialList", example = "key.12345")
  19. private List<String> serialList;
  20. @ApiModelProperty(value = "信息类型:true 异步;false 同步", name = "dataType", example = "1/空:Sring;2-binary;3-text;")
  21. private Boolean isBinary;
  22. @ApiModelProperty(value = "信息", name = "message", example = "待发送信息")
  23. private String message;
  24. @ApiModelProperty(value = "二进制数据", name = "binaryData", example = "01010101....")
  25. private ByteBuffer binaryData;
  26. @ApiModelProperty(value = "Text数据", name = "textData", example = "key.67890")
  27. private Text textData;
  28. @ApiModelProperty(value = "模式", name = "同步false/异步模式true", example = "true")
  29. private Boolean isAsync;
  30. @ApiModelProperty(value = "传送标识", name = "flag", example = "true--一次性发送全部信息;false--可部分发送信息;")
  31. private Boolean flag;
  32. }

5、业务调用

  1. /**
  2. * 给所有人发消息
  3. */
  4. public RestControllerResult<Boolean> sendToAll(WebsocketParamsDto paramsDto) {
  5. RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
  6. //遍历HashMap
  7. for (String serialNumber : webSocketMap.keySet()) {
  8. try {
  9. if (paramsDto.getIsBinary()) {
  10. sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());
  11. } else {
  12. sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());
  13. }
  14. } catch (IOException e) {
  15. log.info(e.getMessage());
  16. if (paramsDto.getIsBinary()) {
  17. redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
  18. } else {
  19. redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
  20. }
  21. }
  22. }
  23. result.setData(true);
  24. return result;
  25. }
  1. @Resource
  2. private WebsocketServer webSocketServer;
  3. @Override
  4. public RestControllerResult<Boolean>
  5. sendToAll(@RequestBody WebsocketParamsDto paramsDto) {
  6. return webSocketServer.sendToAll(paramsDto);
  7. }

二、Websocket 测试

1、在线测试

测试URL:Websocket在线测试-Websocket接口测试-Websocket模拟请求工具 (jsons.cn)

2、业务调用 WebSocket 服务,发出消息

通过 postman 测试工具调用接口,或者其他工具

3、Websocket 监听端回应情况如下 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/787263
推荐阅读
相关标签
  

闽ICP备14008679号