赞
踩
之前做用springboot+websocket做双向通讯时,websocket的session存在无法序列化导致集群不能通过共享session来实现,后来采取了记录需要推送客户端ip,然后用http去请求web接口这个不友好的方式。当然需求只需要做扫码登录,这种方式影响也不会有什么影响。但集群问题一直没解决就在心里埋下了个种子。
用netty搭建websocket集群服务,因为netty中的channel是在本地的需要整合rabbitmq或者redis等发布/订阅模式来实现消息发送(或者Channel共享,具体还没考究母鸡能不能实现)。这里用redis做广播(如果用redis需要考虑消息丢失和ack等情况,这里只做演示)
根据流程图,用户端client1想给client2发送消息,如果是单机的话就很简单,直接拿到client2的Channel发送Message就行了。但如果是server集群的话,就需要广播消息,让每个集群节点都收到内容,Channel不在本地的话就忽略。
- <!-- 缓存begin -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
- <!-- redisson -->
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>2.13.0</version>
- </dependency>
- <!-- netty -->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.29.Final</version>
- </dependency>
- <!--lombok 依赖包 -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.serializer.RedisSerializer;
- import java.net.UnknownHostException;
-
- /**
- * @Description
- * @Author kele
- * @Data 2023/8/31 16:34
- */
- @Configuration
- public class RedisConfig {
-
- @Bean
- public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
- // 将template 泛型设置为 <String, Object>
- RedisTemplate<String, Object> template = new RedisTemplate();
- // 连接工厂,不必修改
- template.setConnectionFactory(redisConnectionFactory);
- /*
- * 序列化设置
- */
- // key、hash的key 采用 String序列化方式
- template.setKeySerializer(RedisSerializer.string());
- template.setHashKeySerializer(RedisSerializer.string());
- // value、hash的value 采用 Jackson 序列化方式
- template.setValueSerializer(RedisSerializer.json());
- template.setHashValueSerializer(RedisSerializer.json());
- template.afterPropertiesSet();
-
- return template;
- }
- }
RedisMessageListenerConfiguration .java
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.listener.PatternTopic;
- import org.springframework.data.redis.listener.RedisMessageListenerContainer;
- import java.util.Arrays;
- import java.util.List;
-
- /**
- * @Description
- * @Author kele
- * @Data 2023/8/31 16:07
- */
- @Configuration
- public class RedisMessageListenerConfiguration {
-
- @Autowired
- private LifeRedisMessageListener lifeRedisMessageListener;
- @Autowired
- private RedisConnectionFactory redisConnectionFactory;
-
- /**
- * 配置订阅关系
- */
- @Bean
- public RedisMessageListenerContainer container() {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(redisConnectionFactory);
- //订阅频道
- List<PatternTopic> topicList = Arrays.asList(new PatternTopic("life.*"),new PatternTopic("*.life"));
- container.addMessageListener(lifeRedisMessageListener, topicList);
- return container;
- }
- }
LifeRedisMessageListener .java
-
- import com.alibaba.fastjson.JSONObject;
- import com.na.integration.socket.websocket.WebSocketHandler;
- import com.na.model.dto.NettyRedisConnectionDto;
- import com.na.common.utils.JSONUtils;
- import io.netty.channel.Channel;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.connection.Message;
- import org.springframework.data.redis.connection.MessageListener;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.serializer.RedisSerializer;
- import org.springframework.stereotype.Component;
- import java.time.LocalDateTime;
- import java.util.Map;
-
- /**
- * @Description
- * @Author kele
- * @Data 2023/8/31 16:07
- */
- @Component
- @Slf4j
- public class LifeRedisMessageListener implements MessageListener {
-
- @Autowired
- private RedisTemplate redisTemplate;
-
- @Override
- public void onMessage(Message message, byte[] pattern) {
- //需要在加载bean的时候配置相同的redis序列化器,否则会乱码
- RedisSerializer keySerializer = redisTemplate.getKeySerializer();
- RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
- log.info("----------Life接收到发布者消息----------");
- log.info("|频道:{}", keySerializer.deserialize(message.getChannel()));
- log.info("|当前监听器绑定的pattern:{}", new String(pattern));
- log.info("|消息内容:{}", valueSerializer.deserialize(message.getBody()));
- log.info("---------------------------------");
- //反序列化
- JSONObject jsonObject = JSONUtils.toJsonObj((String) valueSerializer.deserialize(message.getBody()));
- NettyRedisConnectionDto dto = new NettyRedisConnectionDto();
- Long id = Long.valueOf(jsonObject.get("id").toString());
- dto.setId(id);
- dto.setChannel((Channel) jsonObject.get("key"));
- dto.setSendMessage(jsonObject.get("sendMessage").toString());
- dto.setSendId(Long.valueOf(jsonObject.get("sendId").toString()));
-
- /**
- * 发送内容
- */
- //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
- Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
- //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
- Map<Long, String> clientMap = WebSocketHandler.getClientMap();
- //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
- String v = clientMap.get(dto.getSendId());
- Channel channel = null;
- try {
- channel = channelMap.get(v);
- } catch (NullPointerException e) {
- log.error("消息id:" + id + "通道不在本地");
- return;
- }
- Channel finalChannel = channel;
- channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(
- Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "来自id为"
- + dto.getId() + ",发送的内容message=" + dto.getSendMessage())));
- // redisTemplate.opsForValue().get("");
- }
- import lombok.Data;
- import lombok.experimental.Accessors;
- import java.io.Serializable;
-
- @Data
- @Accessors(chain = true)
- public class MessageRequest implements Serializable {
-
- private Long unionId;
-
- private Integer current = 1;
-
- private Integer size = 10;
- }
-
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- import io.netty.handler.stream.ChunkedWriteHandler;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- /**
- * @Description websocket通道初始化器
- **/
- @Component
- public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- @Autowired
- private WebSocketHandler webSocketHandler;
-
- @Value("${websocket.url}")
- private String websocketUrl;
-
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
-
- //获取pipeline通道
- ChannelPipeline pipeline = socketChannel.pipeline();
- //因为基于http协议,使用http的编码和解码器
- pipeline.addLast(new HttpServerCodec());
- //是以块方式写,添加ChunkedWriteHandler处理器
- pipeline.addLast(new ChunkedWriteHandler());
- /*
- 说明
- 1\. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
- 2\. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
- */
- pipeline.addLast(new HttpObjectAggregator(8192));
- /* 说明
- 1\. 对应websocket ,它的数据是以 帧(frame) 形式传递
- 2\. 可以看到WebSocketFrame 下面有六个子类
- 3\. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri
- 4\. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
- 5\. 是通过一个 状态码 101
- */
- pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
- //自定义的handler ,处理业务逻辑
- pipeline.addLast(webSocketHandler);
- }
- }
-
- import com.alibaba.fastjson.JSON;
- import com.na.enums.ResultCode;
- import com.na.exceptions.RRException;
- import com.na.utils.RedisLockUtil;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.util.concurrent.Future;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.time.LocalDateTime;
- import java.util.Map;
- import java.util.Optional;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @Description websocket处理器
- **/
- @Slf4j
- @Component
- @ChannelHandler.Sharable//保证处理器,在整个生命周期中就是以单例的形式存在,方便统计客户端的在线数量
- public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
- @Autowired
- private RedisLockUtil redisLockUtil;
-
- //通道map,存储channel,用于群发消息,以及统计客户端的在线数量,解决问题问题三,如果是集群环境使用redis的hash数据类型存储即可
- private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
- //任务map,存储future,用于停止队列任务
- private static Map<String, Future> futureMap = new ConcurrentHashMap<>();
- //存储channel的id和用户主键的映射,客户端保证用户主键传入的是唯一值,解决问题四,如果是集群中需要换成redis的hash数据类型存储即可
- private static Map<Long, String> clientMap = new ConcurrentHashMap<>();
- @Resource
- private RedisTemplate<String, Object> redisTemplate;
-
- /**
- * 客户端发送给服务端的消息
- *
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
-
- try {
- //接受客户端发送的消息
- MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);
-
- //每个channel都有id,asLongText是全局channel唯一id
- String key = ctx.channel().id().asLongText();
- //存储channel的id和用户的主键
- clientMap.put(messageRequest.getUnionId(), key);
- log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");
-
- if (!channelMap.containsKey(key)) {
- //使用channel中的任务队列,做周期循环推送客户端消息,解决问题二和问题五
- Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
- //存储客户端和服务的通信的Chanel
- channelMap.put(key, ctx.channel());
- //存储每个channel中的future,保证每个channel中有一个定时任务在执行
- futureMap.put(key, future);
- } else {
- //每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
- ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
- }
-
- } catch (Exception e) {
-
- log.error("websocket服务器推送消息发生错误:", e);
-
- }
- }
-
- /**
- * 注册时执行
- */
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- super.channelRegistered(ctx);
- log.info("--channelRegistered注册时执行--" + ctx.channel().id().toString());
- }
-
- /**
- * 客户端连接时候的操作
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
- }
-
- /**
- * 离线时执行
- */
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- super.channelUnregistered(ctx);
- log.info("--channelUnregistered离线时执行--" + ctx.channel().id().toString());
- }
-
-
- /**
- * 客户端掉线时的操作
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-
- String key = ctx.channel().id().asLongText();
- //移除通信过的channel
- channelMap.remove(key);
- //移除和用户绑定的channel
- clientMap.remove(key);
- //关闭掉线客户端的future
- Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
- future.cancel(true);
- futureMap.remove(key);
- });
- log.info("一个客户端移除......" + ctx.channel().remoteAddress());
- ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
- }
-
- /**
- * 从客户端收到新的数据、读取完成时调用
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
- log.info("--channelReadComplete从客户端收到新的数据--");
- ctx.flush();
- }
-
-
- /**
- * 发生异常时执行的操作
- * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
- *
- * @param ctx
- * @param cause
- * @throws Exception
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- String key = ctx.channel().id().asLongText();
- //移除通信过的channel
- channelMap.remove(key);
- //移除和用户绑定的channel
- clientMap.remove(key);
- //移除定时任务
- Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
- future.cancel(true);
- futureMap.remove(key);
- });
- //关闭长连接
- ctx.close();
- log.info("异常发生 " + cause.getMessage());
- }
-
- public static Map<String, Channel> getChannelMap() {
- return channelMap;
- }
-
- public static Map<String, Future> getFutureMap() {
- return futureMap;
- }
-
- public static Map<Long, String> getClientMap() {
- return clientMap;
- }
-
- /**
- * redission 防止重复在线,多个实例的本地缓存是否存在同一个id和与幂等性,这样会导致想要接收方混乱的bug
- * @param key
- * @param v
- */
- private void addChannelMap(String key, Channel v) {
- try {
- //定义keykey的锁
- redisLockUtil.lock(key, key, 10000, 3, 5000);
- WebSocketHandler.channelMap.put(key, v);
- } finally {
- redisLockUtil.unlock(key.toString(), key.toString());
- }
- }
-
- private void addFutureMap(String key, Future v) {
- try {
- //定义keykey的锁
- redisLockUtil.lock(key, key, 10000, 3, 5000);
- WebSocketHandler.futureMap.put(key, v);
- } finally {
- redisLockUtil.unlock(key.toString(), key.toString());
- }
- }
-
- private void addClientMap(Long key, String v) {
- try {
- //定义keykey的锁
- redisLockUtil.lock(key.toString(), key.toString(), 10000, 3, 5000);
- WebSocketHandler.clientMap.put(key, v);
- } finally {
- redisLockUtil.unlock(key.toString(), key.toString());
- }
- }
-
- public static void sendMessage(String key, String message) {
- if (StringUtils.isEmpty(key)) {
- throw new RRException(ResultCode.PARAM_NOT_NULL);
- }
- Channel channel = channelMap.get(key);
- if (channel == null) {
- throw new RRException(ResultCode.ID_IS_NULL);
- }
- try {
- channel.writeAndFlush(message);
- } catch (Exception e) {
- throw new RRException(ResultCode.SOME_USERS_SEND_FAIL);
- }
- }
- }
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
-
- /**
- * @Description websocket初始化器
- **/
- @Slf4j
- @Component
- public class WebsocketInitialization {
-
- @Resource
- private WebsocketChannelInitializer websocketChannelInitializer;
-
- @Value("${websocket.port}")
- private Integer port;
-
- @Async
- public void init() throws InterruptedException {
- //bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- //workerGroup工作线程组,主要负责网络IO读写
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- //启动辅助类
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- //bootstrap绑定两个线程组
- serverBootstrap.group(bossGroup, workerGroup);
- //设置通道为NioChannel
- serverBootstrap.channel(NioServerSocketChannel.class);
- //可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
- serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
- //设置自定义的通道初始化器,用于入站操作
- serverBootstrap.childHandler(websocketChannelInitializer);
- //启动服务器,本质是Java程序发起系统调用,然后内核底层起了一个处于监听状态的服务,生成一个文件描述符FD
- ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
- //异步
- channelFuture.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import lombok.extern.slf4j.Slf4j;
- import java.time.LocalDateTime;
-
- /**
- * @Description 向客户端发送消息
- **/
- @Slf4j
- public class WebsocketRunnable implements Runnable {
-
- private ChannelHandlerContext channelHandlerContext;
-
- private MessageRequest messageRequest;
-
- public WebsocketRunnable(ChannelHandlerContext channelHandlerContext,MessageRequest messageRequest) {
- this.channelHandlerContext = channelHandlerContext;
- this.messageRequest = messageRequest;
- }
-
- @Override
- public void run() {
- try {
- log.info(Thread.currentThread().getName()+"--"+LocalDateTime.now().toString());
- channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
- } catch (Exception e) {
- log.error("websocket服务器推送消息发生错误:",e);
- }
- }
- }
RedissonConfiguration.java配置类
-
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.ClusterServersConfig;
- import org.redisson.config.Config;
- import org.redisson.config.SingleServerConfig;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.util.StringUtils;
-
- import java.util.ArrayList;
- import java.util.List;
-
- @Slf4j
- @Configuration
- public class RedissonConfiguration {
-
- @Autowired
- private RedisProperties redisProperties;
- /**
- * 初始化RedissonClient客户端
- * 注意:
- * 此实例集群为3节点,各节点1主1从
- * 集群模式,集群节点的地址须使用“redis://”前缀,否则将会报错。
- *
- * @return {@link RedissonClient}
- */
- @Bean
- public RedissonClient getRedissonClient() {
- Config config = new Config();
- if (redisProperties.getCluster() != null) {
- //集群模式配置
- List<String> nodes = redisProperties.getCluster().getNodes();
-
- List<String> clusterNodes = new ArrayList<>();
- for (int i = 0; i < nodes.size(); i++) {
- clusterNodes.add("redis://" + nodes.get(i));
- }
- ClusterServersConfig clusterServersConfig = config.useClusterServers()
- .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
-
- if (!StringUtils.isEmpty(redisProperties.getPassword())) {
- clusterServersConfig.setPassword(redisProperties.getPassword());
- }
- } else {
- //单节点配置
- String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
- SingleServerConfig serverConfig = config.useSingleServer();
- serverConfig.setAddress(address);
- if (!StringUtils.isEmpty(redisProperties.getPassword())) {
- serverConfig.setPassword(redisProperties.getPassword());
- }
- serverConfig.setDatabase(redisProperties.getDatabase());
- }
- //看门狗的锁续期时间,默认30000ms,这里配置成15000ms
- // config.setLockWatchdogTimeout(15000);
- config.setLockWatchdogTimeout(15000);
- return Redisson.create(config);
- }
- }
RedisLockUtil.java
-
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.core.annotation.Order;
- import org.springframework.stereotype.Component;
- import java.util.concurrent.TimeUnit;
-
- @Component
- //@Order(value = 2)
- public class RedisLockUtil {
- private final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
-
- @Autowired
- private RedissonClient redissonClient;
-
- /* @Autowired
- public RedisLockUtil(@Qualifier("customRedisson") RedissonClient redissonClient) {
- this.redissonClient = redissonClient;
- }*/
-
- /**
- * 源码
- * 1.固定有效期的锁:超过有效期leaseTime后,自动释放锁。
- *
- * public void lock(long leaseTime, TimeUnit unit) {
- * try {
- * this.lockInterruptibly(leaseTime, unit);
- * } catch (InterruptedException var5) {
- * Thread.currentThread().interrupt();
- * }
- * }
- * 2.没有有效期的锁:默认30秒,然后采用Watchdog进行续期,直到业务逻辑执行完毕。
- *
- * public void lock() {
- * try {
- * this.lockInterruptibly();
- * } catch (InterruptedException var2) {
- * Thread.currentThread().interrupt();
- * }
- * }
- * ————————————————
- */
-
-
- /**
- * 加锁
- * @param key 锁的 key
- * @param value value ( key + value 必须保证唯一)
- * @param expire key 的过期时间,单位 ms
- * @param retryTimes 重试次数,即加锁失败之后的重试次数
- * @param retryInterval 重试时间间隔,单位 ms
- * @return 加锁 true 成功
- */
- public RLock lock(String key, String value, long expire, int retryTimes, long retryInterval) {
- logger.info("locking... redisK = {}", key);
- RLock fairLock = redissonClient.getFairLock(key + ":" + value);
- try {
- boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);//是否加锁成功
- if (tryLock) {
- logger.info("locked... redisK = {}", key);
- return fairLock;
- } else {
- //重试获取锁
- logger.info("retry to acquire lock: [redisK = {}]", key);
- int count = 0;
- while(count < retryTimes) {
- try {
- Thread.sleep(retryInterval);
- tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
- if(tryLock) {
- logger.info("locked... redisK = {}", key);
- return fairLock;
- }
- logger.warn("{} times try to acquire lock", count + 1);
- count++;
- } catch (Exception e) {
- logger.error("acquire redis occurred an exception", e);
- break;
- }
- }
-
- logger.info("fail to acquire lock {}", key);
- }
- } catch (Throwable e1) {
- logger.error("acquire redis occurred an exception", e1);
- }
-
- return fairLock;
- }
-
- /**
- * 加锁
- * @param key 锁的 key
- * @param value value ( key + value 必须保证唯一)
- * @param expire key 的过期时间,单位 ms
- * @param retryTimes 重试次数,即加锁失败之后的重试次数
- * @param retryInterval 重试时间间隔,单位 ms
- * @return 加锁 true 成功
- */
- public boolean lock2(String key, String value, long expire, int retryTimes, long retryInterval) {
- logger.info("locking... redisK = {}", key);
- RLock fairLock = redissonClient.getFairLock(key + ":" + value);
- try {
- boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
- if (tryLock) {
- logger.info("locked... redisK = {}", key);
- return true;
- } else {
- //重试获取锁
- logger.info("retry to acquire lock: [redisK = {}]", key);
- int count = 0;
- while(count < retryTimes) {
- try {
- Thread.sleep(retryInterval);
- tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
- if(tryLock) {
- logger.info("locked... redisK = {}", key);
- return true;
- }
- logger.warn("{} times try to acquire lock", count + 1);
- count++;
- } catch (Exception e) {
- logger.error("acquire redis occurred an exception", e);
- break;
- }
- }
-
- logger.info("fail to acquire lock {}", key);
- return false;
- }
- } catch (Throwable e1) {
- logger.error("acquire redis occurred an exception", e1);
- return false;
- }
- }
-
-
- /**
- * 加锁
- * @param key 锁的 key
- * @param value value ( key + value 必须保证唯一)
- * @param expire key 的过期时间,单位 ms
- * @return 加锁 true 成功
- */
- public boolean lockCheck(String key, String value, long expire) {
- logger.info("locking... redisK = {}", key);
- RLock fairLock = redissonClient.getFairLock(key + ":" + value);
- boolean tryLock = false;
- try {
- tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
- } catch (Throwable e1) {
- logger.error("acquire redis occurred an exception", e1);
- }
- return tryLock;
- }
-
-
-
- /**
- * 加锁
- * @param key 锁的 key
- * @param value value ( key + value 必须保证唯一)
- * @param expire key 的过期时间,单位 ms
- * @return 加锁 true 成功
- */
- public boolean lockDog(String key, String value, long expire) {
- logger.info("locking... redisK = {}", key);
- RLock fairLock = redissonClient.getFairLock(key + ":" + value);
- boolean tryLock = false;
- try {
- fairLock.tryLock(0, TimeUnit.MILLISECONDS);
- } catch (Throwable e1) {
- logger.error("acquire redis occurred an exception", e1);
- }
- return tryLock;
- }
- /**
- * 释放KEY
- * @return 释放锁 true 成功
- */
- public boolean unlock(String key, String value) {
- RLock fairLock = redissonClient.getFairLock(key + ":" + value);
- try {
- //如果这里抛异常,后续锁无法释放
- if (fairLock.isLocked()) {
- fairLock.unlock();
- logger.info("release lock success");
-
- return true;
- }
- } catch (Throwable e) {
- logger.error("release lock occurred an exception", e);
- }finally {
- fairLock.unlock();
- }
-
- return false;
- }
- }
-
- import com.na.integration.socket.websocket.WebSocketHandler;
- import com.na.model.dto.NettyRedisConnectionDto;
- import com.na.model.vo.NettyRedisConnectionVo;
- import com.na.common.utils.JSONUtils;
- import io.netty.channel.Channel;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import java.time.LocalDateTime;
- import java.util.List;
- import java.util.Map;
-
- @RequestMapping("ws")
- @RestController
- public class WebsocketController {
- @Autowired
- private RedisTemplate redisTemplate;
-
- /**
- * 群发消息
- *
- * @param idList 要把消息发送给其他用户的主键
- */
- @RequestMapping("hello1")
- private Map hello(List<Long> idList) {
- //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
- Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
- //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
- Map<Long, String> clientMap = WebSocketHandler.getClientMap();
- //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
- idList.stream().forEach(id -> {
- String v = clientMap.get(id);
- Channel channel = channelMap.get(v);
- channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy")));
- });
- redisTemplate.convertAndSend("life.all", "hello publish/subscribe");
- return clientMap;
- }
-
-
- /**
- * 向redis 发布/订阅模式发送消息 可采用广播消息集群监听
- * 需要考虑 接收方是否在线,不在线的情况是缓存还是延迟推送
- * 需要考虑是否重复在线,多个实例的本地缓存是否存在同一个id,这样会导致想要接收方混乱的bug
- */
- @RequestMapping("sendMessage")
- private Map sendMessage(@RequestBody NettyRedisConnectionVo vo) {
- //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
- Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
- //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
- Map<Long, String> clientMap = WebSocketHandler.getClientMap();
- //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
- String v = clientMap.get(vo.getSendId());
- Channel channel = null;
- if (v != null) {
- channel = channelMap.get(v);
- Channel finalChannel = channel;
- //需要发送的 与redis监听定义不同的内容方便测试分辨
- channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + ",message=" + vo.getSendMessage())));
- } else {
- channel = (Channel) redisTemplate.opsForValue().get("id");
- //封装序列化
- NettyRedisConnectionDto dto = new NettyRedisConnectionDto()
- .setId(vo.getId())
- .setSendMessage(vo.getSendMessage())
- .setSendId(vo.getSendId())
- .setChannel(channel);
- redisTemplate.convertAndSend("life.all", JSONUtils.bean2JSONObject(dto));
- }
- return clientMap;
- }
-
- }
也可以启动方法代码写在springboot启动类里
WebsocketApplication.java
-
- import com.na.integration.socket.websocket.WebsocketInitialization;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
-
- @Slf4j
- @Component
- public class WebsocketApplication {
-
- @Resource
- private WebsocketInitialization websocketInitialization;
-
- @PostConstruct
- public void start() {
- try {
- log.info(Thread.currentThread().getName() + ":websocket启动中......");
- websocketInitialization.init();
- log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
- } catch (Exception e) {
- log.error("websocket发生错误:",e);
- }
- }
- }
- websocket:
- port: 7000 #端口
- url: /msg #访问url
注意:
1、unionId需要保持全局唯一。
2、websocket端口和springboot端口不一样
这里可以看到用户client1已经连接进来了
指定unionId用户发送请求
http://localhost:18088/ws/sendMessage
7001客户端也接收到了服务器发来的消息
部分代码参照:微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送----netty是yyds_netty 集群_码学弟的博客-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。