赞
踩
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端
HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接。
无状态: 每次连接只处理一个请求,请求结束后断开连接。
无连接: 对于事务处理没有记忆能力,服务器不知道客户端是什么状态。
通过HTTP实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。
WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。
WebSocket特点
(1)建立在 TCP 协议之上,服务器端的实现比较容易。
(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种
HTTP 代理服务器。
(3)数据格式比较轻量,性能开销小,通信高效。
(4)可以发送文本,也可以发送二进制数据。
(5)没有同源限制,客户端可以与任意服务器通信。
(6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
启用WebSocket的支持
@Configuration
public class WebSocketConfig {
/**
* 自动注册使用@ServerEndpoint注解声明的websocket endpoint
* 2022/2/14
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
直接@ServerEndpoint(“/websocket/{userId}”) 、@Component启用即可,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。
集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可。
import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @ServerEndpoint("/imserver/{userId}") @Component public class WebSocketServer { static Log log=LogFactory.get(WebSocketServer.class); /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/ private static int onlineCount = 0; /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/ private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/ private Session session; /**接收userId*/ private String userId=""; /** * 连接建立成功调用的方法*/ @OnOpen public void onOpen(Session session,@PathParam("userId") String userId) { this.session = session; this.userId=userId; if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); webSocketMap.put(userId,this); //加入set中 }else{ webSocketMap.put(userId,this); //加入set中 addOnlineCount(); //在线数加1 } log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount()); try { sendMessage("连接成功"); } catch (IOException e) { log.error("用户:"+userId+",网络异常!!!!!!"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:"+userId+",报文:"+message); //可以群发消息 //消息保存到数据库、redis if(StringUtils.isNotBlank(message)){ try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId",this.userId); String toUserId=jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){ webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); }else{ log.error("请求的userId:"+toUserId+"不在该服务器上"); //否则不在这个服务器上,发送到mysql或者redis } }catch (Exception e){ e.printStackTrace(); } } } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 * */ public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException { log.info("发送消息到:"+userId+",报文:"+message); if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage(message); }else{ log.error("用户"+userId+",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
在使用websocket的时候会面对session共享的问题
在这里我们通过Redis的监听订阅模式来实现session共享,当有连接加入时,将连接发送到队列中然后向订阅该队列的服务下发,由存储该连接的服务进行逻辑处理
session共享的业务逻辑如下图所示
Redis可以通过发布订阅模式、轮询机制实现消息队列。
由于没有消息持久化与 ACK 的保证,所以,Redis 的发布订阅功能并不可靠。这也就导致了它的应用场景很有限,建议用于实时与可靠性要求不高的场景。
服务器中维护着一个pubsub_channels字典,所有的频道和订阅关系都存储在这里。字典的键为频道的名称,而值为订阅频道的客户端链表。
(1)如果频道已经存在,则新的客户端会添加到pubsub_channels对应频道的链表末尾
(2)如果频道原本不存在,则会为频道创建一个键,该客户端成为链表的第一个元素
pubsub_channels对应键的链表会删除该客户端
服务器会遍历pubsub_channels中对应键的链表,向每一个客户端发送信息
服务器还维护着一个pubsub_patterns链表,链表的pattern属性记录了被订阅的模式,而client属性记录了订阅模式的客户端
(1)创建一个链表节点,pattern属性记录订阅的模式,client记录订阅模式的客户端
(2)将这个链表节点添加到pubsub_patterns链表中
服务器遍历pubsob_patterns找到对应的pattern同时也是对应该client客户端的节点,将改节点删除
服务器遍历pubsub_channels,查找与channels频道相匹配的模式麻将消息发送给订阅了这些模式的客户端。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
public class RedisConfig { //Key的过期时间 private Duration timeToLive = Duration.ofDays(1); /** * redis模板,存储关键字是字符串,值jackson2JsonRedisSerializer是序列化后的值 * * @param * @return org.springframework.data.redis.core.RedisTemplate */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(connectionFactory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); //使用StringRedisSerializer来序列化和反序列化redis的key值 RedisSerializer redisSerializer = new StringRedisSerializer(); //key redisTemplate.setKeySerializer(redisSerializer); redisTemplate.setHashKeySerializer(redisSerializer); //value redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public RedisCacheConfiguration redisCacheConfiguration() { return RedisCacheConfiguration. defaultCacheConfig(). entryTtl(this.timeToLive). //Key过期时间 1天 serializeKeysWith(RedisSerializationContext.SerializationPair. fromSerializer(new StringRedisSerializer())). serializeValuesWith(RedisSerializationContext.SerializationPair. fromSerializer(new GenericJackson2JsonRedisSerializer())); } }
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.concurrent.CountDownLatch; @Configuration public class RedisMessageListener { /** * 创建连接工厂 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapterWang, MessageListenerAdapter listenerAdapterTest2) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //(不同的监听器可以收到同一个频道的信息)接受消息的频道 container.addMessageListener(listenerAdapter, new PatternTopic("phone")); container.addMessageListener(listenerAdapterWang, new PatternTopic("phone")); container.addMessageListener(listenerAdapterTest2, new PatternTopic("phoneTest2")); return container; } /** * 绑定消息监听者和接收监听的方法 * * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean public MessageListenerAdapter listenerAdapterWang(ReceiverRedisMessage receiver) { return new MessageListenerAdapter(receiver, "receiveMessageWang"); } /** * 绑定消息监听者和接收监听的方法 * * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage receiver) { return new MessageListenerAdapter(receiver, "receiveMessage2"); } /** * 注册订阅者 * * @param latch * @return */ @Bean ReceiverRedisMessage receiver(CountDownLatch latch) { return new ReceiverRedisMessage(latch); } /** * 计数器,用来控制线程 * * @return */ @Bean public CountDownLatch latch() { return new CountDownLatch(1);//指定了计数的次数 1 } }
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.CountDownLatch; public class ReceiverRedisMessage { private CountDownLatch latch; @Autowired public ReceiverRedisMessage(CountDownLatch latch) { this.latch = latch; } /** * 队列消息接收方法 * * @param jsonMsg */ public void receiveMessage(String jsonMsg) { log.info("[开始消费REDIS消息队列phone数据...]"); try { System.out.println(jsonMsg); log.info("[消费REDIS消息队列phone数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } public void receiveMessageWang(String jsonMsg) { log.info("[开始消费REDIS消息队列phone数据...]"); try { System.out.println(jsonMsg); log.info("[消费REDIS消息队列phone数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } /** * 队列消息接收方法 * * @param jsonMsg */ public void receiveMessage2(String jsonMsg) { log.info("[开始消费REDIS消息队列phoneTest2数据...]"); try { System.out.println(jsonMsg); /** * 此处执行自己代码逻辑 操作数据库等 */ log.info("[消费REDIS消息队列phoneTest2数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } }
public void redisTest() {
//这个是测试同一个频道,不同的订阅者收到相同的信息,“phone”也就是topic也可以理解为频道
redisTemplate.convertAndSend("phone", "223333");
//这个phoneTest2是另外的一个频道
// redisTemplate.convertAndSend("phoneTest2", "34555665");
}
Redis的列表类型键可以用来实现队列,并且支持阻塞式读取,可以实现一个高性能的优先队列, 在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。
public R saveUserTicket(String phoneNum) {
redisTemplate.opsForList().leftPush("ticket:Data", phoneNum);
return R.ok();
}
@Scheduled(fixedRate = 1)
public synchronized void consumer() {
String message = redisTemplate.opsForList().rightPop("ticket:Data", 5, TimeUnit.SECONDS);
if (!StringUtils.isEmpty(message)){
//数据库操作
}
}
如上述代码,如果此时队列为空,消费者依然会频繁拉取数据,造成CPU空转,不仅占用CPU资源还对Redis造成压力。因此当队列为空时我们可以休眠一段时间,再进行拉取。
实现如下
@Scheduled(fixedRate = 1)
public synchronized void consumer() throws InterruptedException {
long a = redisTemplate.opsForList().size("ticket:Data");
if (a == 0) {
TimeUnit.SECONDS.sleep(1);//等待时间
}
String message = redisTemplate.opsForList().rightPop("ticket:Data", 5, TimeUnit.SECONDS);
if (!StringUtils.isEmpty(message)){
//数据库操作
}
发布订阅系统在我们日常的工作中经常会使用到,这种场景大部分情况我们都是使用消息队列,常用的消息队列有 Kafka,RocketMQ,RabbitMQ,每一种消息队列都有其特性,很多时候我们可能不需要独立部署相应的消息队列,只是简单的使用,而且数据量也不会太大,这种情况下,我们就可以使用 Redis 的 Pub/Sub 模型。
Redis 的发布订阅功能主要由 PUBLISH,SUBSCRIBE,PSUBSCRIBE 命令组成,一个或者多个客户端订阅某个或者多个频道,当其他客户端向该频道发送消息的时候,订阅了该频道的客户端都会收到对应的消息。
Psubscribe 命令订阅一个或多个符合给定模式的频道。每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets )
Psubscribe 命令基本语法如下:
PSUBSCRIBE pattern [pattern …]
Redis Pubsub 命令用于查看订阅与发布系统状态,它由数个不同格式的子命令组成。
PUBSUB [argument [argument …]]
Publish 命令用于将信息发送到指定的频道。
Publish 命令基本语法如下:
PUBLISH channel message
Punsubscribe 命令用于退订所有给定模式的频道。
Punsubscribe 命令基本语法如下:
PUNSUBSCRIBE [pattern [pattern …]]
Subscribe 命令用于订阅给定的一个或多个频道的信息。
Subscribe 命令基本语法如下:
SUBSCRIBE channel [channel …]
Unsubscribe 命令用于退订给定的一个或多个频道的信息。
Unsubscribe 命令基本语法如下:
UNSUBSCRIBE channel [channel …]
Redis通过PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令实现了发布和订阅功能。
订阅 Channel
在 Redis 的底层结构中,客户端和频道的订阅关系是通过一个字典加链表的结构保存的,形式如下:
在 Redis 的底层结构中,Redis 服务器结构体中定义了一个 pubsub_channels 字典
struct redisServer {
//用于保存所有频道的订阅关系
dict *pubsub_channels
}
在这个字典中,key 代表的是频道名称,value 是一个链表,这个链表里面存放的是所有订阅这个频道的客户端。
所以当有客户端执行订阅频道的动作的时候,服务器就会将客户端与被订阅的频道在 pubsub_channels 字典中进行关联。
这个时候有两种情况:
该渠道是首次被订阅:首次被订阅说明在字典中并不存在该渠道的信息,那么程序首先要创建一个对应的 key,并且要赋值一个空链表,然后将对应的客户端加入到链表中。此时链表只有一个元素。
该渠道已经被其他客户端订阅过:这个时候就直接将对应的客户端信息添加到链表的末尾就好了。
比如,如果有一个新的客户端 Client 08 要订阅 run 渠道,那么上图就会变成
如果 Client 08 要订阅一个新的渠道 new_sport ,那么就会变成
上面介绍的是单个 Channel 的订阅,相反的如果一个客户端要取消订阅相关 Channel,则无非是找到对应的 Channel 的链表,从中删除对应的客户端,如果该客户端已经是最后一个了,则将对应 Channel 也删除。
模式渠道的订阅与单个渠道的订阅类似,不过服务器是将所有模式的订阅关系都保存在服务器状态的pubsub_patterns 属性里面。
struct redisServer{
//保存所有模式订阅关系
list *pubsub_patterns;
}
与订阅单个 Channel 不同的是,pubsub_patterns 属性是一个链表,不是字典。节点的结构如下:
struct pubsubPattern{
//订阅模式的客户端
redisClient *client;
//被订阅的模式
robj *pattern;
} pubsubPattern;
其实 client 属性是用来存放对应客户端信息,pattern 是用来存放客户端对应的匹配模式。
所以对应上面的 Client-06 模式匹配的结构存储如下
在pubsub_patterns链表中有一个节点,对应的客户端是 Client-06,对应的匹配模式是run*。
当某个客户端通过命令psubscribe 订阅对应模式的 Channel 时候,服务器会创建一个节点,并将 Client 属性设置为对应的客户端,pattern 属性设置成对应的模式规则,然后添加到链表尾部。
创建新节点;
给节点的属性赋值;
将节点添加到链表的尾部;
退订模式的命令是punsubscribe,客户端使用这个命令来退订一个或者多个模式 Channel。服务器接收到该命令后,会遍历pubsub_patterns链表,将匹配到的 client 和 pattern 属性的节点给删掉。这里需要判断 client 属性和 pattern 属性都合法的时候再进行删除。
遍历所有的节点,当匹配到相同 client 属性和 pattern 属性的时候就进行节点删除。
当一个客户端执行了publish channelName message 命令的时候,服务器会从pubsub_channels和pubsub_patterns 两个结构中找到符合channelName 的所有 Channel,进行消息的发送。在 pubsub_channels 中只要找到对应的 Channel 的 key 然后向对应的 value 链表中的客户端发送消息。
@OnMessage
public void onMessage(Session session, String message) throws IOException, ParseException {
//业务逻辑处理
//向所有用户广播
redisUtil.convertSend("topic", message);
}
/**
* 队列消息接收方法
* 2022/2/21
*/
public void receiveMessage(String message) {
log.info("[开始消费REDIS消息队列topic数据...]");
try {
//业务逻辑处理
webSocketServer.sendMessageForProject(message);
log.info("[消费REDIS消息队列topic数据成功.]");
} catch (Exception e) {
log.error("[消费REDIS消息队列topic数据失败,失败信息:{}]", e.getMessage());
}
latch.countDown();
}
参考来源:「人苼若只茹初見」
原文链接: https://blog.csdn.net/printf88/article/details/123685995
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。