赞
踩
基于 Redis 发布/订阅实现
不需要依赖其他额外的第三方包
@Slf4j
@Component
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static final ConcurrentHashMap<String, WebSocketServer> WEB_SOCKET_MAP = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
private WebSocketMessageService webSocketMessageService;
private WebSocketMessageService getWebSocketMessageService() {
if (webSocketMessageService == null) {
webSocketMessageService = ApplicationContextHolder.getBean(WebSocketMessageService.class);
}
return webSocketMessageService;
}
private StringRedisTemplate template;
private StringRedisTemplate getStringRedisTemplate() {
if (template == null) {
template = ApplicationContextHolder.getBean(StringRedisTemplate.class);
}
return template;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) throws IOException {
this.session = session;
this.userId = userId;
//鉴权
Map<String, String> pathParameters = session.getPathParameters();
String cookie = pathParameters.get("cookie");
if (!WebSocketAuthentication.checkIdentity(session)) {
WebSocketAuthentication.terminate(session);
} else {
Ticket ticket = WebSocketAuthentication.issueTicket(session);
ticket.save();
sendMessage(ticket.getToken());
}
//新增逻辑
if (WEB_SOCKET_MAP.containsKey(userId)) {
WEB_SOCKET_MAP.remove(userId);
WEB_SOCKET_MAP.put(userId, this);
//加入set中
} else {
WEB_SOCKET_MAP.put(userId, this);
//加入set中
addOnlineCount();
//在线数加1
}
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("用户:" + userId + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (WEB_SOCKET_MAP.containsKey(userId)) {
WEB_SOCKET_MAP.remove(userId);
//从set中删除
subOnlineCount();
}
log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param messageInfoJsonStr 客户端发送过来的消息,会携带token{message:"string",token:"token"}
* @param session 客户端会话
*/
@OnMessage
public void onMessage(Session session, String messageInfoJsonStr) {
log.info("用户消息:" + userId + ",报文:" + messageInfoJsonStr);
//可以群发消息
//消息保存到数据库、redis
JSONObject messageInfo = JSON.parseObject(messageInfoJsonStr);
String token = messageInfo.getString("token");
if (WebSocketAuthentication.checkTicket(session, token)) {
WebSocketAuthentication.terminate(session);
}
String message = messageInfo.getString("message");
if (StringUtils.isNotBlank(message)) {
try {
//解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
//追加发送人(防止串改)
jsonObject.put("fromUserId", this.userId);
String toUserId = jsonObject.getString("toUserId");
//传送给对应toUserId用户的websocket
if (StringUtils.isNotBlank(toUserId) && WEB_SOCKET_MAP.containsKey(toUserId)) {
WEB_SOCKET_MAP.get(toUserId).sendMessage(jsonObject.toJSONString());
getWebSocketMessageService().storeMessage(this.userId, new HashSet<String>() {{
add(toUserId);
}}, jsonObject.toJSONString());
} else {
log.error("请求的userId:" + toUserId + "不在该服务器上");
//否则不在这个服务器上,发送到mysql或者redis
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @param session 客户端会话
* @param error 异常
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
//这里需要设置弹窗的方式
if (JSON.isValid(message)) {
JSONObject jsonObject = JSON.parseObject(message);
String popType = getWebSocketMessageService().searchUserPopType(this.userId, Integer.parseInt(jsonObject.get("messageCategory").toString()));
jsonObject.put("messagePop", popType);
this.session.getBasicRemote().sendText(jsonObject.toJSONString());
} else {
this.session.getBasicRemote().sendText(message);
}
}
public void sendAllUserMessage(WebSocketMessage messageInfo) {
WEB_SOCKET_MAP.values().forEach(webSocketServer -> {
try {
webSocketServer.sendMessage(messageInfo.getMessage());
} catch (IOException e) {
e.printStackTrace();
}
});
getWebSocketMessageService().storeMessage(messageInfo.getFromUserId(), WEB_SOCKET_MAP.keySet(), messageInfo.getMessage());
}
/**
* 发送自定义消息
*
* @param webSocketMessageVo 消息
*/
public void sendInfo(WebSocketMessageVo webSocketMessageVo) throws IOException {
WebSocketMessage messageInfo = webSocketMessageVo.getWebSocketMessage();
Set<String> toUserIds = webSocketMessageVo.getToUserIds();
for (String userId : toUserIds) {
log.info("发送消息到:" + userId + ",报文:" + messageInfo);
if (StringUtils.isNotBlank(userId) && WEB_SOCKET_MAP.containsKey(userId)) {
ArrayList<WebSocketMessage> webSocketMessages = getWebSocketMessageService().storeMessage(messageInfo.getFromUserId(), toUserIds, messageInfo.getMessage());
JSONObject jsonMsg = JSON.parseObject(messageInfo.getMessage());
for (WebSocketMessage webSocketMessage : webSocketMessages) {
if (webSocketMessage.getToUserId().equals(userId)) {
jsonMsg.put("id", webSocketMessage.getId());
}
}
WEB_SOCKET_MAP.get(userId).sendMessage(userId, jsonMsg.toJSONString());
} else {
//todo:等上线或者怎么样
log.error("用户" + userId + ",不在线!");
}
}
}
/**
* @description: 分布式 使用redis 去发布消息
* @dateTime: 2021/6/17 10:31
*/
public void sendMessage(@NotNull String key,String message) {
String newMessge= null;
try {
newMessge = new String(message.getBytes(Constants.UTF8), Constants.UTF8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
Map<String,String> map = new HashMap<>();
map.put(Constants.REDIS_MESSAGE_KEY, key);
map.put(Constants.REDIS_MESSAGE_VALUE, newMessge);
getStringRedisTemplate().convertAndSend(Constants.REDIS_CHANNEL, JSON.toJSONString(map));
}
/**
* @description: 单机使用 外部接口通过指定的客户id向该客户推送消息。
* @dateTime: 2021/6/16 17:49
*/
public void sendMessageByWayBillId(@NotNull String key, String message) {
WebSocketServer webSocketServer = WEB_SOCKET_MAP.get(key);
if (!Objects.isNull(webSocketServer)) {
try {
webSocketServer.sendMessage(message);
log.info("编号id为:" + key + "发送消息:" + message);
} catch (IOException e) {
e.printStackTrace();
log.error("编号id为:" + key + "发送消息失败");
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
/**
 * Redis message listener: receives published messages and forwards those arriving
 * on {@code Constants.REDIS_CHANNEL} to the locally connected WebSocket client
 * named in the message envelope.
 */
@Component
public class RedisReceiver implements MessageListener {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private WebSocketServer webSocketServer;

    /**
     * Handles one published message.
     *
     * <p>The body is expected to be a JSON object holding the target id under
     * {@code Constants.REDIS_MESSAGE_KEY} and the text under
     * {@code Constants.REDIS_MESSAGE_VALUE}. Empty bodies, other channels and
     * incomplete envelopes are skipped; any failure is logged and not rethrown.
     *
     * @param message the published message (channel and body as raw bytes)
     * @param pattern the subscription pattern that matched; unused here
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Decode channel and body with the same charset the publisher side names.
            String channel = new String(message.getChannel(), Constants.UTF8);
            String msg = new String(message.getBody(), Constants.UTF8);

            if (StringUtils.isEmpty(msg)) {
                log.info("消息内容为空,不处理。");
                return;
            }
            // Exact match only: endsWith() also accepted any suffix of the channel
            // name, including the empty string.
            if (!Constants.REDIS_CHANNEL.equals(channel)) {
                // TODO handle messages from other subscribed channels
                return;
            }

            JSONObject jsonObject = JSON.parseObject(msg);
            Object key = jsonObject.get(Constants.REDIS_MESSAGE_KEY);
            Object value = jsonObject.get(Constants.REDIS_MESSAGE_VALUE);
            if (key == null || value == null) {
                log.warn("Dropping message without key or value: " + msg);
                return;
            }
            webSocketServer.sendMessageByWayBillId(key.toString(), value.toString());
        } catch (Exception e) {
            log.error("处理消息异常:" + e.toString(), e);
        }
    }
}
/**
 * Shared constants for relaying WebSocket messages over Redis.
 *
 * <p>Created 2022/12/02 13:06.
 */
public class Constants {

    /** Charset name used when converting message bytes to and from text. */
    public static final String UTF8 = "utf-8";

    /** Redis channel on which WebSocket messages are published. */
    public static final String REDIS_CHANNEL = "WebSocketChannel";

    /** Envelope field holding the id of the target client. */
    public static final String REDIS_MESSAGE_KEY = "msgKey";

    /** Envelope field holding the message text. */
    public static final String REDIS_MESSAGE_VALUE = "msgValue";
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。