赞
踩
1、在pom.xml中引入相关依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
2、创建WebSocketServer用于处理websocket请求
- package com.hdy.manage.web.socket;
-
- import com.alibaba.fastjson.JSON;
- import com.hdy.manage.configuration.WebSocketConfiguration;
- import com.hdy.manage.data.base.JsonResult;
- import com.hdy.manage.exception.WebsocketUnauthorizedException;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.*;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.concurrent.CopyOnWriteArraySet;
-
- /**
-  * WebSocket endpoint mapped to {@code /websocket}.
-  *
-  * <p>Each accepted connection is tracked in a static registry together with the
-  * {@code userId} and {@code token} found in the session's user properties, so that
-  * messages can later be pushed to every connection of a given user or token.
-  */
- @Component
- @ServerEndpoint(value = "/websocket", configurator = WebSocketConfiguration.class)
- @Slf4j
- public class WebSocketServer {
-
-     // Thread-safe set from java.util.concurrent holding the WebSocketServer object of each connected client.
-     private static final CopyOnWriteArraySet<WebSocketServer> webSocketServers = new CopyOnWriteArraySet<>();
-     // Session of this client connection; data is sent to the client through it.
-     private Session session;
-     // User id.
-     private Integer userId;
-     // User token: used for the "logged in elsewhere" kick-out notice, where userId cannot be used.
-     private String token;
-
-     /**
-      * Called when a connection has been established.
-      *
-      * <p>Reads {@code userId} and {@code token} from the session's user properties. When either
-      * is missing, an unauthorized result is sent to the client and the open is aborted by throwing.
-      *
-      * @param session the newly opened session
-      * @throws WebsocketUnauthorizedException when the session carries no user information
-      */
-     @OnOpen
-     public void onOpen(Session session) {
-         Object userIdObj = session.getUserProperties().get("userId");
-         Object tokenObj = session.getUserProperties().get("token");
-         if (null == userIdObj || null == tokenObj) {
-             // Not authenticated: no user information is available for this session.
-             log.info("用户未登录,主动报错关闭连接");
-             sendMessage(session, JSON.toJSONString(JsonResult.unauthorized()));
-             throw new WebsocketUnauthorizedException("用户未登录");
-         }
-         Integer openedUserId = (Integer) userIdObj;
-         String openedToken = (String) tokenObj;
-         this.session = session;
-         this.userId = openedUserId;
-         this.token = openedToken;
-         webSocketServers.add(this); // register this connection
-         log.info("用户 [{}] 加入websocket,当前在线人数为:{}", openedUserId, onlineCount());
-     }
-
-     /**
-      * Called when the connection is closed; removes this connection from the registry.
-      */
-     @OnClose
-     public void onClose() {
-         webSocketServers.remove(this);
-         log.info("用户 [{}] 关闭websocket,当前在线人数为:{}", this.userId, onlineCount());
-     }
-
-     /**
-      * Called when a text message arrives from the client. The message is only logged.
-      *
-      * @param message the text received
-      * @param session the session the message arrived on
-      */
-     @OnMessage
-     public void onMessage(String message, Session session) {
-         // The former System.out.println duplicate was dropped; the logger already records the message.
-         log.info("{}发送消息: {}", this.userId, message);
-     }
-
-     /**
-      * Called when an error occurs on the connection.
-      *
-      * @param session the affected session
-      * @param error   the error that was raised
-      */
-     @OnError
-     public void onError(Session session, Throwable error) {
-         // Route the stack trace through the logger instead of printStackTrace().
-         log.error("WebSocket连接发生错误,连接已被断开...", error);
-     }
-
-     /**
-      * Sends a text message to this connection's client.
-      *
-      * <p>Synchronized so that two threads pushing to the same connection do not call the
-      * blocking remote concurrently, which the WebSocket API rejects with IllegalStateException.
-      *
-      * @param message the text to send
-      * @throws IOException when the underlying send fails
-      */
-     public synchronized void sendMessage(String message) throws IOException {
-         this.session.getBasicRemote().sendText(message);
-     }
-
-     /**
-      * Sends a text message through the given session; a failed send is logged, not rethrown.
-      *
-      * @param session the session to send through
-      * @param message the text to send
-      */
-     private static void sendMessage(Session session, String message) {
-         try {
-             session.getBasicRemote().sendText(message);
-         } catch (IOException e) {
-             log.warn("WebSocket send failed", e);
-         }
-     }
-
-     /**
-      * Pushes a message to every registered connection whose user id equals {@code userId}.
-      *
-      * @param userId     the user id to match
-      * @param jsonResult the result to serialize to JSON and send
-      */
-     public static void sendMessage(Integer userId, JsonResult jsonResult) {
-         // Serialize once instead of once per matching connection.
-         String payload = JSON.toJSONString(jsonResult);
-         for (WebSocketServer target : webSocketServers) {
-             if (target.userId.equals(userId)) {
-                 deliver(target, payload);
-             }
-         }
-     }
-
-     /**
-      * Pushes a message to every registered connection whose token equals {@code token}.
-      *
-      * @param token      the token to match
-      * @param jsonResult the result to serialize to JSON and send
-      */
-     public static void sendMessage(String token, JsonResult jsonResult) {
-         // Serialize once instead of once per matching connection.
-         String payload = JSON.toJSONString(jsonResult);
-         for (WebSocketServer target : webSocketServers) {
-             if (target.token.equals(token)) {
-                 deliver(target, payload);
-             }
-         }
-     }
-
-     /**
-      * Sends {@code payload} to one connection. A failure is logged and swallowed so that one
-      * broken connection does not stop delivery to the remaining ones.
-      */
-     private static void deliver(WebSocketServer target, String payload) {
-         try {
-             target.sendMessage(payload);
-         } catch (IOException | IllegalStateException e) {
-             // The token is deliberately not logged.
-             log.warn("Failed to push WebSocket message to user [{}]", target.userId, e);
-         }
-     }
-
-     /**
-      * Returns the number of connections currently registered.
-      *
-      * @return the size of the connection registry
-      */
-     public static int onlineCount() {
-         return webSocketServers.size();
-     }
- }
3、创建WebSocketConfiguration,开启WebSocket支持
- package com.hdy.manage.configuration;
-
- import com.hdy.manage.data.entity.SysUser;
- import org.apache.shiro.SecurityUtils;
- import org.apache.shiro.subject.Subject;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- import javax.websocket.HandshakeResponse;
- import javax.websocket.server.HandshakeRequest;
- import javax.websocket.server.ServerEndpointConfig;
-
- /**
- * 开启WebSocket支持
- */
- @Configuration
- public class WebSocketConfiguration extends ServerEndpointConfig.Configurator {
-
- @Override
- public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
- Subject subject = SecurityUtils.getSubject();
- if (subject.isAuthenticated()) {
- SysUser sysUser = (SysUser) subject.getPrincipal();
- sec.getUserProperties().put("userId", sysUser.getId());
- sec.getUserProperties().put("token", subject.getSession().getId());
- }
- super.modifyHandshake(sec, request, response);
- }
-
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
- }
4、创建Redis消息队列发布者
- package com.hdy.manage.configuration.redis;
-
- import com.alibaba.fastjson.JSONObject;
- import com.hdy.manage.data.base.JsonResult;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
-
- /**
-  * Publishes push requests to the Redis channel {@code channel:message}. Each request is a
-  * JSON object holding the addressing field ({@code userId} or {@code token}) and the
-  * {@code content} to deliver.
-  */
- @Component
- public class RedisPublication {
-
-     @Autowired
-     private StringRedisTemplate redisTemplate;
-
-     /**
-      * Publishes a message addressed by user id.
-      *
-      * @param userId     the user id written to the {@code userId} field
-      * @param jsonResult the value written to the {@code content} field
-      */
-     public void sendMessage(Integer userId, JsonResult jsonResult) {
-         publish("userId", userId, jsonResult);
-     }
-
-     /**
-      * Publishes a message addressed by token.
-      *
-      * @param token      the token written to the {@code token} field
-      * @param jsonResult the value written to the {@code content} field
-      */
-     public void sendMessage(String token, JsonResult jsonResult) {
-         publish("token", token, jsonResult);
-     }
-
-     /**
-      * Builds the JSON envelope and sends it to the channel.
-      */
-     private void publish(String addressKey, Object addressValue, JsonResult jsonResult) {
-         JSONObject envelope = new JSONObject();
-         envelope.put(addressKey, addressValue);
-         envelope.put("content", jsonResult);
-         String serialized = envelope.toJSONString();
-         redisTemplate.convertAndSend("channel:message", serialized);
-     }
- }
5、创建Redis消息队列消费者
- package com.hdy.manage.configuration.redis;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.hdy.manage.data.base.JsonResult;
- import com.hdy.manage.web.socket.WebSocketServer;
- import org.springframework.data.redis.connection.Message;
- import org.springframework.data.redis.connection.MessageListener;
- import org.springframework.stereotype.Component;
-
- /**
-  * Redis publish/subscribe receiver: parses each incoming message as a JSON envelope and
-  * forwards its {@code content} to the matching WebSocket connections.
-  */
- @Component
- public class RedisSubscription implements MessageListener {
-
-     /**
-      * Handles one message delivered by the Redis listener container.
-      *
-      * <p>The envelope is routed by {@code userId} when that field is present, otherwise by
-      * {@code token}.
-      *
-      * @param message the Redis message whose body is the JSON envelope
-      * @param bytes   second listener argument; not used here
-      * @throws IllegalArgumentException when the body is empty or has no {@code content} object
-      */
-     @Override
-     public void onMessage(Message message, byte[] bytes) {
-         // Decode explicitly as UTF-8 instead of relying on the platform default charset.
-         String messageBody = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
-         JSONObject messageObject = JSON.parseObject(messageBody);
-         if (null == messageObject) {
-             throw new IllegalArgumentException("Redis push message has an empty body");
-         }
-         JSONObject content = messageObject.getJSONObject("content");
-         if (null == content) {
-             // Fail with a clear message rather than a bare NullPointerException.
-             throw new IllegalArgumentException("Redis push message has no 'content' object");
-         }
-         JsonResult jsonResult = content.toJavaObject(JsonResult.class);
-         Integer userId = messageObject.getInteger("userId");
-         if (null == userId) {
-             String token = messageObject.getString("token");
-             WebSocketServer.sendMessage(token, jsonResult);
-         } else {
-             WebSocketServer.sendMessage(userId, jsonResult);
-         }
-     }
- }
6、对Redis消息队列进行系统配置
- package com.hdy.manage.configuration.redis;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.data.redis.listener.PatternTopic;
- import org.springframework.data.redis.listener.RedisMessageListenerContainer;
- import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
-
- /**
-  * Spring configuration for the Redis publish/subscribe wiring: the listener container,
-  * the listener adapter wrapping {@link RedisSubscription}, and the string template.
-  */
- @Configuration
- public class RedisConfiguration {
-
-     /**
-      * Creates the listener container and subscribes the adapter to {@code channel:message}.
-      */
-     @Bean
-     RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
-         PatternTopic messageTopic = new PatternTopic("channel:message");
-         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
-         container.setConnectionFactory(redisConnectionFactory);
-         // More listeners can be registered here for other channels, for example:
-         // container.addMessageListener(messageListenerAdapter, new PatternTopic("channel:notice"));
-         container.addMessageListener(messageListenerAdapter, messageTopic);
-         return container;
-     }
-
-     /**
-      * Wraps the subscription bean in an adapter, naming {@code onMessage} as the handler method.
-      */
-     @Bean
-     MessageListenerAdapter messageListenerAdapter(RedisSubscription redisSubscription) {
-         MessageListenerAdapter adapter = new MessageListenerAdapter(redisSubscription, "onMessage");
-         return adapter;
-     }
-
-     /**
-      * Creates the {@link StringRedisTemplate} on top of the given connection factory.
-      */
-     @Bean
-     StringRedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
-         StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory);
-         return template;
-     }
- }
7、发送消息调用
- @Autowired
- private RedisPublication redisPublication;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。