赞
踩
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
- package com.service.websocket.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.servlet.config.annotation.CorsRegistry;
- import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- /**
- * Websocket 配置
- */
- @Configuration
- @Slf4j
- public class WebsocketConfig implements WebMvcConfigurer {
-
- //服务器支持跨域
- @Override
- public void addCorsMappings(CorsRegistry registry) {
- registry.addMapping("/**").allowedOrigins("*")
- .allowedMethods("GET", "POST","OPTIONS").allowedHeaders("*")
- .exposedHeaders("Access-Control-Allow-Headers",
- "Access-Control-Allow-Methods",
- "Access-Control-Allow-Origin",
- "Access-Control-Max-Age",
- "X-Frame-Options")
- .allowCredentials(false).maxAge(3600);
- }
-
- /** 注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint 。
- * 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,
- * 因为它将由容器自己提供和管理。在Spring中可以直接使用Java WebSocket API来提供服务,如果使用内置的web容器,需要做的仅仅是需要在下面添加
- * */
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
-
- }

- package com.service.websocket.server;
-
- import cn.gooday.jsh.service.common.dto.RestControllerResult;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import javax.websocket.EndpointConfig;
- import javax.websocket.OnClose;
- import javax.websocket.OnError;
- import javax.websocket.OnMessage;
- import javax.websocket.OnOpen;
- import javax.websocket.Session;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import jsh.mg.msg.service.websocket.dto.WebsocketParamsDto;
- import jsh.mg.msg.service.websocket.util.HttpUtils;
- import jsh.mg.msg.service.websocket.util.RedisUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * websocket服务器
- */
- @Component
- @ServerEndpoint("/socket/{serialNumber}")
- @Slf4j
- public class WebsocketServer {
-
- private static final Logger logger = LoggerFactory.getLogger(WebsocketServer.class);
-
- //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
- private static int onlineCount = 0;
- //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
- public static ConcurrentHashMap<String, WebsocketServer> webSocketMap =
- new ConcurrentHashMap<String, WebsocketServer>();
- //与某个客户端的连接会话,需要通过它来给客户端发送数据
- private Session session;
- private String callBackUrl;
-
- private static String PREX_STRING = "STRING_";
- private static String PREX_BINARY = "BINARY_";
- private static long EXPIRE_TIME = 604800; //过期时间7天
-
- @Autowired
- private RedisUtil redisUtil;
-
- /**
- * 连接成功后调用的方法
- * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
- */
- @OnOpen
- public void onOpen(@PathParam(value = "serialNumber") String serialNumber, Session session, EndpointConfig config) {
- this.session = session;
-
- String queryString = session.getQueryString();
- log.info("session.getQueryString()" + queryString);
- if (StringUtils.isNotBlank(queryString) && queryString.contains("url=")) {
- this.callBackUrl = queryString.replace("url=", "");
- }
- webSocketMap.put(serialNumber, this) ;
-
- //在线数加1
- addOnlineCount();
- log.info("当前有连接" + serialNumber + "加入!当前在线人数为" + getOnlineCount());
- }
-
- /**
- * 连接关闭调用方法
- */
- @OnClose
- public void onClose(@PathParam(value = "serialNumber") String serialNumber) {
- if (StringUtils.isNotBlank(serialNumber)) {
- log.info("WebsocketServer.onClose() is begin, serialNumber is " + serialNumber);
- webSocketMap.remove(serialNumber);
- //在线数减1
- subOnlineCount();
- }
- }
-
- /**
- * 连接异常
- */
- @OnError
- public void onError(@PathParam(value = "serialNumber") String serialNumber, Throwable error) {
- log.info("连接异常: serialNumber is " + serialNumber + ",error is " + error.getMessage());
- }
-
- /**
- * 接收到客户短消息
- */
- @OnMessage
- public void onMessage(@PathParam(value = "serialNumber") String serialNumber, String message) {
- try {
- if (!isOnline(serialNumber)) {
- return ;
- }
- WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;
- HttpUtils.sendPost(websocketServer.callBackUrl, message);
- } catch (Exception e) {
- log.info(e.getMessage());
- return ;
- }
-
- return ;
- }
-
- public RestControllerResult<Boolean> checkExist(String serialNumber) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- if (!isExist(serialNumber)) {
- result.setData(false);
- result.setErrorMsg("请求的链接:" + serialNumber + "不在该服务器上");
- return result;
- }
- result.setData(true);
- return result;
- }
-
- public RestControllerResult<Boolean> checkOnline(String serialNumber) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- if (!isOnline(serialNumber)) {
- result.setData(false);
- result.setErrorMsg("请求的链接:" + serialNumber + "不在线");
- return result;
- }
- result.setData(true);
- return result;
- }
-
-
- public RestControllerResult<Boolean> checkOfflineMessage(String serialNumber) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
-
- if (!isOnline(serialNumber)) {
- result.setData(false);
- result.setErrorMsg("请求的链接:" + serialNumber + "不在线");
- return result;
- }
- List<String> messageList = (List<String>)redisUtil.get(PREX_STRING + serialNumber);
- List<ByteBuffer> binaryList = (List<ByteBuffer>)redisUtil.get(PREX_BINARY + serialNumber);
-
- if ((null == messageList || messageList.size() < 1) && (null == binaryList || binaryList.size() < 1)) {
- result.setData(false);
- } else {
- result.setData(true);
- }
- return result;
- }
-
- public RestControllerResult<Boolean> pullOfflineMessage(String serialNumber, Boolean isAsync, Boolean flag) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- try {
- if (!isOnline(serialNumber)) {
- result.setData(false);
- result.setErrorMsg("请求的链接:" + serialNumber + "不在线");
- return result;
- }
- WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;
-
- List<String> messageList = (List<String>)redisUtil.get(PREX_STRING + serialNumber);
- if (null != messageList && messageList.size() > 0 ) {
- Boolean isSuccess = false;
- if (!isAsync) {
- isSuccess = sendBasicMessage(websocketServer.session, messageList, flag);
- } else {
- isSuccess = sendAsyncMessage(websocketServer.session, messageList);
- }
- if (isSuccess) {
- redisUtil.delete(PREX_STRING + serialNumber);
- }
- }
-
- List<ByteBuffer> binaryList = (List<ByteBuffer>)redisUtil.get(PREX_BINARY + serialNumber);
- if (null != binaryList && binaryList.size() > 0 ) {
- Boolean isSuccess = false;
- if (!isAsync) {
- isSuccess = sendBasicBinaryMessage(websocketServer.session, binaryList, flag);
- } else {
- isSuccess = sendAsyncBinaryMessage(websocketServer.session, binaryList);
- }
- if (isSuccess) {
- redisUtil.delete(PREX_BINARY + serialNumber);
- }
- }
- } catch (IOException e) {
- log.info(e.getMessage());
- result.setData(false);
- result.setErrorMsg(e.getMessage());
- return result;
- }
- return result;
- }
-
- /**
- * 给所有人发消息
- */
- public RestControllerResult<Boolean> sendToAll(WebsocketParamsDto paramsDto) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- //遍历HashMap
- for (String serialNumber : webSocketMap.keySet()) {
- try {
- if (paramsDto.getIsBinary()) {
- sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());
- } else {
- sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());
- }
- } catch (IOException e) {
- log.info(e.getMessage());
-
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
- }
- }
-
- result.setData(true);
- return result;
- }
-
-
-
- /**
- * 给指定的终端发送消息
- */
- public RestControllerResult<Boolean> sendToTerminal(WebsocketParamsDto paramsDto) {
- log.info("WebsocketServer.sendToSingle() is begin, serialNumber is " + paramsDto.getSerialNumber());
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- String toSerialNumber = paramsDto.getSerialNumber();
- try {
- if(!isOnline(toSerialNumber)) {
- result.setData(false);
- result.setErrorMsg("请求的链接:" + toSerialNumber + "不在该服务器上");
- log.info("请求的链接:" + toSerialNumber + "不在该服务器上");
- return result;
- }
-
- WebsocketServer websocketServer = webSocketMap.get(toSerialNumber) ;
- if (paramsDto.getIsBinary()) {
- sendBasicBinaryMessage(websocketServer.session, paramsDto.getBinaryData(), paramsDto.getFlag());
- } else {
- sendBasicMessage(websocketServer.session, paramsDto.getMessage(), paramsDto.getFlag());
- }
- result.setData(true);
- } catch (IOException e) {
- log.info(e.getMessage());
-
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + toSerialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + toSerialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
- result.setData(false);
- result.setErrorMsg("推送消息至链接:" + toSerialNumber + "时系统异常:" + e.getMessage());
- return result;
- }
- return result;
- }
-
- /**
- * 给所有人发消息
- */
- public RestControllerResult<Boolean> sendToTerminalList(WebsocketParamsDto paramsDto) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- List<String> terminalList = paramsDto.getSerialList();
- //遍历HashMap
- for (String serialNumber : terminalList) {
- try {
- if (paramsDto.getIsBinary()) {
- sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());
- } else {
- sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());
- }
- } catch (IOException e) {
- log.info(e.getMessage());
-
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
- }
- }
- result.setData(true);
- return result;
- }
-
- /**
- * 给指定的人发送消息
- */
- public RestControllerResult<Boolean> sendToTerminalAsync(WebsocketParamsDto paramsDto) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- String toSerialNumber = paramsDto.getSerialNumber();
- try {
- if(!isOnline(toSerialNumber)) {
- result.setData(false);
- result.setErrorMsg("请求的链接:" + toSerialNumber + "不在该服务器上");
- log.info("请求的链接:" + toSerialNumber + "不在该服务器上");
- return result;
- }
-
- WebsocketServer websocketServer = webSocketMap.get(toSerialNumber) ;
- if (paramsDto.getIsBinary()) {
- sendAsyncBinaryMessage(websocketServer.session, paramsDto.getBinaryData());
- } else {
- sendAsyncMessage(websocketServer.session, paramsDto.getMessage());
- }
- result.setData(true);
- } catch (IOException e) {
- log.info(e.getMessage());
-
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + toSerialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + toSerialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
-
- result.setData(false);
- result.setErrorMsg("推送消息至链接:" + toSerialNumber + "时系统异常:" + e.getMessage());
- return result;
- }
- return result;
- }
-
- /**
- * 给所有人发消息
- */
- public RestControllerResult<Boolean> sendToTerminalListAsync(WebsocketParamsDto paramsDto) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- List<String> terminalList = paramsDto.getSerialList();
- for (String serialNumber : terminalList) {
- try {
- if (paramsDto.getIsBinary()) {
- sendAsyncBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData());
- } else {
- sendAsyncMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage());
- }
- } catch (IOException e) {
- log.info(e.getMessage());
-
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
- }
- }
- result.setData(true);
- return result;
- }
-
- /**
- * 给所有人发消息
- */
- public RestControllerResult<Boolean> sendToAllAsync(WebsocketParamsDto paramsDto) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- for (String serialNumber : webSocketMap.keySet()) {
- try {
- if (paramsDto.getIsBinary()) {
- sendAsyncBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData());
- } else {
- sendAsyncMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage());
- }
- } catch (IOException e) {
- log.info(e.getMessage());
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
- }
- }
- result.setData(true);
- logger.info("给所有人发消息结果: {}", result);
- return result;
- }
-
- public static Map<String, WebsocketServer> getWebSocketMap() {
- return webSocketMap ;
- }
-
- /**
- * 同步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
- */
- public void sendBasicMessage(Session session, String message, Boolean flag) throws IOException {
- log.info("WebsocketServer.sendBasicMessage() is begin, flag is " + flag + " ,message is " + message);
- session.getBasicRemote().sendText(message, flag);
- log.info("WebsocketServer.sendBasicMessage() is end. ");
- }
-
- /**
- * 同步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
- */
- public void sendBasicBinaryMessage(Session session, ByteBuffer message, Boolean flag) throws IOException {
- log.info("WebsocketServer.sendBasicMessage() is begin, flag is " + flag + " ,message is " + message);
- session.getBasicRemote().sendBinary(message, flag);
- log.info("WebsocketServer.sendBasicMessage() is end. ");
- }
-
- /**
- * 异步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息
- */
- public void sendAsyncMessage(Session session, String message) throws IOException {
- log.info("WebsocketServer.sendAsyncMessage() is begin, message is " + message);
- session.getAsyncRemote().sendText(message);
- log.info("WebsocketServer.sendAsyncMessage() is end. ");
-
- }
-
- /**
- * 异步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息
- */
- public void sendAsyncBinaryMessage(Session session, ByteBuffer message) throws IOException {
- log.info("WebsocketServer.sendAsyncMessage() is begin, message is " + message);
- session.getAsyncRemote().sendBinary(message);
- log.info("WebsocketServer.sendAsyncMessage() is end. ");
-
- }
-
- /**
- * 同步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
- */
- public Boolean sendBasicMessage(Session session, List<String> messageList, Boolean flag) throws IOException {
- if (null == messageList || messageList.size() < 1) {
- return true;
- }
- for (int i = 0; i < messageList.size(); i++) {
- session.getBasicRemote().sendText(messageList.get(i), flag);
- }
- return true;
- }
-
- /**
- * 异步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息
- */
- public Boolean sendAsyncMessage(Session session, List<String> messageList) throws IOException {
- if (null == messageList || messageList.size() < 1) {
- return true;
- }
- for (int i = 0; i < messageList.size(); i++) {
- session.getAsyncRemote().sendText(messageList.get(i));
- }
- return true;
- }
-
- /**
- * 同步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息(true:可部分发送消息; false:一次性发布全部消息)
- */
- public Boolean sendBasicBinaryMessage(Session session, List<ByteBuffer> messageList, Boolean flag) throws IOException {
- if (null == messageList || messageList.size() < 1) {
- return true;
- }
- for (int i = 0; i < messageList.size(); i++) {
- session.getBasicRemote().sendBinary(messageList.get(i), flag);
- }
- return true;
- }
-
- /**
- * 异步发送消息模式。
- * message: 待发送的消息
- * flag: 是否支持发送部分消息
- */
- public Boolean sendAsyncBinaryMessage(Session session, List<ByteBuffer> messageList) throws IOException {
- if (null == messageList || messageList.size() < 1) {
- return true;
- }
- for (int i = 0; i < messageList.size(); i++) {
- session.getAsyncRemote().sendBinary(messageList.get(i));
- }
- return true;
- }
-
- /**
- * 获取当前时间
- *
- * @return
- */
- private String getNowTime() {
- Date date = new Date();
- DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time = format.format(date);
- return time;
- }
-
- public static synchronized int getOnlineCount() {
- return onlineCount;
- }
-
- public static synchronized void addOnlineCount() {
- WebsocketServer.onlineCount++;
- }
-
- public static synchronized void subOnlineCount() {
- WebsocketServer.onlineCount--;
- }
-
- public Boolean isExist(String serialNumber) {
- if(StringUtils.isBlank(serialNumber) || !webSocketMap.containsKey(serialNumber)){
- return false;
- }
- return true;
- }
-
- public Boolean isOnline(String serialNumber) {
- try {
- if(StringUtils.isBlank(serialNumber) || !webSocketMap.containsKey(serialNumber)){
- return false;
- }
- WebsocketServer websocketServer = webSocketMap.get(serialNumber) ;
- if (websocketServer == null ) {
- return false;
- }
- } catch (Exception e){
- log.error(e.getMessage());
- return false;
- }
- return true;
- }
- }

- package com.service.websocket.dto;
-
- import io.swagger.annotations.ApiModel;
- import io.swagger.annotations.ApiModelProperty;
- import java.nio.ByteBuffer;
- import java.util.List;
- import lombok.Data;
- import org.w3c.dom.Text;
-
- /**
- * Websocket参数 DTO.
- *
- * @author ****
- */
- @Data
- @ApiModel(value = "WebsocketParamsDto", description = "test")
- public class WebsocketParamsDto {
-
- @ApiModelProperty(value = "序列号", name = "serialNumber", example = "key.12345")
- private String serialNumber;
-
- @ApiModelProperty(value = "键值", name = "serialList", example = "key.12345")
- private List<String> serialList;
-
- @ApiModelProperty(value = "信息类型:true 异步;false 同步", name = "dataType", example = "1/空:Sring;2-binary;3-text;")
- private Boolean isBinary;
-
- @ApiModelProperty(value = "信息", name = "message", example = "待发送信息")
- private String message;
-
- @ApiModelProperty(value = "二进制数据", name = "binaryData", example = "01010101....")
- private ByteBuffer binaryData;
-
- @ApiModelProperty(value = "Text数据", name = "textData", example = "key.67890")
- private Text textData;
-
- @ApiModelProperty(value = "模式", name = "同步false/异步模式true", example = "true")
- private Boolean isAsync;
-
- @ApiModelProperty(value = "传送标识", name = "flag", example = "true--一次性发送全部信息;false--可部分发送信息;")
- private Boolean flag;
- }

5、业务调用
- /**
- * 给所有人发消息
- */
- public RestControllerResult<Boolean> sendToAll(WebsocketParamsDto paramsDto) {
- RestControllerResult<Boolean> result = new RestControllerResult<Boolean>();
- //遍历HashMap
- for (String serialNumber : webSocketMap.keySet()) {
- try {
- if (paramsDto.getIsBinary()) {
- sendBasicBinaryMessage(webSocketMap.get(serialNumber).session, paramsDto.getBinaryData(), paramsDto.getFlag());
- } else {
- sendBasicMessage(webSocketMap.get(serialNumber).session, paramsDto.getMessage(), paramsDto.getFlag());
- }
- } catch (IOException e) {
- log.info(e.getMessage());
-
- if (paramsDto.getIsBinary()) {
- redisUtil.addToListRightExpire(PREX_STRING + serialNumber, EXPIRE_TIME, paramsDto.getMessage());
- } else {
- redisUtil.addToListRightExpire(PREX_BINARY + serialNumber, EXPIRE_TIME, paramsDto.getBinaryData());
- }
- }
- }
-
- result.setData(true);
- return result;
- }

- @Resource
- private WebsocketServer webSocketServer;
-
- @Override
- public RestControllerResult<Boolean>
- sendToAll(@RequestBody WebsocketParamsDto paramsDto) {
- return webSocketServer.sendToAll(paramsDto);
- }
测试URL:Websocket在线测试-Websocket接口测试-Websocket模拟请求工具 (jsons.cn)
通过 postman 测试工具调用接口,或者其他工具
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。