当前位置:   article > 正文

SpringBoot分布式Netty集群,通过Redis发布/订阅广播_springboot netty节点

springboot netty节点

一、前言

        之前做用springboot+websocket做双向通讯时,websocket的session存在无法序列化导致集群不能通过共享session来实现,后来采取了记录需要推送客户端ip,然后用http去请求web接口这个不友好的方式。当然需求只需要做扫码登录,这种方式影响也不会有什么影响。但集群问题一直没解决就在心里埋下了个种子。

二、正文

        用netty搭建websocket集群服务,因为netty中的channel是在本地的需要整合rabbitmq或者redis等发布/订阅模式来实现消息发送(或者Channel共享,具体还没考究母鸡能不能实现)。这里用redis做广播(如果用redis需要考虑消息丢失和ack等情况,这里只做演示)

三、流程图 

        根据流程图,用户端client1想给client2发送消息,如果是单机的话就很简单,直接拿到client2的Channel发送Message就行了。但如果是server集群的话,就需要广播消息,让每个集群节点都收到内容,Channel不在本地的话就忽略。

            

四、环境搭建

1、pom文件需要的依赖

  1. <!-- 缓存begin -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>
  6. <!-- redisson -->
  7. <dependency>
  8. <groupId>org.redisson</groupId>
  9. <artifactId>redisson</artifactId>
  10. <version>2.13.0</version>
  11. </dependency>
  12. <!-- netty -->
  13. <dependency>
  14. <groupId>io.netty</groupId>
  15. <artifactId>netty-all</artifactId>
  16. <version>4.1.29.Final</version>
  17. </dependency>
  18. <!--lombok 依赖包 -->
  19. <dependency>
  20. <groupId>org.projectlombok</groupId>
  21. <artifactId>lombok</artifactId>
  22. <optional>true</optional>
  23. </dependency>

2、RedisConfig配置文件,注意:要与MessageListener实现类统一序列化

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.data.redis.connection.RedisConnectionFactory;
  4. import org.springframework.data.redis.core.RedisTemplate;
  5. import org.springframework.data.redis.serializer.RedisSerializer;
  6. import java.net.UnknownHostException;
  7. /**
  8. * @Description
  9. * @Author kele
  10. * @Data 2023/8/31 16:34
  11. */
  12. @Configuration
  13. public class RedisConfig {
  14. @Bean
  15. public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
  16. // 将template 泛型设置为 <String, Object>
  17. RedisTemplate<String, Object> template = new RedisTemplate();
  18. // 连接工厂,不必修改
  19. template.setConnectionFactory(redisConnectionFactory);
  20. /*
  21. * 序列化设置
  22. */
  23. // key、hash的key 采用 String序列化方式
  24. template.setKeySerializer(RedisSerializer.string());
  25. template.setHashKeySerializer(RedisSerializer.string());
  26. // value、hash的value 采用 Jackson 序列化方式
  27. template.setValueSerializer(RedisSerializer.json());
  28. template.setHashValueSerializer(RedisSerializer.json());
  29. template.afterPropertiesSet();
  30. return template;
  31. }
  32. }

3.定义redis订阅监听类配置

RedisMessageListenerConfiguration .java

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.listener.PatternTopic;
  6. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. /**
  10. * @Description
  11. * @Author kele
  12. * @Data 2023/8/31 16:07
  13. */
  14. @Configuration
  15. public class RedisMessageListenerConfiguration {
  16. @Autowired
  17. private LifeRedisMessageListener lifeRedisMessageListener;
  18. @Autowired
  19. private RedisConnectionFactory redisConnectionFactory;
  20. /**
  21. * 配置订阅关系
  22. */
  23. @Bean
  24. public RedisMessageListenerContainer container() {
  25. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  26. container.setConnectionFactory(redisConnectionFactory);
  27. //订阅频道
  28. List<PatternTopic> topicList = Arrays.asList(new PatternTopic("life.*"),new PatternTopic("*.life"));
  29. container.addMessageListener(lifeRedisMessageListener, topicList);
  30. return container;
  31. }
  32. }

LifeRedisMessageListener .java

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.na.integration.socket.websocket.WebSocketHandler;
  3. import com.na.model.dto.NettyRedisConnectionDto;
  4. import com.na.common.utils.JSONUtils;
  5. import io.netty.channel.Channel;
  6. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.data.redis.connection.Message;
  10. import org.springframework.data.redis.connection.MessageListener;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.data.redis.serializer.RedisSerializer;
  13. import org.springframework.stereotype.Component;
  14. import java.time.LocalDateTime;
  15. import java.util.Map;
  16. /**
  17. * @Description
  18. * @Author kele
  19. * @Data 2023/8/31 16:07
  20. */
  21. @Component
  22. @Slf4j
  23. public class LifeRedisMessageListener implements MessageListener {
  24. @Autowired
  25. private RedisTemplate redisTemplate;
  26. @Override
  27. public void onMessage(Message message, byte[] pattern) {
  28. //需要在加载bean的时候配置相同的redis序列化器,否则会乱码
  29. RedisSerializer keySerializer = redisTemplate.getKeySerializer();
  30. RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
  31. log.info("----------Life接收到发布者消息----------");
  32. log.info("|频道:{}", keySerializer.deserialize(message.getChannel()));
  33. log.info("|当前监听器绑定的pattern:{}", new String(pattern));
  34. log.info("|消息内容:{}", valueSerializer.deserialize(message.getBody()));
  35. log.info("---------------------------------");
  36. //反序列化
  37. JSONObject jsonObject = JSONUtils.toJsonObj((String) valueSerializer.deserialize(message.getBody()));
  38. NettyRedisConnectionDto dto = new NettyRedisConnectionDto();
  39. Long id = Long.valueOf(jsonObject.get("id").toString());
  40. dto.setId(id);
  41. dto.setChannel((Channel) jsonObject.get("key"));
  42. dto.setSendMessage(jsonObject.get("sendMessage").toString());
  43. dto.setSendId(Long.valueOf(jsonObject.get("sendId").toString()));
  44. /**
  45. * 发送内容
  46. */
  47. //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
  48. Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
  49. //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
  50. Map<Long, String> clientMap = WebSocketHandler.getClientMap();
  51. //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
  52. String v = clientMap.get(dto.getSendId());
  53. Channel channel = null;
  54. try {
  55. channel = channelMap.get(v);
  56. } catch (NullPointerException e) {
  57. log.error("消息id:" + id + "通道不在本地");
  58. return;
  59. }
  60. Channel finalChannel = channel;
  61. channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(
  62. Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "来自id为"
  63. + dto.getId() + ",发送的内容message=" + dto.getSendMessage())));
  64. // redisTemplate.opsForValue().get("");
  65. }

4、消息接收对象

  1. import lombok.Data;
  2. import lombok.experimental.Accessors;
  3. import java.io.Serializable;
  4. @Data
  5. @Accessors(chain = true)
  6. public class MessageRequest implements Serializable {
  7. private Long unionId;
  8. private Integer current = 1;
  9. private Integer size = 10;
  10. }

5、websocket通道初始化器

  1. import io.netty.channel.ChannelInitializer;
  2. import io.netty.channel.ChannelPipeline;
  3. import io.netty.channel.socket.SocketChannel;
  4. import io.netty.handler.codec.http.HttpObjectAggregator;
  5. import io.netty.handler.codec.http.HttpServerCodec;
  6. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  7. import io.netty.handler.stream.ChunkedWriteHandler;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.stereotype.Component;
  11. /**
  12. * @Description websocket通道初始化器
  13. **/
  14. @Component
  15. public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
  16. @Autowired
  17. private WebSocketHandler webSocketHandler;
  18. @Value("${websocket.url}")
  19. private String websocketUrl;
  20. @Override
  21. protected void initChannel(SocketChannel socketChannel) throws Exception {
  22. //获取pipeline通道
  23. ChannelPipeline pipeline = socketChannel.pipeline();
  24. //因为基于http协议,使用http的编码和解码器
  25. pipeline.addLast(new HttpServerCodec());
  26. //是以块方式写,添加ChunkedWriteHandler处理器
  27. pipeline.addLast(new ChunkedWriteHandler());
  28. /*
  29. 说明
  30. 1\. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
  31. 2\. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
  32. */
  33. pipeline.addLast(new HttpObjectAggregator(8192));
  34. /* 说明
  35. 1\. 对应websocket ,它的数据是以 帧(frame) 形式传递
  36. 2\. 可以看到WebSocketFrame 下面有六个子类
  37. 3\. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri
  38. 4\. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
  39. 5\. 是通过一个 状态码 101
  40. */
  41. pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
  42. //自定义的handler ,处理业务逻辑
  43. pipeline.addLast(webSocketHandler);
  44. }
  45. }

6、定义Handler处理器

  1. import com.alibaba.fastjson.JSON;
  2. import com.na.enums.ResultCode;
  3. import com.na.exceptions.RRException;
  4. import com.na.utils.RedisLockUtil;
  5. import io.netty.channel.Channel;
  6. import io.netty.channel.ChannelHandler;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.SimpleChannelInboundHandler;
  9. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  10. import io.netty.util.concurrent.Future;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.commons.lang.StringUtils;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.data.redis.core.RedisTemplate;
  15. import org.springframework.stereotype.Component;
  16. import javax.annotation.Resource;
  17. import java.io.IOException;
  18. import java.time.LocalDateTime;
  19. import java.util.Map;
  20. import java.util.Optional;
  21. import java.util.concurrent.ConcurrentHashMap;
  22. import java.util.concurrent.TimeUnit;
  23. /**
  24. * @Description websocket处理器
  25. **/
  26. @Slf4j
  27. @Component
  28. @ChannelHandler.Sharable//保证处理器,在整个生命周期中就是以单例的形式存在,方便统计客户端的在线数量
  29. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  30. @Autowired
  31. private RedisLockUtil redisLockUtil;
  32. //通道map,存储channel,用于群发消息,以及统计客户端的在线数量,解决问题问题三,如果是集群环境使用redis的hash数据类型存储即可
  33. private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
  34. //任务map,存储future,用于停止队列任务
  35. private static Map<String, Future> futureMap = new ConcurrentHashMap<>();
  36. //存储channel的id和用户主键的映射,客户端保证用户主键传入的是唯一值,解决问题四,如果是集群中需要换成redis的hash数据类型存储即可
  37. private static Map<Long, String> clientMap = new ConcurrentHashMap<>();
  38. @Resource
  39. private RedisTemplate<String, Object> redisTemplate;
  40. /**
  41. * 客户端发送给服务端的消息
  42. *
  43. * @param ctx
  44. * @param msg
  45. * @throws Exception
  46. */
  47. @Override
  48. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  49. try {
  50. //接受客户端发送的消息
  51. MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);
  52. //每个channel都有id,asLongText是全局channel唯一id
  53. String key = ctx.channel().id().asLongText();
  54. //存储channel的id和用户的主键
  55. clientMap.put(messageRequest.getUnionId(), key);
  56. log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");
  57. if (!channelMap.containsKey(key)) {
  58. //使用channel中的任务队列,做周期循环推送客户端消息,解决问题二和问题五
  59. Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
  60. //存储客户端和服务的通信的Chanel
  61. channelMap.put(key, ctx.channel());
  62. //存储每个channel中的future,保证每个channel中有一个定时任务在执行
  63. futureMap.put(key, future);
  64. } else {
  65. //每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
  66. ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
  67. }
  68. } catch (Exception e) {
  69. log.error("websocket服务器推送消息发生错误:", e);
  70. }
  71. }
  72. /**
  73. * 注册时执行
  74. */
  75. @Override
  76. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  77. super.channelRegistered(ctx);
  78. log.info("--channelRegistered注册时执行--" + ctx.channel().id().toString());
  79. }
  80. /**
  81. * 客户端连接时候的操作
  82. *
  83. * @param ctx
  84. * @throws Exception
  85. */
  86. @Override
  87. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  88. log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
  89. }
  90. /**
  91. * 离线时执行
  92. */
  93. @Override
  94. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  95. super.channelUnregistered(ctx);
  96. log.info("--channelUnregistered离线时执行--" + ctx.channel().id().toString());
  97. }
  98. /**
  99. * 客户端掉线时的操作
  100. *
  101. * @param ctx
  102. * @throws Exception
  103. */
  104. @Override
  105. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  106. String key = ctx.channel().id().asLongText();
  107. //移除通信过的channel
  108. channelMap.remove(key);
  109. //移除和用户绑定的channel
  110. clientMap.remove(key);
  111. //关闭掉线客户端的future
  112. Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
  113. future.cancel(true);
  114. futureMap.remove(key);
  115. });
  116. log.info("一个客户端移除......" + ctx.channel().remoteAddress());
  117. ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
  118. }
  119. /**
  120. * 从客户端收到新的数据、读取完成时调用
  121. */
  122. @Override
  123. public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
  124. log.info("--channelReadComplete从客户端收到新的数据--");
  125. ctx.flush();
  126. }
  127. /**
  128. * 发生异常时执行的操作
  129. * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
  130. *
  131. * @param ctx
  132. * @param cause
  133. * @throws Exception
  134. */
  135. @Override
  136. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  137. String key = ctx.channel().id().asLongText();
  138. //移除通信过的channel
  139. channelMap.remove(key);
  140. //移除和用户绑定的channel
  141. clientMap.remove(key);
  142. //移除定时任务
  143. Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
  144. future.cancel(true);
  145. futureMap.remove(key);
  146. });
  147. //关闭长连接
  148. ctx.close();
  149. log.info("异常发生 " + cause.getMessage());
  150. }
  151. public static Map<String, Channel> getChannelMap() {
  152. return channelMap;
  153. }
  154. public static Map<String, Future> getFutureMap() {
  155. return futureMap;
  156. }
  157. public static Map<Long, String> getClientMap() {
  158. return clientMap;
  159. }
  160. /**
  161. * redission 防止重复在线,多个实例的本地缓存是否存在同一个id和与幂等性,这样会导致想要接收方混乱的bug
  162. * @param key
  163. * @param v
  164. */
  165. private void addChannelMap(String key, Channel v) {
  166. try {
  167. //定义keykey的锁
  168. redisLockUtil.lock(key, key, 10000, 3, 5000);
  169. WebSocketHandler.channelMap.put(key, v);
  170. } finally {
  171. redisLockUtil.unlock(key.toString(), key.toString());
  172. }
  173. }
  174. private void addFutureMap(String key, Future v) {
  175. try {
  176. //定义keykey的锁
  177. redisLockUtil.lock(key, key, 10000, 3, 5000);
  178. WebSocketHandler.futureMap.put(key, v);
  179. } finally {
  180. redisLockUtil.unlock(key.toString(), key.toString());
  181. }
  182. }
  183. private void addClientMap(Long key, String v) {
  184. try {
  185. //定义keykey的锁
  186. redisLockUtil.lock(key.toString(), key.toString(), 10000, 3, 5000);
  187. WebSocketHandler.clientMap.put(key, v);
  188. } finally {
  189. redisLockUtil.unlock(key.toString(), key.toString());
  190. }
  191. }
  192. public static void sendMessage(String key, String message) {
  193. if (StringUtils.isEmpty(key)) {
  194. throw new RRException(ResultCode.PARAM_NOT_NULL);
  195. }
  196. Channel channel = channelMap.get(key);
  197. if (channel == null) {
  198. throw new RRException(ResultCode.ID_IS_NULL);
  199. }
  200. try {
  201. channel.writeAndFlush(message);
  202. } catch (Exception e) {
  203. throw new RRException(ResultCode.SOME_USERS_SEND_FAIL);
  204. }
  205. }
  206. }

7、websocket初始化器

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.EventLoopGroup;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.nio.NioServerSocketChannel;
  6. import io.netty.handler.logging.LogLevel;
  7. import io.netty.handler.logging.LoggingHandler;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.scheduling.annotation.Async;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.Resource;
  13. /**
  14. * @Description websocket初始化器
  15. **/
  16. @Slf4j
  17. @Component
  18. public class WebsocketInitialization {
  19. @Resource
  20. private WebsocketChannelInitializer websocketChannelInitializer;
  21. @Value("${websocket.port}")
  22. private Integer port;
  23. @Async
  24. public void init() throws InterruptedException {
  25. //bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
  26. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  27. //workerGroup工作线程组,主要负责网络IO读写
  28. EventLoopGroup workerGroup = new NioEventLoopGroup();
  29. try {
  30. //启动辅助类
  31. ServerBootstrap serverBootstrap = new ServerBootstrap();
  32. //bootstrap绑定两个线程组
  33. serverBootstrap.group(bossGroup, workerGroup);
  34. //设置通道为NioChannel
  35. serverBootstrap.channel(NioServerSocketChannel.class);
  36. //可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
  37. serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
  38. //设置自定义的通道初始化器,用于入站操作
  39. serverBootstrap.childHandler(websocketChannelInitializer);
  40. //启动服务器,本质是Java程序发起系统调用,然后内核底层起了一个处于监听状态的服务,生成一个文件描述符FD
  41. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  42. //异步
  43. channelFuture.channel().closeFuture().sync();
  44. } finally {
  45. bossGroup.shutdownGracefully();
  46. workerGroup.shutdownGracefully();
  47. }
  48. }
  49. }

8、自定义消息发送类

  1. import io.netty.channel.ChannelHandlerContext;
  2. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.time.LocalDateTime;
  5. /**
  6. * @Description 向客户端发送消息
  7. **/
  8. @Slf4j
  9. public class WebsocketRunnable implements Runnable {
  10. private ChannelHandlerContext channelHandlerContext;
  11. private MessageRequest messageRequest;
  12. public WebsocketRunnable(ChannelHandlerContext channelHandlerContext,MessageRequest messageRequest) {
  13. this.channelHandlerContext = channelHandlerContext;
  14. this.messageRequest = messageRequest;
  15. }
  16. @Override
  17. public void run() {
  18. try {
  19. log.info(Thread.currentThread().getName()+"--"+LocalDateTime.now().toString());
  20. channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
  21. } catch (Exception e) {
  22. log.error("websocket服务器推送消息发生错误:",e);
  23. }
  24. }
  25. }

9、如果使用redission来加锁添加配置,不使用(跳过)

RedissonConfiguration.java配置类
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.redisson.Redisson;
  3. import org.redisson.api.RedissonClient;
  4. import org.redisson.config.ClusterServersConfig;
  5. import org.redisson.config.Config;
  6. import org.redisson.config.SingleServerConfig;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.util.StringUtils;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. @Slf4j
  15. @Configuration
  16. public class RedissonConfiguration {
  17. @Autowired
  18. private RedisProperties redisProperties;
  19. /**
  20. * 初始化RedissonClient客户端
  21. * 注意:
  22. * 此实例集群为3节点,各节点1主1从
  23. * 集群模式,集群节点的地址须使用“redis://”前缀,否则将会报错。
  24. *
  25. * @return {@link RedissonClient}
  26. */
  27. @Bean
  28. public RedissonClient getRedissonClient() {
  29. Config config = new Config();
  30. if (redisProperties.getCluster() != null) {
  31. //集群模式配置
  32. List<String> nodes = redisProperties.getCluster().getNodes();
  33. List<String> clusterNodes = new ArrayList<>();
  34. for (int i = 0; i < nodes.size(); i++) {
  35. clusterNodes.add("redis://" + nodes.get(i));
  36. }
  37. ClusterServersConfig clusterServersConfig = config.useClusterServers()
  38. .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
  39. if (!StringUtils.isEmpty(redisProperties.getPassword())) {
  40. clusterServersConfig.setPassword(redisProperties.getPassword());
  41. }
  42. } else {
  43. //单节点配置
  44. String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
  45. SingleServerConfig serverConfig = config.useSingleServer();
  46. serverConfig.setAddress(address);
  47. if (!StringUtils.isEmpty(redisProperties.getPassword())) {
  48. serverConfig.setPassword(redisProperties.getPassword());
  49. }
  50. serverConfig.setDatabase(redisProperties.getDatabase());
  51. }
  52. //看门狗的锁续期时间,默认30000ms,这里配置成15000ms
  53. // config.setLockWatchdogTimeout(15000);
  54. config.setLockWatchdogTimeout(15000);
  55. return Redisson.create(config);
  56. }
  57. }
RedisLockUtil.java
  1. import org.redisson.api.RLock;
  2. import org.redisson.api.RedissonClient;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.core.annotation.Order;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.TimeUnit;
  9. @Component
  10. //@Order(value = 2)
  11. public class RedisLockUtil {
  12. private final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
  13. @Autowired
  14. private RedissonClient redissonClient;
  15. /* @Autowired
  16. public RedisLockUtil(@Qualifier("customRedisson") RedissonClient redissonClient) {
  17. this.redissonClient = redissonClient;
  18. }*/
  19. /**
  20. * 源码
  21. * 1.固定有效期的锁:超过有效期leaseTime后,自动释放锁。
  22. *
  23. * public void lock(long leaseTime, TimeUnit unit) {
  24. * try {
  25. * this.lockInterruptibly(leaseTime, unit);
  26. * } catch (InterruptedException var5) {
  27. * Thread.currentThread().interrupt();
  28. * }
  29. * }
  30. * 2.没有有效期的锁:默认30秒,然后采用Watchdog进行续期,直到业务逻辑执行完毕。
  31. *
  32. * public void lock() {
  33. * try {
  34. * this.lockInterruptibly();
  35. * } catch (InterruptedException var2) {
  36. * Thread.currentThread().interrupt();
  37. * }
  38. * }
  39. * ————————————————
  40. */
  41. /**
  42. * 加锁
  43. * @param key 锁的 key
  44. * @param value value ( key + value 必须保证唯一)
  45. * @param expire key 的过期时间,单位 ms
  46. * @param retryTimes 重试次数,即加锁失败之后的重试次数
  47. * @param retryInterval 重试时间间隔,单位 ms
  48. * @return 加锁 true 成功
  49. */
  50. public RLock lock(String key, String value, long expire, int retryTimes, long retryInterval) {
  51. logger.info("locking... redisK = {}", key);
  52. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
  53. try {
  54. boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);//是否加锁成功
  55. if (tryLock) {
  56. logger.info("locked... redisK = {}", key);
  57. return fairLock;
  58. } else {
  59. //重试获取锁
  60. logger.info("retry to acquire lock: [redisK = {}]", key);
  61. int count = 0;
  62. while(count < retryTimes) {
  63. try {
  64. Thread.sleep(retryInterval);
  65. tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
  66. if(tryLock) {
  67. logger.info("locked... redisK = {}", key);
  68. return fairLock;
  69. }
  70. logger.warn("{} times try to acquire lock", count + 1);
  71. count++;
  72. } catch (Exception e) {
  73. logger.error("acquire redis occurred an exception", e);
  74. break;
  75. }
  76. }
  77. logger.info("fail to acquire lock {}", key);
  78. }
  79. } catch (Throwable e1) {
  80. logger.error("acquire redis occurred an exception", e1);
  81. }
  82. return fairLock;
  83. }
  84. /**
  85. * 加锁
  86. * @param key 锁的 key
  87. * @param value value ( key + value 必须保证唯一)
  88. * @param expire key 的过期时间,单位 ms
  89. * @param retryTimes 重试次数,即加锁失败之后的重试次数
  90. * @param retryInterval 重试时间间隔,单位 ms
  91. * @return 加锁 true 成功
  92. */
  93. public boolean lock2(String key, String value, long expire, int retryTimes, long retryInterval) {
  94. logger.info("locking... redisK = {}", key);
  95. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
  96. try {
  97. boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
  98. if (tryLock) {
  99. logger.info("locked... redisK = {}", key);
  100. return true;
  101. } else {
  102. //重试获取锁
  103. logger.info("retry to acquire lock: [redisK = {}]", key);
  104. int count = 0;
  105. while(count < retryTimes) {
  106. try {
  107. Thread.sleep(retryInterval);
  108. tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
  109. if(tryLock) {
  110. logger.info("locked... redisK = {}", key);
  111. return true;
  112. }
  113. logger.warn("{} times try to acquire lock", count + 1);
  114. count++;
  115. } catch (Exception e) {
  116. logger.error("acquire redis occurred an exception", e);
  117. break;
  118. }
  119. }
  120. logger.info("fail to acquire lock {}", key);
  121. return false;
  122. }
  123. } catch (Throwable e1) {
  124. logger.error("acquire redis occurred an exception", e1);
  125. return false;
  126. }
  127. }
  128. /**
  129. * 加锁
  130. * @param key 锁的 key
  131. * @param value value ( key + value 必须保证唯一)
  132. * @param expire key 的过期时间,单位 ms
  133. * @return 加锁 true 成功
  134. */
  135. public boolean lockCheck(String key, String value, long expire) {
  136. logger.info("locking... redisK = {}", key);
  137. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
  138. boolean tryLock = false;
  139. try {
  140. tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
  141. } catch (Throwable e1) {
  142. logger.error("acquire redis occurred an exception", e1);
  143. }
  144. return tryLock;
  145. }
  146. /**
  147. * 加锁
  148. * @param key 锁的 key
  149. * @param value value ( key + value 必须保证唯一)
  150. * @param expire key 的过期时间,单位 ms
  151. * @return 加锁 true 成功
  152. */
  153. public boolean lockDog(String key, String value, long expire) {
  154. logger.info("locking... redisK = {}", key);
  155. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
  156. boolean tryLock = false;
  157. try {
  158. fairLock.tryLock(0, TimeUnit.MILLISECONDS);
  159. } catch (Throwable e1) {
  160. logger.error("acquire redis occurred an exception", e1);
  161. }
  162. return tryLock;
  163. }
  164. /**
  165. * 释放KEY
  166. * @return 释放锁 true 成功
  167. */
  168. public boolean unlock(String key, String value) {
  169. RLock fairLock = redissonClient.getFairLock(key + ":" + value);
  170. try {
  171. //如果这里抛异常,后续锁无法释放
  172. if (fairLock.isLocked()) {
  173. fairLock.unlock();
  174. logger.info("release lock success");
  175. return true;
  176. }
  177. } catch (Throwable e) {
  178. logger.error("release lock occurred an exception", e);
  179. }finally {
  180. fairLock.unlock();
  181. }
  182. return false;
  183. }
  184. }

 10、定义controller,自定义发送消息

  1. import com.na.integration.socket.websocket.WebSocketHandler;
  2. import com.na.model.dto.NettyRedisConnectionDto;
  3. import com.na.model.vo.NettyRedisConnectionVo;
  4. import com.na.common.utils.JSONUtils;
  5. import io.netty.channel.Channel;
  6. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.data.redis.core.RedisTemplate;
  9. import org.springframework.web.bind.annotation.RequestBody;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.time.LocalDateTime;
  13. import java.util.List;
  14. import java.util.Map;
  15. @RequestMapping("ws")
  16. @RestController
  17. public class WebsocketController {
  18. @Autowired
  19. private RedisTemplate redisTemplate;
  20. /**
  21. * 群发消息
  22. *
  23. * @param idList 要把消息发送给其他用户的主键
  24. */
  25. @RequestMapping("hello1")
  26. private Map hello(List<Long> idList) {
  27. //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
  28. Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
  29. //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
  30. Map<Long, String> clientMap = WebSocketHandler.getClientMap();
  31. //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
  32. idList.stream().forEach(id -> {
  33. String v = clientMap.get(id);
  34. Channel channel = channelMap.get(v);
  35. channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy")));
  36. });
  37. redisTemplate.convertAndSend("life.all", "hello publish/subscribe");
  38. return clientMap;
  39. }
  40. /**
  41. * 向redis 发布/订阅模式发送消息 可采用广播消息集群监听
  42. * 需要考虑 接收方是否在线,不在线的情况是缓存还是延迟推送
  43. * 需要考虑是否重复在线,多个实例的本地缓存是否存在同一个id,这样会导致想要接收方混乱的bug
  44. */
  45. @RequestMapping("sendMessage")
  46. private Map sendMessage(@RequestBody NettyRedisConnectionVo vo) {
  47. //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
  48. Map<String, Channel> channelMap = WebSocketHandler.getChannelMap();
  49. //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
  50. Map<Long, String> clientMap = WebSocketHandler.getClientMap();
  51. //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
  52. String v = clientMap.get(vo.getSendId());
  53. Channel channel = null;
  54. if (v != null) {
  55. channel = channelMap.get(v);
  56. Channel finalChannel = channel;
  57. //需要发送的 与redis监听定义不同的内容方便测试分辨
  58. channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + ",message=" + vo.getSendMessage())));
  59. } else {
  60. channel = (Channel) redisTemplate.opsForValue().get("id");
  61. //封装序列化
  62. NettyRedisConnectionDto dto = new NettyRedisConnectionDto()
  63. .setId(vo.getId())
  64. .setSendMessage(vo.getSendMessage())
  65. .setSendId(vo.getSendId())
  66. .setChannel(channel);
  67. redisTemplate.convertAndSend("life.all", JSONUtils.bean2JSONObject(dto));
  68. }
  69. return clientMap;
  70. }
  71. }

11、启动类配置(与springBoot启动类区分)

也可以启动方法代码写在springboot启动类里

WebsocketApplication.java
  1. import com.na.integration.socket.websocket.WebsocketInitialization;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.PostConstruct;
  5. import javax.annotation.Resource;
  6. @Slf4j
  7. @Component
  8. public class WebsocketApplication {
  9. @Resource
  10. private WebsocketInitialization websocketInitialization;
  11. @PostConstruct
  12. public void start() {
  13. try {
  14. log.info(Thread.currentThread().getName() + ":websocket启动中......");
  15. websocketInitialization.init();
  16. log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
  17. } catch (Exception e) {
  18. log.error("websocket发生错误:",e);
  19. }
  20. }
  21. }

12、yml配置,其他配置根据自己实际情况来

  1. websocket:
  2. port: 7000 #端口
  3. url: /msg #访问url

五、集群测试

1、启动两个springboot server实例端口分别18088、18089,websocket端口分别是7000、7001,用测试工具创建ws协议请求,

注意:

1、unionId需要保持全局唯一

2、websocket端口和springboot端口不一样

这里可以看到用户client1已经连接进来了

2、如果要是用群发或者指定用户的话,就需要用到广播模式。

指定unionId用户发送请求

http://localhost:18088/ws/sendMessage

3 这里可以看到unionId=2的Channel实例在18089服务上从而发送成功,而18088的实例没有unionId=2的Channel就忽略。

7001客户端也接收到了服务器发来的消息 

部分代码参照:微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送----netty是yyds_netty 集群_码学弟的博客-CSDN博客 

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号