赞
踩
Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。以下是Netty的主要优势:
- 高性能:Netty基于NIO(非阻塞IO)模型,采用事件驱动的设计,具有高性能的特点。它通过零拷贝技术、内存池化技术等手段,进一步提高了IO性能,降低了资源消耗。
- 易用性:Netty提供了丰富的API和功能,如对TCP、UDP和文件传输的支持,以及对SSL/TLS、压缩、编解码等功能的内置实现。这些功能简化了网络应用的开发,降低了学习和使用的难度。
- 可扩展性:Netty采用了模块化设计,各个模块之间耦合度低,易于扩展。开发者可以根据需要定制和扩展Netty的功能,如添加新的编解码器、处理器或协议。
- 稳定性:Netty经过了大规模生产环境的验证,具有高稳定性。它通过合理的线程模型、资源管理和错误处理机制,保证了系统的稳定性和可靠性。
- 社区活跃:Netty拥有一个活跃的开源社区,不断有新的功能和优化被贡献出来。这为开发者提供了强大的支持,也促进了Netty的发展和完善。
- 跨平台性:Netty可以在多种操作系统和平台上运行,如Windows、Linux和Mac OS等。这一特性使得开发者可以轻松地在不同环境下部署和维护网络应用。
WebSocket 是一种网络通信协议,相比传统的HTTP协议,它具有以下优势:
- 实时性:WebSocket 允许服务器主动向客户端推送数据,从而实现实时通信,这对于需要即时反馈的应用(如在线游戏、聊天应用等)至关重要。
- 全双工通信:WebSocket 支持双向通信,服务器和客户端可以同时发送和接收数据,提高了通信的灵活性。
- 节省带宽:由于 WebSocket 在单个 TCP 连接上运行,避免了为每个消息创建新连接所需的额外开销,减少了数据传输量。
- 更好的二进制支持:WebSocket 定义了二进制帧,可以更轻松地处理二进制内容,如图片、音视频等。
- 跨域通信:WebSocket 支持跨域通信,使得客户端可以与不同域名的服务器进行通信,增加了应用的灵活性和可访问性。
- 可扩展性:WebSocket 定义了扩展机制,用户可以根据需要扩展协议或实现自定义的子协议。
- 更好的支持实时应用:由于 WebSocket 的实时性和全双工通信特性,它特别适合用于需要实时反馈的应用,例如在线游戏、实时音视频聊天等。
- 更快的传输速度:由于 WebSocket 减少了不必要的连接和状态转换,通信速度更快。
- 更低的延迟:由于 WebSocket 建立的是持久连接,减少了建立和关闭连接的开销,从而降低了通信延迟。
- 更强的兼容性:虽然 WebSocket 协议并未在所有浏览器中得到完全支持,但有各种库和框架可以帮助实现兼容性,例如通过 polyfill 技术。
说明:以下为SpringBoot整合Netty+Websocket实现实时的消息通讯
- <!--netty-->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.25.Final</version>
- </dependency>
- package com.pzg.chat.communication;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- @Slf4j
- @Component
- public class WebSocketNettyServer {
-
- @Autowired
- WebSocketChannelInitializer webSocketChannelInitializer;
- /**
- * Netty服务器启动对象
- */
- private final ServerBootstrap serverBootstrap = new ServerBootstrap();;
-
- @PostConstruct
- public void WebSocketNettyServerInit() {
- // 初始化服务器启动对象
- // 主线程池
- NioEventLoopGroup mainGrp = new NioEventLoopGroup();
- // 从线程池
- NioEventLoopGroup subGrp = new NioEventLoopGroup();
- serverBootstrap
- // 指定使用上面创建的两个线程池
- .group(mainGrp, subGrp)
- // 指定Netty通道类型
- .channel(NioServerSocketChannel.class)
- // 指定通道初始化器用来加载当Channel收到事件消息后
- .childHandler(webSocketChannelInitializer);
- }
-
- public void start() throws InterruptedException {
- // 绑定服务器端口,以异步的方式启动服务器
- ChannelFuture future = serverBootstrap.bind("0.0.0.0",8089).sync();
- if (future.isSuccess()){
- log.info("netty初始化完成,端口8088");
- }
- }
- }
说明:@PostConstruct用来保证容器初始化后触发该注解下的方法
- package com.pzg.chat.communication;
-
- import com.pzg.chat.handler.ChatHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- import io.netty.handler.stream.ChunkedWriteHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- @Autowired
- private ChatHandler chatHandler;
-
- @Override
- protected void initChannel(SocketChannel socketChannel) {
- //获取对应的管道
- ChannelPipeline pipeline = socketChannel.pipeline();
-
- pipeline
- //添加HTTP编码解码器
- .addLast(new HttpServerCodec())
- //添加对大数据流的支持
- .addLast(new ChunkedWriteHandler())
- //添加聚合器
- .addLast(new HttpObjectAggregator(1024 * 64))
- //设置websocket连接前缀前缀
- //心跳检查(8秒)
- .addLast(new IdleStateHandler(8,0,0))
- //添加自定义处理器
- .addLast(new WebSocketServerProtocolHandler("/ws",null,true))
- .addLast(chatHandler);
-
- }
- }
- package com.pzg.chat.handler;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.pzg.chat.communication.context.impl.WebSocketContext;
- import com.pzg.chat.service.ChannelService;
- import io.netty.channel.*;
- import io.netty.channel.group.ChannelGroup;
- import io.netty.channel.group.DefaultChannelGroup;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import io.netty.util.concurrent.GlobalEventExecutor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- @ChannelHandler.Sharable
- @SuppressWarnings("all")
- @Component
- @Slf4j
- public class ChatHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
-
- private static ChatHandler chatHandler;
-
- @Autowired
- private ChannelService channelService;
-
-
- @Autowired
- private WebSocketContext webSocketContext;
-
-
- @PostConstruct
- public void init() {
- chatHandler = this;
- }
-
- /**
- * 创建ChannelGroup对象存储所有连接的用户
- */
- private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-
- /**
- * 有新消息时会调用这个方法
- *
- * @param channelHandlerContext 上下文处理器
- * @param textWebSocketFrame 文本
- * @throws Exception
- */
- @Override
- @SuppressWarnings("all")
- public void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
- System.out.println(frame.getClass());
- if (frame instanceof FullHttpRequest) {
- }
- //判断是否为关闭事件
- if (frame instanceof CloseWebSocketFrame) {
- ctx.channel().close();
- return;
- }
- if (frame instanceof PingWebSocketFrame) {
- return;
- }
- if (frame instanceof TextWebSocketFrame) {
- TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
- JSONObject jsonObject = JSON.parseObject(textWebSocketFrame.text());
- webSocketContext.executeWebSocketContext(jsonObject,ctx.channel());
- //类型转为(前后端达成的消息体)
- }
- //遍历出所有连接的通道
- }
-
- /**
- * 有新的连接建立时
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- //加入通道组
- clients.add(ctx.channel());
- }
-
- /**
- * 不活跃时会调用这个方法
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- //移除出通道组
- try {
- channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText());
- channelService.deleteChannelBindUserId(ctx.channel());
- }catch (Exception e){
-
- }
- clients.remove(ctx.channel());
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 获取参数
- super.channelActive(ctx);
- }
-
- //检查客户端写心跳事件
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- Channel channel = ctx.channel();
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
- if (idleStateEvent.state() == IdleState.READER_IDLE) {
- try {
- channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText());
- channelService.deleteChannelBindUserId(ctx.channel());
- }catch (Exception e){
-
- }
- clients.remove(channel);
- channel.close();
- }
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
-
-
- }
说明:webSocketContext.executeWebSocketContext(jsonObject,ctx.channel()); 为自己定义的处理消息的类,textWebSocketFrame.text()为对应的消息,可自行处理
- package com.pzg.chat.service;
-
- import io.netty.channel.Channel;
-
- public interface ChannelService {
-
- void setUserIdBindChannel(Channel channel);
-
- void setIdBindChannel(String id,Channel channel);
-
- void setChannelBindUserId(Channel channel);
-
- String getChannelBindUserId(Channel channel);
-
- void deleteChannelBindUserId(Channel channel);
-
- void deleteBindUserIdChannel();
-
- void deleteBindIdChannel(String id);
-
- void setUserIdAndIdBindChannel(String id ,Channel channel);
-
- void deleteBindUserIdAndIdChannel(String id);
-
- Channel getUserIdChannel(String userId);
-
- Channel getIdBindChannel(String Id);
- }
- package com.pzg.chat.service.impl;
- import com.pzg.chat.service.ChannelService;
- import com.pzg.chat.utils.UserUtil;
- import io.netty.channel.Channel;
- import org.springframework.stereotype.Service;
-
- import java.util.*;
-
- @Service
- public class ChannelServiceImpl implements ChannelService {
-
- //保存用户id和channel的映射
- public static HashMap<String,Channel> userIdChannel = new HashMap<>();
- //保存channelId和channel映射关系
- public static HashMap<String,Channel> idChannel = new HashMap<>();
- //保存channel和userID映射关系
- public static HashMap<Channel,String> ChannelUserId = new HashMap<>();
-
- @Override
- public void setUserIdBindChannel(Channel channel) {
- String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
- userIdChannel.put(userId,channel);
- }
-
- @Override
- public void setIdBindChannel(String id, Channel channel) {
- idChannel.put(id,channel);
- }
-
- @Override
- public void setChannelBindUserId(Channel channel) {
- String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
- System.out.println("----------------------->"+userId);
- ChannelUserId.put(channel,userId);
- }
-
- @Override
- public String getChannelBindUserId(Channel channel) {
- return ChannelUserId.get(channel);
- }
-
- @Override
- public void deleteChannelBindUserId(Channel channel) {
- ChannelUserId.remove(channel);
- }
-
- @Override
- public void deleteBindUserIdChannel() {
- String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
- userIdChannel.remove(userId);
- }
-
- @Override
- public void deleteBindIdChannel(String id) {
- idChannel.remove(id);
- }
-
- @Override
- public void setUserIdAndIdBindChannel(String id, Channel channel) {
- setUserIdBindChannel(channel);
- setIdBindChannel(id,channel);
- }
-
- @Override
- public void deleteBindUserIdAndIdChannel(String id) {
- deleteBindIdChannel(id);
- deleteBindUserIdChannel();
- }
-
- @Override
- public Channel getUserIdChannel(String userId) {
- return userIdChannel.get(userId);
- }
-
- @Override
- public Channel getIdBindChannel(String Id) {
- return idChannel.get(Id);
- }
-
- }
说明:缓存Channel主要保证消息能发送到对应的Channel中,消息可携带用户id通过id查找Channel,将信息存入即可 ,通过channel.writeAndFlush(new TextWebSocketFrame("消息内容"));刷出消息。
- package com.pzg.chat.listener;
-
- import com.pzg.chat.communication.WebSocketNettyServer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.ApplicationListener;
- import org.springframework.context.event.ContextRefreshedEvent;
- import org.springframework.stereotype.Component;
-
- @Component
- public class NettyStartListener implements ApplicationListener<ContextRefreshedEvent> {
-
- /**
- * 注入启动器
- */
- @Autowired
- private WebSocketNettyServer webSocketNettyServer;
-
- @Override
- public void onApplicationEvent(ContextRefreshedEvent event) {
- //判断event上下文中的父级是否为空
- if (event.getApplicationContext().getParent() == null) {
- try {
- //为空则调用start方法
- webSocketNettyServer.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- // 导出socket对象
- import {getToken} from '@/utils/auth';
- export {
- socket
- }
- import { Message } from 'element-ui'
- import {header} from "../../listening/header";
- import {asidefriend} from "../../listening/asidefriend";
- import {chatbox} from "../../listening/chatbox";
- import {chatcontent} from "../../listening/chatcontent";
- import {videocalls} from "../../listening/videocalls";
- import {voicecalls} from "../../listening/voicecalls";
- // socket主要对象
- var socket = {
- websock: null,
- ws_url: "ws://localhost:8089/ws",
- /**
- * 开启标识
- * */
- socket_open: false,
- /**
- * 心跳timer
- * */
- hearbeat_timer: null,
- /**
- * 心跳发送频率
- * */
- hearbeat_interval: 5000,
- /**
- * 是否开启重连
- * */
- is_reonnect: true,
- /**
- * 重新连接的次数
- * */
- reconnect_count: 3,
- /**
- * 当前重新连接的次数,默认为:1
- * */
- reconnect_current: 1,
- /**
- * 重新连接的时间类型
- * */
- reconnect_timer: null,
- /**
- * 重新连接的间隔
- * */
- reconnect_interval: 3000,
-
- i : 0,
-
- timer:null,
- /**
- * 初始化连接
- */
- init: () => {
- if (!("WebSocket" in window)) {
- Message({
- message: '当前浏览器与网站不兼容丫',
- type: 'error',
- });
- return null
- }
-
- if (socket.websock && socket.websock.readyState===1) {
- return socket.websock
- }
-
- socket.websock = new WebSocket(socket.ws_url)
- //接收消息
- socket.websock.onmessage = function (e) {
- //调用处理消息的方法
- socket.receive(e)
- }
-
- // 关闭连接
- socket.websock.onclose = function (e) {
- clearInterval(socket.hearbeat_interval);
- socket.socket_open = false;
- if (socket.websock!=null){
- header.getWebsocketStatus(socket.websock.readyState);
- }
- // 需要重新连接
- if (socket.is_reonnect) {
- socket.reconnect_timer = setTimeout(() => {
- // 超过重连次数
- if (socket.reconnect_current > socket.reconnect_count) {
- clearTimeout(socket.reconnect_timer)
- return
- }
- // 记录重连次数
- socket.reconnect_current++
- socket.reconnect()
- }, socket.reconnect_interval)
- }
- }
-
- // 连接成功
- socket.websock.onopen = function () {
- // Message({
- // message: '连接成功',
- // type: 'success',
- // });
- header.getWebsocketStatus(socket.websock.readyState);
- let data = {
- "action": 10002,
- "token":getToken(),
- "chatMsg": null,
- "extend": 1,
- };
- socket.send(data);
- socket.socket_open = true;
- socket.is_reonnect = true;
- //重修刷新好友内容
- window.dispatchEvent(new CustomEvent('connectInit'));
- // 开启心跳
- socket.heartbeat()
- };
- // 连接发生错误
- socket.websock.onerror = function (err) {
- Message({
- message: '服务连接发送错误!',
- type: 'error',
- });
- }
- },
- /**
- * 获取websocket对象
- * */
-
- getSocket:()=>{
- //创建了直接返回,反之重来
- if (socket.websock) {
- return socket.websock
- }else {
- socket.init();
- }
- },
-
- getStatus:()=> {
- if (socket.websock.readyState === 0) {
- return "未连接";
- } else if (socket.websock.readyState === 1) {
- return "已连接";
- } else if (socket.websock.readyState === 2) {
- return "连接正在关闭";
- } else if (socket.websock.readyState === 3) {
- return "连接已关闭";
- }
- },
-
- /**
- * 发送消息
- * @param {*} data 发送数据
- * @param {*} callback 发送后的自定义回调函数
- */
- send: (data, callback = null) => {
- // 开启状态直接发送
- if (socket.websock!=null && socket.websock.readyState === socket.websock.OPEN) {
- try {
- socket.websock.send(JSON.stringify(data));
- }catch (e) {
- if (socket.timer !=null){
- return
- }
- socket.timer = setInterval(()=>{
- if (i>=6){
- clearInterval(socket.timer);
- socket.timer = null;
- socket.i = 0;
- return
- }
- socket.websock.send(JSON.stringify(data));
- socket.i++;
- },2000)
- }
- if (callback) {
- callback()
- }
-
- // 正在开启状态,则等待1s后重新调用
- } else if (socket.websock!=null && socket.websock.readyState === socket.websock.CONNECTING) {
- setTimeout(function () {
- socket.send(data, callback)
- }, 1000)
-
- // 未开启,则等待1s后重新调用
- } else if (socket.websock!=null){
- socket.init();
- setTimeout(function () {
- socket.send(data, callback)
- }, 1000)
- }
- },
-
- /**
- * 接收消息
- * @param {*} message 接收到的消息
- */
- receive: (message) => {
- var recData = JSON.parse(message.data);
- /**
- *这部分是我们具体的对消息的处理
- * */
- console.log(recData)
- // 自行扩展其他业务处理...
- },
-
- /**
- * 心跳
- */
- heartbeat: () => {
- if (socket.hearbeat_timer) {
- clearInterval(socket.hearbeat_timer)
- }
- socket.hearbeat_timer = setInterval(() => {
- //发送心跳包
- let data = {
- "action": 10000,
- "token":getToken(),
- "chatMsg": null,
- "extend": null,
- };
- socket.send(data)
- }, socket.hearbeat_interval)
- },
-
- /**
- * 主动关闭连接
- */
- close: () => {
- if (socket.websock==null){
- return
- }
- let data = {
- "action": 10002,
- "token":getToken(),
- "chatMsg": null,
- "extend": 0,
- };
- socket.send(data);
- clearInterval(socket.hearbeat_interval);
- socket.is_reonnect = false;
- socket.websock.close();
- header.getWebsocketStatus(socket.websock.readyState);
- socket.websock=null
- },
-
- /**
- * 重新连接
- */
- reconnect: () => {
- if (socket.websock && socket.socket_open) {
- socket.websock.close()
- }
- socket.init()
- },
- }
说明:通过登入后,在某个全局页面中调用socket.start()即可连接netty服务器,通过socket.send("消息")来发送消息。
以上就是整个的使用教程,如果报错或者有异常,可以在私信我,看到消息第一时间帮你解决,坚持就是胜利,加油!。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。