赞
踩
1.单播(Unicast) :点对点,私信私聊
2.多播,也叫组播(Multicast)(特地人群): 多人聊天,发布订阅
3.广播(Broadcast)(所有人): 游戏公告,发布订阅
1.依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.定义两个消息发送的实体类
/** * 接收的消息 * from:消息的来源(一般为发送用户id或者sessionid) 是这条消息的标识 通过这个标志可以找到对应的用户(如果是1v1聊天 这个是找到消息发送者的标志) * to:消息的部分目的地(一般为接收用户id或者sessionid) * 如果消息发是广播和多播 那前端可以忽略该字段 直接在发送时指定一个固定路由(如:群聊、推送实时消息等) * 如果消息发是单播 就要通过to后的标识找到要发送的目的地 再进行拼接 * */ public class InMessage { //从哪里来 private String from; //到哪里去(单播必用) private String to; private String content; private Date time = new Date(); } /* * 发送的消息 * from:前端拿到发送的可以做一些事 * 没有to字段 因为已经发送到接收者 所以没有意义 */ public class OutMessage { private String from; private String content; private Date time = new Date(); }
3.消息接收的方法
@MessageMapping("/v1/chat")
4.WebSocket的两种推送方式
@SendTo:
( 不通用,一个路由接收的消息 因为注解只可以写一个路由 所以只能把该方法的全部消息都发到注解写的一个路径)
@MessageMapping("/v1/chat")
@SendTo("/topic/game_chat")
public OutMessage gameInfo(InMessage message){
return new OutMessage(message.getFrom(),message.getContent());
}
SimpMessagingTemplate:
(灵活,支持多种发送方式(建一个WebSocketService 在类里面写不同场景的发送数据和路由)
消息发送模板:
@Service public class WebSocketService { @Autowired private SimpMessagingTemplate template; /* * 简单的指定消息到目的地:广播、单播 * * @param dest:目的地的路径 * @param message:给OutMessage提供content和from */ public void sendTopicMessage(String dest, InMessage message) throws InterruptedException { template.convertAndSend(dest, new OutMessage(message.getContent()); } /* * 点对点发送消息:给统一的路径后面再加上user的路径 */ public void sendChatMessage(InMessage message) { //发送路径除了固定的路径 还拼接了特定的接收用户的标识(一般为用户id) //且每个用户单播的情况下 订阅的路径下也是加上含自己id的路径 template.convertAndSend("/chat/single/" + message.getTo() ,new OutMessage(message.getFrom() + " 发送:" + message.getContent())); } /* * 心跳检测 直接给源路径返回"pang"作标志 */ public void sendPong(InMessage message) { template.convertAndSend(message.getTo()); } }
接收消息并转给WebSocketService
@Autowired
private WebSocketService ws;
@MessageMapping("/v3/chat")
public void gameInfo(InMessage message) throws InterruptedException{
ws.sendTopicMessage("/topic/game_rank",message);
}
@MessageMapping("/v3/check")
public void gameInfo(InMessage message) throws InterruptedException{
ws.sendPong(message);//心跳检测 给前端返回
}
注意:
客户端 -> 服务器(服务器订阅):不写config里setApplicationDestinationPrefixes配置的前缀
客户端 -> 服务器(客户端发送):写config里setApplicationDestinationPrefixes配置的前缀
服务器 -> 客户端(服务器发送):写config里enableSimpleBroker配置的前缀
服务器 -> 客户端(客户端订阅):写config里enableSimpleBroker配置的前缀
5.HandshakeInterceptor 握手拦截器
(只会在第一次连接时执行一次)
主要作用:
使用:
/** * 功能描述:http握手拦截器,最早执行 * 可以通过这个类的方法获取resuest,和response 给后面使用 */ public class HttpHandShakeIntecepter implements HandshakeInterceptor{ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { /* * 拦截方法一;获得sessionid 然后判断在线的sessionid是否有该sessionid * * 先要判断是否有session存在 然后获取sessionid(适用于前后端不分离或不跨域或安卓客户端) * if(request instanceof ServletServerHttpRequest) { * //强转为ServletServerHttpRequest * ServletServerHttpRequest servletRequest = (ServletServerHttpRequest)request; * HttpSession session = servletRequest.getServletRequest().getSession(); * * String sessionId = session.getId(); * //...进行校验 * } */ /* * 拦截方法二:取出cookie里的tocken解析出用户 该token是jwt加密的 * 如何使用jwt请参考我的博客直接使用 https://blog.csdn.net/weixin_43934607/article/details/101356581 */ HttpServletRequest request= servletRequest.getServletRequest(); String token= CookieUtils.getCookieValue(request,"Cookie名称"); UserInfo user = JwtUtils.getInfoFromToken(token, publicKey); String userid= user.getId(); //如果userid为空 或者redis中不存在userid 返回未授权错误码(redis自行整合 在登陆时存入redis) if (StringUtils.isBlank(userid) || !redisTemplate.haskey(userid)) { response.setStatusCode( HttpStatus.UNAUTHORIZED ); return false; } //把该usrid存入stompAccessHeader 供DisConnect监听器断开连接时从redis中找到对应的并把online设置为false 如果5分钟没没有更新为true就在redis中删掉该用户 //在断开连接的监听器通过stompHeaderAccessor.getSessionAttributes().get("userid")取得 attributes.put("userid", userid); //把username存入attributes 方便在下面的config中取出后设置成Principal 当有异常发送时用@SendToUser 不然不知道目的地 attributes.put("userName", user.getName()); //如果是掉线重新设为true if(!redisTemplate.get(userid)){ redisTemplate.set(userid,true); } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { //该方法一般没有 所以不实现 } }
6.websocket监听器
websocket模块监听器类型:
SessionSubscribeEvent 订阅事件
SessionUnsubscribeEvent 取消订阅事件
SessionConnectEvent 连接事件
SessionDisconnectEvent 断开连接事件( 注意,对于单个会话,此事件可能会多次引发,因此事件使用时应该是满足幂等的并忽略重复事件)
使用
需要监听器类需要实现接口ApplicationListener T表示事件类型,下列几种都是对应的websocket事件类型
在监听器类上注解 @Component,spring会把改类纳入管理(不用再config配置文件里设置)
StompHeaderAccessor
简单消息传递协议中处理消息头的基类 (因为规范协议用stom进行包装) 通过这个类,可以获取消息类型(例如:发布订阅,建立连接断开连接),会话id等
StompHeaderAccessor webSocketheaderAccessor = StompHeaderAccessor.wrap(Message);
/** * 测试 */ @Component public class SubscribeEventListener implements ApplicationListener<SessionSubscribeEvent>{ /** * 在事件触发的时候调用这个方法 */ public void onApplicationEvent(SessionSubscribeEvent event) { //通过event获得Message StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage()); System.out.println("【SubscribeEventListener监听器事件 类型】"+headerAccessor.getCommand().getMessageType()); //必须在HandshakeInterceptor拦截之后放入了sessionid才可以取到 System.out.println("【SubscribeEventListener监听器事件 sessionId】"+headerAccessor.getSessionAttributes().get("sessionId")); } }
/**
* 在用户断开连接的时候触发 一般会使用到该监听器
*/
@Component
public class SubscribeEventListener implements ApplicationListener<SessionDisconnectEvent>{
public void onApplicationEvent(SessionSubscribeEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
//获得上面拦截器放进去的userId
String userId=headerAccessor.getSessionAttributes().get("userId"));
//在redis中把用户状态设置下线 30分钟没上线就删除
redisTemplate.set(userid,true,30, TimeUnit.MINUTES);
}
}
7.spring channel拦截器(已过时)
public class SocketChannelIntecepter extends ChannelInterceptorAdapter{ /** * 在完成发送之后进行调用,不管是否有异常发生,一般用于资源清理 */ @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { super.afterSendCompletion(message, channel, sent, ex); } /** * 在消息被实际发送到频道之前调用 * 可以用作登陆验证 */ @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); //1. 判断是否首次连接请求 if (StompCommand.CONNECT.equals(accessor.getCommand())) { /* 2. 验证是否登录 * accessor.getNativeHeader()获得的数据时前端传来的求请求头 * * var headers={ * username:$("#username").val(), * password:$("#password").val() * }; * stompClient.connect(headers, function (frame) { * stompClient.subscribe('/topic/demo/test', function (result) { * }); * }); */ String username = accessor.getNativeHeader("username").get(0); String password = accessor.getNativeHeader("password").get(0); //如果验证失败就return null 那么消息将不会被服务器接收 if(false){ return null; } } //不是首次连接,已经成功登陆 return message; } /** * 发送消息调用后立即调用 一般用于监听上下线 */ @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);//消息头访问器 if (headerAccessor.getCommand() == null ) return ;// 避免非stomp消息类型,例如心跳检测 String sessionId = headerAccessor.getSessionAttributes().get("sessionId").toString();//拿到sessionid 后面可能要用 switch (headerAccessor.getCommand()) { case CONNECT: //连接成功后操作... break; case DISCONNECT: //断开连接的操作... break; case SUBSCRIBE: break; case UNSUBSCRIBE: break; default: break; } } }
8.配置文件
@Configuration //开启对websocket的支持,使用stomp协议传输代理消息, // 这时控制器使用@MessageMapping和@RequestMaping一样 @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { /** * Endpoint:注册端点,发布或者订阅消息的时候需要连接此端点 * Interceptors:前面的握手拦截器 * AllowedOrigins:非必须,*表示允许其他域进行连接 * withSockJS:表示开始sockejs支持 */ public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/endpoint-websocket").addInterceptors(new HttpHandShakeIntecepter()) .setHandshakeHandler(new DefaultHandshakeHandler(){ @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { //设置认证用户 从拦截器添加的userName取出 return new Principal(attributes.get("userName")); } }) /* * 因为Stomp是对websocket的实现 是异步发送 如果有异常 发送者将无法感知 设置完这个后在发送消息异常时 就会调用下面的接收器 把然后把该异常可以返回给消息的发送者 让前端知道发送异常并告知 * @MessageExceptionHandler(Exception.class) * @SendToUser("/error/quene") //会发送到DefaultHandshakeHandler设置的Principal: /Principal/error/quene * public Exception handleExceptions(Exception t){ * t.printStackTrace(); * return t; * } */ .setAllowedOrigins("*") .withSocketJs() } /** * 配置消息代理(中介) * enableSimpleBroker 服务端推送给客户端的路径前缀 * setApplicationDestinationPrefixes 客户端发送数据给服务器端的一个前缀 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/chat", "/check"); registry.setApplicationDestinationPrefixes("/app"); } /** * 消息传输参数配置 */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { registry.setMessageSizeLimit(8192) //设置消息字节数大小 .setSendBufferSizeLimit(8192)//设置消息缓存大小 .setSendTimeLimit(10000); //设置消息发送时间限制毫秒 } /** * 配置客户端入站通道拦截器 */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(4) //设置消息输入通道的线程池线程数 .maxPoolSize(8)//最大线程数 .keepAliveSeconds(60);//线程最大空闲时间 //前面配置的spring channel拦截器 过时不建议再使用 registration.interceptors( new SocketChannelIntecepter()); } /** * 配置客户端出站通道拦截器 */ @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(4).maxPoolSize(8); //前面配置的spring channel拦截器 过时不建议再使用 registration.interceptors( new SocketChannelIntecepter()); } }
1.依赖
<dependency> <groupId>org.webjars</groupId> <artifactId>sockjs-client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.webjars</groupId> <artifactId>stomp-websocket</artifactId> <version>2.3.3-1</version> </dependency> <-- 如果要加入jquery --> <!-- <dependency>--> <!-- <groupId>org.webjars</groupId>--> <!-- <artifactId>jquery</artifactId>--> <!-- <version>3.1.0</version>--> <!-- </dependency>-->
2.核心代码(包括心跳重连实现)
//写成全局 所有操作都要用到 var stompClient = null; var tryTimes=0;//重连次数 function connect() { //让websocket连接的端点 var socket = new SockJS('http://localhost:8080/endpoint-websocket?userid=xxx'); stompClient = Stomp.over(socket); /** 加入请求头的方式 * var headers={ * username:$("#username").val(), * password:$("#password").val() * }; * stompClient.connect(headers, function (frame) {..... */ stompClient.connect({}, function (frame) { heartCheck.reset().start(); //开始心跳检测 tryTimes= 0;//重置重连次数 //连接成功之后的其他操作... //订阅(该路由专门用于心跳检测) stompClient.subscribe('/check/net/'+userid, function (result) { heartCheck.reset().start(); //心跳检测重置 }); //普通订阅(如果路径没有指定userid 那么服务器发到这个路径的消息都可以收到) //如果要取消订阅 要使用下面subscribe返回的对象 再调用该对象的unsubscribe() stompClient.subscribe('/chat/single', function (result) { //获得result的body 再用JSON.parse解析 var body=JSON.parse(result.body); console.log(body.content); }); }, function(errorCallback){ //连接失败的操作...(该方法非必须) console.log(errorCallback) reconnect(); }); } function reconnect() { if(tryTimes>10){ alert("重连次数以达上限 连接失败") return; } setTimeout(function () { //没连接上会一直重连,设置延迟避免请求过多 connect(); }, 3000); } function sendMessage() { //stomp协议规定是用son格式传输 所以解析和发送都是json //要发送的数据写成json格式 然后用JSON.stringify解析成字符串形式传输给服务器 stompClient.send("/app/v3/single/chat", {}, JSON.stringify({'content': $("#content").val(), 'to':$("#to").val(), 'from':$("#from").val()})); } //手动关闭连接 function disconnect() { if (stompClient !== null) { stompClient.disconnect(); } //断开连接成功之后的操作... } //心跳检测与重连 var heartCheck = { timeout: 10000, //10s发一次心跳 timeoutObj: null, serverTimeoutObj: null, reset: function(){ clearTimeout(this.timeoutObj);//清除定时任务 clearTimeout(this.serverTimeoutObj); return this; }, start: function(){ var self = this; this.timeoutObj = setTimeout(function(){ //这里发送一个心跳到后端指定路由,后端该路径收到将再发一条消息到前端指定路由,从而完成一次交互(消息content可以为空 只要能到达路由就可以) stompClient.send("/app/v3/check", {}, JSON.stringify({'to':"/check/net/"+userid})); console.log("ping!") //如果超过一定时间还没重置才会执行到这,说明后端主动断开了 self.serverTimeoutObj = setTimeout(function(){ disConnect(); connect(); }, self.timeout) }, this.timeout) } }
最后小结:
服务端 1)单独的SpringBoot项目 用nginx反向代理多个节点(记得配置过期时间 默认1分半) 2)在握手拦截器进行校验 在redis中检查是否有对应的userid、并通过userid再redis中找到对应的User 如果该字段为false 则把online设置为true(该字段不存入mysql) 再将userid添加在attributes 方便用户下线时在DisConnect的监听器中取到userid 再把userid再redis中对应User的online设置为false 3)在要给客户端发送消息时 根据接收者的userid在redis中检查该用户的online是否为true 若是false 这说明没有websocket的连接 则其无法收到消息 所以先用数据库把消息存起来 每个用户上线时 在Connect的监听器里 查找未发送消息的数据库查找 如果有自己userid对应的消息 就调用方法给指定目的地进行推送 发送者 在connect的路径加上加过密的userid或者token 再握手拦截器进行拦截判断与保存到stomp中 接收者 1)只用设置好订阅的路径(如果是点对点 可以设置成:/通用路径/自己的userid) 2)把连接和监听的代码单独抽离成一份代码 然后每个页面引入该代码 并且在该组件给连接进行心跳检查 保证正常连接 (因为是和当前页面建立的连接 即每次刷新或跳转到其他页面连接会断开 所以每一份代码引入自动连接的代码 但是要控制好登陆后才可以连接) 3)如果是安卓端接收:监听到消息后进行提示 先操作本地数据库 再展示新的页面 4)如果是网页接收:监听到消息后进行提示 并重新在数据库拉取消息并展示
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。