赞
踩
关于websocket做一次全面的总结。
SpringBoot添加maven依赖
<!-- WebSocket starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

<!-- Redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>2.15.1</version>
</dependency>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- /**
- * @author
- */
- @Configuration
- public class WebSocketConfig {
-
- @Bean
- public ServerEndpointExporter serverEndpointExporter () {
- return new ServerEndpointExporter();
- }
- }
存储WebSocket Session
- mport javax.websocket.Session;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * @author
- */
- public class WebSocketBean {
- /**
- * 连接session对象
- */
- private Session session;
-
- /**
- * 连接错误次数
- */
- private AtomicInteger erroerLinkCount = new AtomicInteger(0);
-
- public int getErroerLinkCount() {
- // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
- return erroerLinkCount.getAndIncrement();
- }
-
- public void cleanErrorNum() {
- // 清空计数
- erroerLinkCount = new AtomicInteger(0);
- }
-
- public Session getSession() {
- return session;
- }
-
- public void setSession(Session session) {
- this.session = session;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
发送消息的实体类(群发)。我用了2个类来实现,大家可以根据自己的需求调整。
/**
 * Payload for a message pushed to every connected user of a project.
 *
 * <p>Plain mutable bean with a default constructor and getter/setter pairs.
 */
public class SendMsgAll {

    /** Business data to deliver over the WebSocket (a JSON string). */
    private String msg;

    /** Business module type. */
    private String type;

    /** ID of the project the message belongs to. */
    private String projectId;

    /** @return the business data (JSON string), or {@code null} if unset */
    public String getMsg() {
        return msg;
    }

    /** @param msg the business data (JSON string) */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return the business module type, or {@code null} if unset */
    public String getType() {
        return type;
    }

    /** @param type the business module type */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the project ID, or {@code null} if unset */
    public String getProjectId() {
        return projectId;
    }

    /** @param projectId the project ID */
    public void setProjectId(String projectId) {
        this.projectId = projectId;
    }
}
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
单发
- /**
- * 按用户推送
- * @author
- */
- public class SendMsg extends SendMsgAll{
- /**
- * 用户ID
- */
- private String userId;
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
WebSocket server
- import com.app.domain.websocket.model.WebSocketBean;
- import com.app.domain.websocket.service.WebsocketEndpoint;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
-
- /**
- * @author
- */
- @Service
- @ServerEndpoint(value = "/web/ws/{projectId}/{userId}")
- @Component
- public class WebsocketEndpointImpl implements WebsocketEndpoint {
-
- private static Logger log = LoggerFactory.getLogger(WebsocketEndpointImpl.class);
- /**
- * 错误最大重试次数
- */
- private static final int MAX_ERROR_NUM = 3;
-
- /**
- * 用来存放每个客户端对应的webSocket对象。
- */
- private static Map<String, Map<String, WebSocketBean>> webSocketInfo;
-
- static {
- // concurrent包的线程安全map
- webSocketInfo = new ConcurrentHashMap<String, Map<String, WebSocketBean>>();
- }
-
-
- @OnOpen
- public void onOpen(Session session, EndpointConfig config, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
- WebSocketBean bean = new WebSocketBean();
- bean.setSession(session);
- Map<String,WebSocketBean> concurrentHashMap = new ConcurrentHashMap();
- concurrentHashMap.put(userId,bean);
- webSocketInfo.put(projectId, concurrentHashMap);
- log.info("ws项目:"+projectId+",客户端连接服务器userId :" + userId + "当前连接数:" + countUser(projectId));
- }
-
- @OnClose
- public void onClose(Session session, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
- // 客户端断开连接移除websocket对象
- Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
- if(concurrentHashMap != null){concurrentHashMap.remove(userId);}
- log.info("ws项目:"+projectId+",客户端断开连接,当前连接数:" + countUser(projectId));
-
- }
-
- @OnMessage
- public void onMessage(Session session, String message, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
- log.info("ws项目:"+projectId+",客户端 userId: " + userId + ",消息:" + message);
- }
-
- @OnError
- public void onError(Session session, Throwable throwable) {
- // log.error("ws发生错误" + throwable.getMessage(), throwable);
- }
-
- public void sendMessage(Session session, String message, String projectId, String userId) {
-
- log.info("ws项目:"+projectId+",连接数:"+countUser(projectId)+",发送消息 " + session);
- try {
- // 发送消息
- synchronized (session) {
- if (session.isOpen()) {
- session.getBasicRemote().sendText(message);
- }
- }
- // 清空错误计数
- this.cleanErrorNum(projectId, userId);
- } catch (Exception e) {
- log.error("ws项目:"+projectId+",用户:"+userId+",发送消息失败" + e.getMessage(), e);
- int errorNum = this.getErroerLinkCount(projectId, userId);
-
- // 小于最大重试次数重发
- if (errorNum <= MAX_ERROR_NUM) {
- sendMessage(session, message, projectId, userId);
- } else {
- log.error("ws发送消息失败超过最大次数");
- // 清空错误计数
- this.cleanErrorNum(projectId, userId);
- }
- }
- }
-
- @Override
- public void batchSendMessage(String projectId,String message) {
- Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
- if(concurrentHashMap != null){
- Set<Map.Entry<String, WebSocketBean>> set = concurrentHashMap.entrySet();
- for(Map.Entry<String, WebSocketBean> map: set ){
- sendMessage(map.getValue().getSession(), message,projectId, map.getKey());
- }
- }
- }
-
- @Override
- public void sendMessageById(String projectId,String userId, String message) {
- Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
- if(concurrentHashMap != null){
- WebSocketBean webSocketBean = concurrentHashMap.get(userId);
- if (webSocketBean != null) {
- sendMessage(webSocketBean.getSession(), message, projectId,userId);
- }
- }
- }
-
- /**
- * 清空错误计数
- */
- private void cleanErrorNum(String projectId, String userId){
- Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
- if(concurrentHashMap != null){
- WebSocketBean webSocketBean = concurrentHashMap.get(userId);
- if (webSocketBean != null) {
- webSocketBean.cleanErrorNum();
- }
- }
- }
-
- /**
- * 获取错误计数
- */
- private int getErroerLinkCount(String projectId, String userId){
- int errorNum = 0;
- Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
- if(concurrentHashMap != null){
- WebSocketBean webSocketBean = concurrentHashMap.get(userId);
- if (webSocketBean != null) {
- errorNum = webSocketBean.getErroerLinkCount();
- }
- }
- return errorNum;
- }
-
- private Integer countUser (String projectId){
- int size = 0;
- Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
- if(concurrentHashMap != null) {
- size = concurrentHashMap.size();
- }
- return size;
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
webSocketInfo 这个 Map 大家可以根据需求设计,但一定要采用线程安全的 Map;AtomicInteger 这个计数器也是线程安全的。
/**
 * Sends messages to connected WebSocket clients.
 */
public interface WebsocketEndpoint {

    /**
     * Sends a message to the connected users of one project.
     *
     * @param projectId ID of the project whose users receive the message
     * @param message   the message to deliver to the clients
     */
    void batchSendMessage(String projectId, String message);

    /**
     * Sends a message to a single user of a project.
     *
     * @param projectId ID of the project
     * @param userId    ID of the recipient
     * @param message   the message to deliver
     */
    void sendMessageById(String projectId, String userId, String message);

}
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
redis订阅/推送
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.listener.PatternTopic;
- import org.springframework.data.redis.listener.RedisMessageListenerContainer;
- import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
-
- /**
- * @author
- */
- @Configuration
- public class RedisMessageListenerConfig {
- @Autowired
- private RedisReceiver redisReceiver;
-
- /**
- * 监听redis中的订阅信息
- * @param redisConnectionFactory
- * @return
- */
- @Bean
- public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
- RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
- redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
- //添加redis消息队列监听,监听im-topic消息主题的消息,使用messageListenerAdapter()中设置的类和方法处理消息。
- redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic("ptp-topic"));
- //同上一样
- redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic("mh-topic"));
- return redisMessageListenerContainer;
- }
-
- /**
- * 添加订阅消息处理类,通过反射获取处理类中的处理方法
- * 即使用RedisReceiver类中的sendMsg方法处理消息
- * @return
- */
- @Bean
- public MessageListenerAdapter messageListenerAdapter() {
- return new MessageListenerAdapter(redisReceiver, "sendMsg");
- }
-
- @Bean
- public MessageListenerAdapter messageAllListenerAdapter(){
- return new MessageListenerAdapter(redisReceiver, "sendAllMsg");
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
处理从redis中取出来的消息
- import com.alibaba.fastjson.JSONObject;
- import com.app.domain.websocket.model.SendMsg;
- import com.app.domain.websocket.model.SendMsgAll;
- import com.app.domain.websocket.service.WebsocketEndpoint;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- /**
- * 处理订阅redis的消息
- * @author
- */
- @Component
- public class RedisReceiver {
-
- @Resource
- WebsocketEndpoint websocketEndpoint;
-
- /**
- * 处理一对一消息
- * @param message 消息队列中的消息
- */
- public void sendMsg(String message) {
- SendMsg msg = JSONObject.parseObject(message, SendMsg.class);
- websocketEndpoint.sendMessageById(msg.getProjectId(),msg.getUserId(),msg.getMsg());
- }
-
- /**
- * 处理广播消息
- * @param message
- */
- public void sendAllMsg(String message){
- SendMsgAll msg = JSONObject.parseObject(message, SendMsgAll.class);
- websocketEndpoint.batchSendMessage(msg.getProjectId(),msg.getMsg());
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
还有关于项目中需要推送的消息存入队列中我也提供了公共的方法。实际操作按需求来。
/**
 * Publishes WebSocket push requests to Redis.
 */
public interface WebsocketService {

    /**
     * Requests a message to be sent to the connected users of one project.
     *
     * @param projectId ID of the project whose users receive the message
     * @param message   the message to deliver to the clients
     */
    void sendMessageAll(String projectId, String message);

    /**
     * Requests a message to be sent to a single user of a project.
     *
     * @param projectId ID of the project
     * @param userId    ID of the recipient
     * @param message   the message to deliver
     */
    void sendMessageById(String projectId, String userId, String message);

}
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
使用 convertAndSend 方法把消息发布到对应的频道(topic)后,会被上面配置的监听器监听到。
- import com.alibaba.fastjson.JSON;
- import com.app.domain.websocket.model.SendMsg;
- import com.app.domain.websocket.model.SendMsgAll;
- import com.app.domain.websocket.service.WebsocketService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Service;
-
- /**
- * @author
- */
- @Service
- public class WebSocketServerImpl implements WebsocketService {
-
- @Autowired
- RedisTemplate<String,String> redisTemplate;
-
- @Override
- public void sendMessageAll(String projectId ,String message) {
- SendMsgAll sendMsgAll = new SendMsgAll();
- sendMsgAll.setProjectId(projectId);
- sendMsgAll.setMsg(message);
- redisTemplate.convertAndSend("mh-topic", JSON.toJSONString(sendMsgAll));
-
- }
-
- @Override
- public void sendMessageById(String projectId,String userId, String message) {
- SendMsg sendMsg = new SendMsg();
- sendMsg.setProjectId(projectId);
- sendMsg.setUserId(userId);
- sendMsg.setMsg(message);
- redisTemplate.convertAndSend("ptp-topic",JSON.toJSONString(sendMsg));
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
后端到这里代码就结束了
<!DOCTYPE html>
<!-- Minimal manual test page: opens a WebSocket to the server endpoint and sends a JSON message. -->
<html>
<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
    <script>
        var socket;

        // Opens the WebSocket connection for the user ID typed into #userId.
        function openSocket() {
            if (typeof WebSocket === "undefined") {
                console.log("您的浏览器不支持WebSocket");
                return;
            }
            console.log("您的浏览器支持WebSocket");
            // Server address and port to connect to; the URL must include the
            // context path the application is deployed under.
            var socketUrl = "http://localhost:8080/demo/web/ws/projectId/" + encodeURIComponent($("#userId").val());
            // Switch only the scheme: http -> ws, https -> wss.
            socketUrl = socketUrl.replace(/^http/, "ws");
            console.log(socketUrl);
            socket = new WebSocket(socketUrl);
            socket.onopen = function () {
                console.log("websocket已打开");
            };
            socket.onmessage = function (msg) {
                console.log(msg.data);
            };
            socket.onclose = function () {
                console.log("websocket已关闭");
            };
            socket.onerror = function () {
                console.log("websocket发生了错误");
            };
        }

        // Sends the content of #contentText, addressed to #toUserId, as a JSON array.
        function sendMessage() {
            if (typeof WebSocket === "undefined") {
                console.log("您的浏览器不支持WebSocket");
                return;
            }
            console.log("您的浏览器支持WebSocket");
            // send() throws unless the connection is open, so check first.
            if (!socket || socket.readyState !== WebSocket.OPEN) {
                console.log("websocket未连接");
                return;
            }
            // JSON.stringify escapes quotes and other special characters in the inputs.
            var payload = JSON.stringify([{
                toUserId: $("#toUserId").val(),
                contentText: $("#contentText").val()
            }]);
            console.log(payload);
            socket.send(payload);
        }
    </script>
</head>
<body>
    <p>【userId】:</p>
    <div><input id="userId" name="userId" type="text" value="25"></div>
    <p>【toUserId】:</p>
    <div><input id="toUserId" name="toUserId" type="text" value=""></div>
    <p>【Message】:</p>
    <div><input id="contentText" name="contentText" type="text" value="客户端"></div>
    <p>【操作】:</p>
    <div><a href="javascript:void(0)" onclick="openSocket()">openSocket</a></div>
    <p>【操作】:</p>
    <div><a href="javascript:void(0)" onclick="sendMessage()">sendMessage</a></div>
</body>

</html>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
第二种方法我就不去实现了(如果有人想看给我留言,和第一种比要复杂一点。还没有第一种好用。)
# Proxy requests under /chat/ to "backend" (defined outside this snippet),
# passing along the headers used for the WebSocket protocol upgrade.
location /chat/ {
    proxy_pass http://backend;
    # Talk HTTP/1.1 to the proxied server.
    proxy_http_version 1.1;
    # Forward the client's Upgrade header and request the connection upgrade.
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
原因:使用了nginx服务,nginx配置:proxy_read_timeout(Default: 60s;),如果一直没有数据传输,连接会在过了这个时间之后自动关闭
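解决方法:在对应的 location 中调大 proxy_read_timeout(例如 proxy_read_timeout 3600s;),或者由客户端/服务端定时发送心跳消息,保证连接在超时时间内始终有数据传输。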
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。