当前位置:   article > 正文

SpringBoot整合Netty+Websocket实现消息推送_springboot 整合websoket与netty

springboot 整合websoket与netty

前言

       Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。以下是Netty的主要优势:

  1. 高性能:Netty基于NIO(非阻塞IO)模型,采用事件驱动的设计,具有高性能的特点。它通过零拷贝技术、内存池化技术等手段,进一步提高了IO性能,降低了资源消耗。
  2. 易用性:Netty提供了丰富的API和功能,如对TCP、UDP和文件传输的支持,以及对SSL/TLS、压缩、编解码等功能的内置实现。这些功能简化了网络应用的开发,降低了学习和使用的难度。
  3. 可扩展性:Netty采用了模块化设计,各个模块之间耦合度低,易于扩展。开发者可以根据需要定制和扩展Netty的功能,如添加新的编解码器、处理器或协议。
  4. 稳定性:Netty经过了大规模生产环境的验证,具有高稳定性。它通过合理的线程模型、资源管理和错误处理机制,保证了系统的稳定性和可靠性。
  5. 社区活跃:Netty拥有一个活跃的开源社区,不断有新的功能和优化被贡献出来。这为开发者提供了强大的支持,也促进了Netty的发展和完善。
  6. 跨平台性:Netty可以在多种操作系统和平台上运行,如Windows、Linux和Mac OS等。这一特性使得开发者可以轻松地在不同环境下部署和维护网络应用。

        WebSocket 是一种网络通信协议,相比传统的HTTP协议,它具有以下优势: 

  1. 实时性:WebSocket 允许服务器主动向客户端推送数据,从而实现实时通信,这对于需要即时反馈的应用(如在线游戏、聊天应用等)至关重要。
  2. 全双工通信:WebSocket 支持双向通信,服务器和客户端可以同时发送和接收数据,提高了通信的灵活性。
  3. 节省带宽:由于 WebSocket 在单个 TCP 连接上运行,避免了为每个消息创建新连接所需的额外开销,减少了数据传输量。
  4. 更好的二进制支持:WebSocket 定义了二进制帧,可以更轻松地处理二进制内容,如图片、音视频等。
  5. 跨域通信:WebSocket 支持跨域通信,使得客户端可以与不同域名的服务器进行通信,增加了应用的灵活性和可访问性。
  6. 可扩展性:WebSocket 定义了扩展机制,用户可以根据需要扩展协议或实现自定义的子协议。
  7. 更好的支持实时应用:由于 WebSocket 的实时性和全双工通信特性,它特别适合用于需要实时反馈的应用,例如在线游戏、实时音视频聊天等。
  8. 更快的传输速度:由于 WebSocket 减少了不必要的连接和状态转换,通信速度更快。
  9. 更低的延迟:由于 WebSocket 建立的是持久连接,减少了建立和关闭连接的开销,从而降低了通信延迟。
  10. 更强的兼容性:虽然 WebSocket 协议并未在所有浏览器中得到完全支持,但有各种库和框架可以帮助实现兼容性,例如通过 polyfill 技术。

说明:以下为SpringBoot整合Netty+Websocket实现实时的消息通讯

一、引入Netty依赖

  1. <!--netty-->
  2. <dependency>
  3. <groupId>io.netty</groupId>
  4. <artifactId>netty-all</artifactId>
  5. <version>4.1.25.Final</version>
  6. </dependency>

二、 使用步骤

1.配置服务启动类

  1. package com.pzg.chat.communication;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.nio.NioServerSocketChannel;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.PostConstruct;
  10. @Slf4j
  11. @Component
  12. public class WebSocketNettyServer {
  13. @Autowired
  14. WebSocketChannelInitializer webSocketChannelInitializer;
  15. /**
  16. * Netty服务器启动对象
  17. */
  18. private final ServerBootstrap serverBootstrap = new ServerBootstrap();;
  19. @PostConstruct
  20. public void WebSocketNettyServerInit() {
  21. // 初始化服务器启动对象
  22. // 主线程池
  23. NioEventLoopGroup mainGrp = new NioEventLoopGroup();
  24. // 从线程池
  25. NioEventLoopGroup subGrp = new NioEventLoopGroup();
  26. serverBootstrap
  27. // 指定使用上面创建的两个线程池
  28. .group(mainGrp, subGrp)
  29. // 指定Netty通道类型
  30. .channel(NioServerSocketChannel.class)
  31. // 指定通道初始化器用来加载当Channel收到事件消息后
  32. .childHandler(webSocketChannelInitializer);
  33. }
  34. public void start() throws InterruptedException {
  35. // 绑定服务器端口,以异步的方式启动服务器
  36. ChannelFuture future = serverBootstrap.bind("0.0.0.0",8089).sync();
  37. if (future.isSuccess()){
  38. log.info("netty初始化完成,端口8088");
  39. }
  40. }
  41. }

        说明:@PostConstruct用来保证容器初始化后触发该注解下的方法

 2.Netty服务初始化器

  1. package com.pzg.chat.communication;
  2. import com.pzg.chat.handler.ChatHandler;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelPipeline;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.handler.codec.http.HttpObjectAggregator;
  7. import io.netty.handler.codec.http.HttpServerCodec;
  8. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  9. import io.netty.handler.stream.ChunkedWriteHandler;
  10. import io.netty.handler.timeout.IdleStateHandler;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. @Component
  14. public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
  15. @Autowired
  16. private ChatHandler chatHandler;
  17. @Override
  18. protected void initChannel(SocketChannel socketChannel) {
  19. //获取对应的管道
  20. ChannelPipeline pipeline = socketChannel.pipeline();
  21. pipeline
  22. //添加HTTP编码解码器
  23. .addLast(new HttpServerCodec())
  24. //添加对大数据流的支持
  25. .addLast(new ChunkedWriteHandler())
  26. //添加聚合器
  27. .addLast(new HttpObjectAggregator(1024 * 64))
  28. //设置websocket连接前缀前缀
  29. //心跳检查(8秒)
  30. .addLast(new IdleStateHandler(8,0,0))
  31. //添加自定义处理器
  32. .addLast(new WebSocketServerProtocolHandler("/ws",null,true))
  33. .addLast(chatHandler);
  34. }
  35. }

3.自定义处理器类

  1. package com.pzg.chat.handler;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.pzg.chat.communication.context.impl.WebSocketContext;
  5. import com.pzg.chat.service.ChannelService;
  6. import io.netty.channel.*;
  7. import io.netty.channel.group.ChannelGroup;
  8. import io.netty.channel.group.DefaultChannelGroup;
  9. import io.netty.handler.codec.http.FullHttpRequest;
  10. import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
  11. import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
  12. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  13. import io.netty.handler.codec.http.websocketx.WebSocketFrame;
  14. import io.netty.handler.timeout.IdleState;
  15. import io.netty.handler.timeout.IdleStateEvent;
  16. import io.netty.util.concurrent.GlobalEventExecutor;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.stereotype.Component;
  20. import javax.annotation.PostConstruct;
  21. @ChannelHandler.Sharable
  22. @SuppressWarnings("all")
  23. @Component
  24. @Slf4j
  25. public class ChatHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
  26. private static ChatHandler chatHandler;
  27. @Autowired
  28. private ChannelService channelService;
  29. @Autowired
  30. private WebSocketContext webSocketContext;
  31. @PostConstruct
  32. public void init() {
  33. chatHandler = this;
  34. }
  35. /**
  36. * 创建ChannelGroup对象存储所有连接的用户
  37. */
  38. private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  39. /**
  40. * 有新消息时会调用这个方法
  41. *
  42. * @param channelHandlerContext 上下文处理器
  43. * @param textWebSocketFrame 文本
  44. * @throws Exception
  45. */
  46. @Override
  47. @SuppressWarnings("all")
  48. public void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
  49. System.out.println(frame.getClass());
  50. if (frame instanceof FullHttpRequest) {
  51. }
  52. //判断是否为关闭事件
  53. if (frame instanceof CloseWebSocketFrame) {
  54. ctx.channel().close();
  55. return;
  56. }
  57. if (frame instanceof PingWebSocketFrame) {
  58. return;
  59. }
  60. if (frame instanceof TextWebSocketFrame) {
  61. TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
  62. JSONObject jsonObject = JSON.parseObject(textWebSocketFrame.text());
  63. webSocketContext.executeWebSocketContext(jsonObject,ctx.channel());
  64. //类型转为(前后端达成的消息体)
  65. }
  66. //遍历出所有连接的通道
  67. }
  68. /**
  69. * 有新的连接建立时
  70. *
  71. * @param ctx
  72. * @throws Exception
  73. */
  74. @Override
  75. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  76. //加入通道组
  77. clients.add(ctx.channel());
  78. }
  79. /**
  80. * 不活跃时会调用这个方法
  81. *
  82. * @param ctx
  83. * @throws Exception
  84. */
  85. @Override
  86. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  87. //移除出通道组
  88. try {
  89. channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText());
  90. channelService.deleteChannelBindUserId(ctx.channel());
  91. }catch (Exception e){
  92. }
  93. clients.remove(ctx.channel());
  94. }
  95. @Override
  96. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  97. // 获取参数
  98. super.channelActive(ctx);
  99. }
  100. //检查客户端写心跳事件
  101. @Override
  102. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  103. Channel channel = ctx.channel();
  104. if (evt instanceof IdleStateEvent) {
  105. IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
  106. if (idleStateEvent.state() == IdleState.READER_IDLE) {
  107. try {
  108. channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText());
  109. channelService.deleteChannelBindUserId(ctx.channel());
  110. }catch (Exception e){
  111. }
  112. clients.remove(channel);
  113. channel.close();
  114. }
  115. } else {
  116. super.userEventTriggered(ctx, evt);
  117. }
  118. }
  119. }

        说明:webSocketContext.executeWebSocketContext(jsonObject,ctx.channel()); 为自己定义的处理消息的类,textWebSocketFrame.text()为对应的消息,可自行处理

4.缓存用户Channel接口和对应实现类

1.接口

  1. package com.pzg.chat.service;
  2. import io.netty.channel.Channel;
  3. public interface ChannelService {
  4. void setUserIdBindChannel(Channel channel);
  5. void setIdBindChannel(String id,Channel channel);
  6. void setChannelBindUserId(Channel channel);
  7. String getChannelBindUserId(Channel channel);
  8. void deleteChannelBindUserId(Channel channel);
  9. void deleteBindUserIdChannel();
  10. void deleteBindIdChannel(String id);
  11. void setUserIdAndIdBindChannel(String id ,Channel channel);
  12. void deleteBindUserIdAndIdChannel(String id);
  13. Channel getUserIdChannel(String userId);
  14. Channel getIdBindChannel(String Id);
  15. }

2.实现类 

  1. package com.pzg.chat.service.impl;
  2. import com.pzg.chat.service.ChannelService;
  3. import com.pzg.chat.utils.UserUtil;
  4. import io.netty.channel.Channel;
  5. import org.springframework.stereotype.Service;
  6. import java.util.*;
  7. @Service
  8. public class ChannelServiceImpl implements ChannelService {
  9. //保存用户id和channel的映射
  10. public static HashMap<String,Channel> userIdChannel = new HashMap<>();
  11. //保存channelId和channel映射关系
  12. public static HashMap<String,Channel> idChannel = new HashMap<>();
  13. //保存channel和userID映射关系
  14. public static HashMap<Channel,String> ChannelUserId = new HashMap<>();
  15. @Override
  16. public void setUserIdBindChannel(Channel channel) {
  17. String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
  18. userIdChannel.put(userId,channel);
  19. }
  20. @Override
  21. public void setIdBindChannel(String id, Channel channel) {
  22. idChannel.put(id,channel);
  23. }
  24. @Override
  25. public void setChannelBindUserId(Channel channel) {
  26. String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
  27. System.out.println("----------------------->"+userId);
  28. ChannelUserId.put(channel,userId);
  29. }
  30. @Override
  31. public String getChannelBindUserId(Channel channel) {
  32. return ChannelUserId.get(channel);
  33. }
  34. @Override
  35. public void deleteChannelBindUserId(Channel channel) {
  36. ChannelUserId.remove(channel);
  37. }
  38. @Override
  39. public void deleteBindUserIdChannel() {
  40. String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
  41. userIdChannel.remove(userId);
  42. }
  43. @Override
  44. public void deleteBindIdChannel(String id) {
  45. idChannel.remove(id);
  46. }
  47. @Override
  48. public void setUserIdAndIdBindChannel(String id, Channel channel) {
  49. setUserIdBindChannel(channel);
  50. setIdBindChannel(id,channel);
  51. }
  52. @Override
  53. public void deleteBindUserIdAndIdChannel(String id) {
  54. deleteBindIdChannel(id);
  55. deleteBindUserIdChannel();
  56. }
  57. @Override
  58. public Channel getUserIdChannel(String userId) {
  59. return userIdChannel.get(userId);
  60. }
  61. @Override
  62. public Channel getIdBindChannel(String Id) {
  63. return idChannel.get(Id);
  64. }
  65. }

        说明:缓存Channel主要保证消息能发送到对应的Channel中,消息可携带用户id通过id查找Channel,将信息存入即可 ,通过channel.writeAndFlush(new TextWebSocketFrame("消息内容"));刷出消息。

5.调用start()启动Netty服务

  1. package com.pzg.chat.listener;
  2. import com.pzg.chat.communication.WebSocketNettyServer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.ApplicationListener;
  5. import org.springframework.context.event.ContextRefreshedEvent;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class NettyStartListener implements ApplicationListener<ContextRefreshedEvent> {
  9. /**
  10. * 注入启动器
  11. */
  12. @Autowired
  13. private WebSocketNettyServer webSocketNettyServer;
  14. @Override
  15. public void onApplicationEvent(ContextRefreshedEvent event) {
  16. //判断event上下文中的父级是否为空
  17. if (event.getApplicationContext().getParent() == null) {
  18. try {
  19. //为空则调用start方法
  20. webSocketNettyServer.start();
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }

6.Websocket配置

  1. // 导出socket对象
  2. import {getToken} from '@/utils/auth';
  3. export {
  4. socket
  5. }
  6. import { Message } from 'element-ui'
  7. import {header} from "../../listening/header";
  8. import {asidefriend} from "../../listening/asidefriend";
  9. import {chatbox} from "../../listening/chatbox";
  10. import {chatcontent} from "../../listening/chatcontent";
  11. import {videocalls} from "../../listening/videocalls";
  12. import {voicecalls} from "../../listening/voicecalls";
  13. // socket主要对象
  14. var socket = {
  15. websock: null,
  16. ws_url: "ws://localhost:8089/ws",
  17. /**
  18. * 开启标识
  19. * */
  20. socket_open: false,
  21. /**
  22. * 心跳timer
  23. * */
  24. hearbeat_timer: null,
  25. /**
  26. * 心跳发送频率
  27. * */
  28. hearbeat_interval: 5000,
  29. /**
  30. * 是否开启重连
  31. * */
  32. is_reonnect: true,
  33. /**
  34. * 重新连接的次数
  35. * */
  36. reconnect_count: 3,
  37. /**
  38. * 当前重新连接的次数,默认为:1
  39. * */
  40. reconnect_current: 1,
  41. /**
  42. * 重新连接的时间类型
  43. * */
  44. reconnect_timer: null,
  45. /**
  46. * 重新连接的间隔
  47. * */
  48. reconnect_interval: 3000,
  49. i : 0,
  50. timer:null,
  51. /**
  52. * 初始化连接
  53. */
  54. init: () => {
  55. if (!("WebSocket" in window)) {
  56. Message({
  57. message: '当前浏览器与网站不兼容丫',
  58. type: 'error',
  59. });
  60. return null
  61. }
  62. if (socket.websock && socket.websock.readyState===1) {
  63. return socket.websock
  64. }
  65. socket.websock = new WebSocket(socket.ws_url)
  66. //接收消息
  67. socket.websock.onmessage = function (e) {
  68. //调用处理消息的方法
  69. socket.receive(e)
  70. }
  71. // 关闭连接
  72. socket.websock.onclose = function (e) {
  73. clearInterval(socket.hearbeat_interval);
  74. socket.socket_open = false;
  75. if (socket.websock!=null){
  76. header.getWebsocketStatus(socket.websock.readyState);
  77. }
  78. // 需要重新连接
  79. if (socket.is_reonnect) {
  80. socket.reconnect_timer = setTimeout(() => {
  81. // 超过重连次数
  82. if (socket.reconnect_current > socket.reconnect_count) {
  83. clearTimeout(socket.reconnect_timer)
  84. return
  85. }
  86. // 记录重连次数
  87. socket.reconnect_current++
  88. socket.reconnect()
  89. }, socket.reconnect_interval)
  90. }
  91. }
  92. // 连接成功
  93. socket.websock.onopen = function () {
  94. // Message({
  95. // message: '连接成功',
  96. // type: 'success',
  97. // });
  98. header.getWebsocketStatus(socket.websock.readyState);
  99. let data = {
  100. "action": 10002,
  101. "token":getToken(),
  102. "chatMsg": null,
  103. "extend": 1,
  104. };
  105. socket.send(data);
  106. socket.socket_open = true;
  107. socket.is_reonnect = true;
  108. //重修刷新好友内容
  109. window.dispatchEvent(new CustomEvent('connectInit'));
  110. // 开启心跳
  111. socket.heartbeat()
  112. };
  113. // 连接发生错误
  114. socket.websock.onerror = function (err) {
  115. Message({
  116. message: '服务连接发送错误!',
  117. type: 'error',
  118. });
  119. }
  120. },
  121. /**
  122. * 获取websocket对象
  123. * */
  124. getSocket:()=>{
  125. //创建了直接返回,反之重来
  126. if (socket.websock) {
  127. return socket.websock
  128. }else {
  129. socket.init();
  130. }
  131. },
  132. getStatus:()=> {
  133. if (socket.websock.readyState === 0) {
  134. return "未连接";
  135. } else if (socket.websock.readyState === 1) {
  136. return "已连接";
  137. } else if (socket.websock.readyState === 2) {
  138. return "连接正在关闭";
  139. } else if (socket.websock.readyState === 3) {
  140. return "连接已关闭";
  141. }
  142. },
  143. /**
  144. * 发送消息
  145. * @param {*} data 发送数据
  146. * @param {*} callback 发送后的自定义回调函数
  147. */
  148. send: (data, callback = null) => {
  149. // 开启状态直接发送
  150. if (socket.websock!=null && socket.websock.readyState === socket.websock.OPEN) {
  151. try {
  152. socket.websock.send(JSON.stringify(data));
  153. }catch (e) {
  154. if (socket.timer !=null){
  155. return
  156. }
  157. socket.timer = setInterval(()=>{
  158. if (i>=6){
  159. clearInterval(socket.timer);
  160. socket.timer = null;
  161. socket.i = 0;
  162. return
  163. }
  164. socket.websock.send(JSON.stringify(data));
  165. socket.i++;
  166. },2000)
  167. }
  168. if (callback) {
  169. callback()
  170. }
  171. // 正在开启状态,则等待1s后重新调用
  172. } else if (socket.websock!=null && socket.websock.readyState === socket.websock.CONNECTING) {
  173. setTimeout(function () {
  174. socket.send(data, callback)
  175. }, 1000)
  176. // 未开启,则等待1s后重新调用
  177. } else if (socket.websock!=null){
  178. socket.init();
  179. setTimeout(function () {
  180. socket.send(data, callback)
  181. }, 1000)
  182. }
  183. },
  184. /**
  185. * 接收消息
  186. * @param {*} message 接收到的消息
  187. */
  188. receive: (message) => {
  189. var recData = JSON.parse(message.data);
  190. /**
  191. *这部分是我们具体的对消息的处理
  192. * */
  193. console.log(recData)
  194. // 自行扩展其他业务处理...
  195. },
  196. /**
  197. * 心跳
  198. */
  199. heartbeat: () => {
  200. if (socket.hearbeat_timer) {
  201. clearInterval(socket.hearbeat_timer)
  202. }
  203. socket.hearbeat_timer = setInterval(() => {
  204. //发送心跳包
  205. let data = {
  206. "action": 10000,
  207. "token":getToken(),
  208. "chatMsg": null,
  209. "extend": null,
  210. };
  211. socket.send(data)
  212. }, socket.hearbeat_interval)
  213. },
  214. /**
  215. * 主动关闭连接
  216. */
  217. close: () => {
  218. if (socket.websock==null){
  219. return
  220. }
  221. let data = {
  222. "action": 10002,
  223. "token":getToken(),
  224. "chatMsg": null,
  225. "extend": 0,
  226. };
  227. socket.send(data);
  228. clearInterval(socket.hearbeat_interval);
  229. socket.is_reonnect = false;
  230. socket.websock.close();
  231. header.getWebsocketStatus(socket.websock.readyState);
  232. socket.websock=null
  233. },
  234. /**
  235. * 重新连接
  236. */
  237. reconnect: () => {
  238. if (socket.websock && socket.socket_open) {
  239. socket.websock.close()
  240. }
  241. socket.init()
  242. },
  243. }

        说明:通过登入后,在某个全局页面中调用socket.start()即可连接netty服务器,通过socket.send("消息")来发送消息。

三、结束语

        以上就是整个的使用教程,如果报错或者有异常,可以在私信我,看到消息第一时间帮你解决,坚持就是胜利,加油!。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/404169
推荐阅读
相关标签
  

闽ICP备14008679号