赞
踩
吞吐量大的 spring-5-reactive-websockets
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.7</version> <relativePath/> </parent> <groupId>com.test</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!--starter-websocket--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!--fastjson--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build>
package com.test.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { /** * 注入ServerEndpointExporter, * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
package com.test.websocket; import org.springframework.util.StringUtils; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; public class CustomConfigurator extends ServerEndpointConfig.Configurator { @Override public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) { // 获取请求头中的 token 信息 List<String> tokenList = request.getHeaders().get("Authorization"); if (tokenList != null && !tokenList.isEmpty()) { String token = tokenList.get(0); config.getUserProperties().put("Authorization", token); } //先从queryParam获取用户id /ws/websocket?sn=xxxx String query = request.getQueryString(); Map<String, String> queryParams = new HashMap<>(); if (query != null) { for (String param : query.split("&")) { String[] entry = param.split("="); if (entry.length > 1) { queryParams.put(entry[0], entry[1]); } } } String userId = queryParams.get("sn"); //再从路径取用户id /ws/webSocket/xxxxx 中的 xxxxx 作为标识(这步可以不用,WebSocketServer的open方法可以直接取) if (StringUtils.isEmpty(userId)) { URI requestURI = request.getRequestURI(); String uriStr = requestURI.getPath(); if (uriStr.contains("/ws/websocket")) { userId = uriStr.substring(uriStr.lastIndexOf("/") + 1); } } //将用户sn放入配置类 config.getUserProperties().put("sn", userId); } }
package com.test.websocket; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.net.URI; import java.util.Enumeration; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket服务类 */ @Component @Slf4j @ServerEndpoint(value = "/ws/websocket/{userId}", configurator = CustomConfigurator.class) public class WebSocketServer { /** * 心跳消息 */ private final static String PING = "ping"; private final static String PONG = "pong"; /** * 存放每个客户端对应的 WebSocketServer 对象 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收 userId */ private String userId = ""; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { //获取用户id if (StringUtils.isEmpty(userId)) { userId = (String) session.getUserProperties().get("sn"); } if (StringUtils.isEmpty(userId)) { URI requestURI = session.getRequestURI(); log.info("请求连接未传用户id,请求路径:{}", requestURI.getPath()); return; } //获取token并校验 String token = (String) session.getUserProperties().get("Authorization"); log.info("===>>>WebSocketServer.open token:{}", token); this.session = session; this.userId = userId; webSocketMap.put(userId, this); log.info("新用户上线:" + userId + ", 当前在线人数为:" + getOnlineCount()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if (!webSocketMap.containsKey(userId)) { return; } webSocketMap.remove(userId); log.info("用户下线:" + userId + ", 当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 */ @OnMessage public void onMessage(String message, Session session) { log.info("===>>>接收到用户:{} 发送的消息:{}", userId, message); if (PING.equals(message)) { try { this.sendMessage(PONG); } catch (IOException e) { e.printStackTrace(); } } } /** * 发生错误时调用 */ @OnError public void onError(Session session, Throwable error) { log.error("发生错误"); error.printStackTrace(); } /** * 实现服务器主动推送 */ private void sendMessage(String message) throws IOException { log.info("===>>>向用户:{} 发送消息:{}", this.userId, message); this.session.getBasicRemote().sendText(message); } /** * 群发消息 */ public static void sendMessageToAll(String message) throws IOException { for (Map.Entry<String, WebSocketServer> entry : webSocketMap.entrySet()) { WebSocketServer webSocketServer = entry.getValue(); webSocketServer.sendMessage(message); } } /** * 单发消息 */ public static void sendMessageToUser(String toUserId, String message) throws IOException { if (webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(message); } else { log.error("请求的 userId:" + toUserId + "不在该服务器上"); } } /** * 在线用户 */ public static Set<String> getOnlineUsers() { Set<String> set = new HashSet<>(); Enumeration<String> enumeration = webSocketMap.keys(); while (enumeration.hasMoreElements()) { set.add(enumeration.nextElement()); } return set; } /** * 获取在线人数 */ public static int getOnlineCount() { return webSocketMap.size(); } /** * 用户是否在线 */ public static Boolean isOnline(String userId) { return webSocketMap.containsKey(userId); } }
F12打开浏览器控制台,在console面板粘贴下面内容即连接成功
(如果不能粘贴根据提示输入allow paste, 第一个单独复制等连接后,后面的再一起复制进控制台)
ws = new WebSocket('ws://localhost:8080/ws/user1');
ws.onopen=function(data){console.log(data)};
ws.onmessage=function(data){console.log(data)};
ws.onclose=function(data){console.log(data)};
ws.onerror=function(data){console.log(data)};
ws.send('msg');
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> Netty WebSocket </head> <br/> <body> <br> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://localhost/ws/websocket/11111"); socket.onmessage = function (event) { console.log("xxx" + event); var ta = document.getElementById('responseText'); ta.value = ta.value + '\n' + event.data; }; socket.onopen = function (event) { var ta = document.getElementById('responseText'); ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!"; }; socket.onclose = function (event) { var ta = document.getElementById('responseText'); ta.value = "WebSocket 关闭!"; } } else { alert("抱歉,您的浏览器不支持WebSocket协议!"); } function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); }else { alert("WebSocket连接没有建立成功!"); } } </script> <form onsubmit="return false;"> <input type="text" name="message" value="输入要发送的消息"> <br/><br/> <input type="button" value="发送消息" onclick="send(this.form.message.value)"> <hr color="blue"/> <h3>服务端返回的应答消息</h3> <textarea id="responseText" style="width:500px; height: 300px;"></textarea> </form> </body> </html>
http://www.easyswoole.com/wstool.html
http://www.websocket-test.com/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。