赞
踩
一些固定的 util 类:
https://blog.csdn.net/YKenan/article/details/106319712
导包
<dependency>
    <groupId>springCloud</groupId>
    <artifactId>springCloud_common</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

<!-- Chat -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.50.Final</version>
</dependency>
<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.17</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
</dependency>

<!-- MySQL -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
WebSocket + Vue 的一个简单示例: https://blog.csdn.net/YKenan/article/details/106363153
package com.springCloud.netty.config.listener;

import com.springCloud.netty.config.webSocket.WebSocketServer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Starts the Netty WebSocket server once the Spring IoC container has finished
 * initializing its beans.
 *
 * <p>Spring publishes a {@link ContextRefreshedEvent} when an application context is
 * initialized or refreshed and then invokes
 * {@link #onApplicationEvent(ContextRefreshedEvent)} on every registered
 * {@link ApplicationListener}. The event can be delivered more than once (for example
 * on a context refresh, or when a child context is refreshed), so the server start is
 * guarded to run only on the first delivery.
 */
@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {

    /** Flipped to {@code true} by the first event so the server is started exactly once. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Prints the parent of the refreshed context and starts the WebSocket server on the
     * first invocation; later invocations only print.
     *
     * @param contextRefreshedEvent the refresh event published by Spring
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        System.out.println("我的父容器: " + contextRefreshedEvent.getApplicationContext().getParent());
        // A second start would try to bind the same port again and fail.
        if (started.compareAndSet(false, true)) {
            WebSocketServer.getInstance().start();
        }
    }
}
WebSocketServer: 定义 NIO 主从线程, 端口.
package com.springCloud.netty.config.webSocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.Data; import org.springframework.stereotype.Component; @Data @Component public class WebSocketServer { private static class SingletonWSServer { static final WebSocketServer instance = new WebSocketServer(); } public static WebSocketServer getInstance() { return SingletonWSServer.instance; } private EventLoopGroup mainGroup; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public WebSocketServer() { mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebSocketServerInitializer()); } public void start() { this.future = server.bind(8009); System.err.println("netty websocket server 启动完毕..."); } }
WebSocketServerInitializer: 获取管道, 数据流和访问路径.
package com.springCloud.netty.config.webSocket; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) { // 获取管道 (pipeline) ChannelPipeline pipeline = socketChannel.pipeline(); // Websocket 基于 http 协议, 所需要的 http 编码器 pipeline.addLast(new HttpServerCodec()); // 在 http 上有一些数据流产生, 有大有小, 我们对其处理, 既然如此, 我们需要使用 netty 对下数据流读写提供支持, 这两个类叫: pipeline.addLast(new ChunkedWriteHandler()); // 对 httpMessage 进行聚合处理, 聚合成 request和 response pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 本 handler 会帮你处理一些繁重复杂的事请, 会帮你处理握手动作: handshaking (close, ping, pong) ping + pong = 心跳, 对于 websocket 来讲, 都是以 frame 进行传输的, 不同的数据类型对应的 frame 也不同. pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 自定义的 handler pipeline.addLast(new ChatHandler()); } }
ChatHandler: 处理类.
- 获取客户端信息
- 判断消息的类型, 根据不同的类型处理不同的业务
2.1 当 WebSocket 第一次 open 的时候, 初始化 channel, 把用户的 channel 和 userId 关联起来. (主要用于两个用户之间传输数据, 每个用户对应一个 channel)
2.2 聊天类型的消息, 把聊天记录保存到数据库中, 同时标记消息的签收状态 [未签收] (将聊天记录存在数据库中)
2.3 签收消息类型, 针对具体的消息进行签收, 修改数据库中对应消息的签收状态 [已签收] (修改数据库中消息的状态)
- 处理异常
package com.springCloud.netty.config.webSocket;

import com.springCloud.common.util.JsonUtils;
import com.springCloud.common.util.SpringUtil;
import com.springCloud.netty.service.impl.ChatServiceImpl;
import com.springCloud.netty.util.enums.MsgActionEnum;
import com.springCloud.netty.util.enums.MsgSignFlagEnum;
import com.springCloud.netty.pojo.Chat;
import com.springCloud.netty.pojo.WebSocket;
import com.springCloud.netty.service.ChatService;
import com.springCloud.netty.util.relation.UserChannelRel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * Handles chat messages arriving over WebSocket.
 *
 * <p>Messages travel as {@link TextWebSocketFrame}s whose text is a JSON-encoded
 * {@link WebSocket} object. The {@code action} field selects the behavior:
 * <ol>
 *   <li>CONNECT: associate the sender's user id with this channel;</li>
 *   <li>CHAT: persist the message as "unsigned" and forward it to the receiver if online;</li>
 *   <li>SIGNED: mark the listed message ids as signed in the database;</li>
 *   <li>KEEP_ALIVE: heartbeat, only logged.</li>
 * </ol>
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Tracks the channels of all currently connected clients. */
    private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Decodes one inbound frame and dispatches it according to its action.
     * Frames that cannot be decoded or carry no action are logged and ignored.
     *
     * @param channelHandlerContext context of the channel the frame arrived on
     * @param textWebSocketFrame    the inbound text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) {
        String content = textWebSocketFrame.text();
        WebSocket webSocket = JsonUtils.jsonToPojo(content, WebSocket.class);
        // An `assert` is a no-op unless the JVM runs with -ea, so check explicitly.
        if (webSocket == null || webSocket.getAction() == null) {
            System.out.println("Ignoring message without a readable action: " + content);
            return;
        }

        Integer action = webSocket.getAction();
        Channel channel = channelHandlerContext.channel();
        if (action.equals(MsgActionEnum.CONNECT.type)) {
            handleConnect(webSocket, channel);
        } else if (action.equals(MsgActionEnum.CHAT.type)) {
            handleChat(webSocket);
        } else if (action.equals(MsgActionEnum.SIGNED.type)) {
            handleSigned(webSocket);
        } else if (action.equals(MsgActionEnum.KEEP_ALIVE.type)) {
            System.out.println("心跳类型消息");
        }
    }

    /** Binds the sender's user id to its channel so other users can reach it. */
    private void handleConnect(WebSocket webSocket, Channel channel) {
        Chat chat = webSocket.getChat();
        if (chat == null || chat.getSendId() == null) {
            System.out.println("Ignoring CONNECT message without a sender id");
            return;
        }
        UserChannelRel.put(chat.getSendId(), channel);
        // Debug output of the current user/channel table.
        UserChannelRel.output();
    }

    /** Persists a chat message as "unsigned" and forwards it to the receiver if online. */
    private void handleChat(WebSocket webSocket) {
        Chat chat = webSocket.getChat();
        if (chat == null) {
            System.out.println("Ignoring CHAT message without a chat payload");
            return;
        }
        String message = chat.getMessage();
        String sendId = chat.getSendId();
        String receiverId = chat.getReceiveId();

        // NOTE(review): this lookup uses the bean name "chatServiceImpl" while the SIGNED
        // branch uses "chatService"; confirm both names are registered.
        ChatServiceImpl chatServiceImpl = (ChatServiceImpl) SpringUtil.getBean("chatServiceImpl");
        // NOTE(review): the id is passed as "" — presumably the service assigns the real id; verify.
        Chat saveChat = chatServiceImpl.save(new Chat("", sendId, receiverId, message, MsgSignFlagEnum.unSign.type));

        // Forward the persisted chat (not the raw inbound message) to the receiver.
        WebSocket webSocketMsg = new WebSocket();
        webSocketMsg.setChat(saveChat);

        Channel receiverChannel = UserChannelRel.get(receiverId);
        // The user table may still hold a channel that has since disconnected, so the
        // channel must also still be present in the group of live clients.
        boolean online = receiverChannel != null && clients.find(receiverChannel.id()) != null;
        if (!online) {
            System.out.println("离线用户");
            return;
        }
        receiverChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.objectToJson(webSocketMsg)));
    }

    /**
     * Marks messages as signed. For this action the {@code extend} field carries the
     * ids of the messages to sign, separated by commas.
     */
    private void handleSigned(WebSocket webSocket) {
        String msgStr = webSocket.getExtend();
        if (StringUtils.isBlank(msgStr)) {
            return;
        }
        List<String> msgStrIdList = new ArrayList<>();
        for (String mid : msgStr.split(",")) {
            if (StringUtils.isNotBlank(mid)) {
                msgStrIdList.add(mid);
            }
        }
        // Batch update.
        if (!msgStrIdList.isEmpty()) {
            ChatService chatService = (ChatService) SpringUtil.getBean("chatService");
            chatService.updateChatList(msgStrIdList);
        }
    }

    /** Registers the new client's channel in the group of live clients. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
        clients.add(ctx.channel());
    }

    /** Removes the channel from the group of live clients and logs its ids. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        // The original author notes this explicit removal is not strictly necessary.
        clients.remove(ctx.channel());
        System.out.println("客户端断开, Channel 对应的长 ID 为: " + ctx.channel().id().asLongText());
        System.out.println("客户端断开, Channel 对应的短 ID 为: " + ctx.channel().id().asShortText());
    }

    /** Prints the error, then closes the connection and removes the channel from the group. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.channel().close();
        clients.remove(ctx.channel());
    }
}
Chat: 和数据库对应的类, 基于存储信息的类.
WebSocket: 这个类没有和数据库中的表关联, 只是为了方便封装客户端与服务端之间传输的消息.
Chat
package com.springCloud.netty.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

/**
 * One chat message, mapped to the {@code chat} table.
 *
 * <p>The all-args constructor takes the fields in declaration order:
 * {@code id, sendId, receiveId, message, status}.
 */
@Entity
@Table(name = "chat")
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Chat implements Serializable {

    /** Primary key of the message. */
    @Id
    private String id;

    /** Id of the user who sent the message. */
    @Column
    private String sendId;

    /** Id of the user the message is addressed to. */
    @Column
    private String receiveId;

    /** Text of the message. */
    @Column
    private String message;

    /** Sign-off status; ChatHandler stores new messages with {@code MsgSignFlagEnum.unSign.type}. */
    @Column
    private Integer status;
}
WebSocket
package com.springCloud.netty.pojo;

import lombok.Data;
import lombok.ToString;

import java.io.Serializable;

/**
 * Envelope exchanged between client and server over the WebSocket connection.
 * It is not mapped to a database table.
 */
@Data
@ToString
public class WebSocket implements Serializable {

    /** Id of the message. */
    private String id;

    /** Action of the message; compared against {@code MsgActionEnum} type codes. */
    private Integer action;

    /** Extension field; for SIGNED messages ChatHandler reads it as comma-separated message ids. */
    private String extend;

    /** The chat payload. */
    private Chat chat;
}
处理类中用到了两个枚举类:
- MsgActionEnum: 信息行为
- MsgSignFlagEnum: 消息签收状态 (已签收, 未签收)
MsgActionEnum
package com.springCloud.netty.util.enums; /** * 发送消息的动作 枚举 */ public enum MsgActionEnum { CONNECT(1, "第一次(或重连)初始化连接"), CHAT(2, "聊天消息"), SIGNED(3, "消息签收"), KEEP_ALIVE(4, "客户端保持心跳"), PULL_FRIEND(5, "拉取好友"); public final Integer type; public final String content; MsgActionEnum(Integer type, String content) { this.type = type; this.content = content; } public Integer getType() { return type; } }
MsgSignFlagEnum
package com.springCloud.netty.util.enums; /** * 消息签收状态 枚举 */ public enum MsgSignFlagEnum { unSign(0, "未签收"), signed(1, "已签收"); public final Integer type; public final String content; MsgSignFlagEnum(Integer type, String content) { this.type = type; this.content = content; } public Integer getType() { return type; } }
用户 id 号与 Channel 关联起来, 一一对应.
package com.springCloud.netty.util.relation;

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Associates user ids with their {@link Channel}.
 *
 * <p>The table is static and is accessed from channel handlers, which may run on
 * different event loop threads, so it is backed by a {@link ConcurrentHashMap}.
 */
public class UserChannelRel {

    private static final Map<String, Channel> manage = new ConcurrentHashMap<>();

    /**
     * Registers (or replaces) the channel of a user.
     *
     * @param userId  id of the user; the call is ignored when {@code null}
     * @param channel channel of the user; the call is ignored when {@code null}
     */
    public static void put(String userId, Channel channel) {
        // ConcurrentHashMap rejects null keys and values; ignore them instead of throwing.
        if (userId == null || channel == null) {
            return;
        }
        manage.put(userId, channel);
    }

    /**
     * Looks up the channel registered for a user.
     *
     * @param userId id of the user, may be {@code null}
     * @return the registered channel, or {@code null} when none is registered
     */
    public static Channel get(String userId) {
        if (userId == null) {
            return null;
        }
        return manage.get(userId);
    }

    /** Prints every registered user id with the long id of its channel (debug aid). */
    public static void output() {
        for (Map.Entry<String, Channel> entry : manage.entrySet()) {
            System.out.println("UserId: " + entry.getKey() + ", ChannelId: " + entry.getValue().id().asLongText());
        }
    }
}
<template>
  <div id="msg-outer">
    <div class="msg">
      <div v-for="(message, index) in msgFor" :key="index">
        <!-- Message from the friend -->
        <div class="msg-left" v-if="message.receiveShow">
          <div>
            <img v-if="friend.headIcon" :src="friend.headIcon" class="head"/>
            <span>{{message.data}}</span>
            <div class="popper-arrow"></div>
          </div>
        </div>
        <!-- My own message -->
        <div class="msg-right" v-if="message.sendShow">
          <div>
            <span>{{message.data}}</span>
            <div class="popper-arrow"></div>
            <img v-if="my.headIcon" :src="my.headIcon" class="head"/>
          </div>
        </div>
      </div>
    </div>
    <div class="footer">
      <el-input type="textarea" :rows="2" placeholder="请输入内容" v-model="sendMessage"> </el-input>
      <el-button type="primary" @click="sendWebSocket(sendMessage)">发送</el-button>
    </div>
  </div>
</template>

<script>
import ElInput from '../../../node_modules/element-ui/packages/input/src/input.vue'
import ElButton from '../../../node_modules/element-ui/packages/button/src/button.vue'

// Wait before reconnecting after an error, so an unreachable server does not
// cause a tight reconnect loop.
const RECONNECT_DELAY_MS = 3000
// Give the list time to render the new message before scrolling to it.
const SCROLL_DELAY_MS = 250

export default {
  components: {
    ElButton,
    ElInput
  },
  props: {},
  data () {
    return {
      chat: {
        id: '',
        friendId: ''
      },
      // My own user record
      my: {},
      // The friend's user record
      friend: {},
      // Messages shown in the conversation
      msgFor: [],
      // Text currently typed into the input
      sendMessage: '',
      // Message actions (must match the server's MsgActionEnum codes)
      action: {
        // First connection (or reconnect)
        CONNECT: 1,
        // Chat message
        CHAT: 2,
        // Message sign-off
        SIGNED: 3,
        // Client heartbeat
        KEEP_ALIVE: 4,
        // Pull friends
        PULL_FRIEND: 5
      },
      webSocket: null,
      // Serialized messages waiting for the socket to (re)open
      pendingMessages: [],
      // Set when the component is destroyed, to stop reconnect attempts
      closing: false
    }
  },
  mounted () {
    this.chat.id = this.$route.query.id
    this.chat.friendId = this.$route.query.friendId
    // My own user record; fall back to an empty object when nothing is stored
    this.my = JSON.parse(localStorage.getItem('user')) || {}
    // The friend's user record
    // NOTE(review): the Access-Control-* entries are response headers and `proxy` is a
    // Node-only axios option; sending them from the browser looks unintended — confirm.
    this.$axios({
      url: 'http://localhost:8498/user/' + this.chat.friendId,
      method: 'get',
      headers: {
        'Content-Type': 'application/json',
        'Access-Control-Allow-Methods': 'POST, OPTIONS',
        'Access-Control-Allow-Origin': '*'
      },
      proxy: {
        host: 'localhost',
        port: 8498
      }
    }).then(res => {
      if (res.data.flag) {
        this.friend = res.data.data
      }
      console.log(res.data.data)
    })
    // Show the latest message
    this.scrollToBottom()
  },
  watch: {},
  created () {
    this.initWebSocket()
  },
  destroyed () {
    // Close the connection when leaving the route and stop any pending reconnect
    this.closing = true
    if (this.webSocket) {
      this.webSocket.close()
    }
  },
  methods: {
    // Build the envelope the server expects
    sendMessageWebSocket (action, extend, sendId, receiveId, message, status) {
      return {
        action: action,
        extend: extend,
        chat: {
          sendId: sendId,
          receiveId: receiveId,
          message: message,
          status: status
        }
      }
    },
    // True when the socket exists and is ready to send
    isSocketOpen () {
      return !!this.webSocket && this.webSocket.readyState === this.webSocket.OPEN
    },
    // Scroll the message list to its last entry
    scrollToBottom () {
      let list = document.getElementsByClassName('msg')[0]
      if (list) {
        list.scrollTop = list.offsetHeight + list.scrollHeight
      }
    },
    // Create the WebSocket unless one is already open or connecting
    initWebSocket () {
      if (!window.WebSocket) {
        this.$message.error('您的浏览器的版本过低, 请尽快升级版本!')
        return
      }
      if (this.webSocket && (this.webSocket.readyState === this.webSocket.OPEN ||
        this.webSocket.readyState === this.webSocket.CONNECTING)) {
        return false
      }
      this.webSocket = new WebSocket('ws://127.0.0.1:8009/ws')
      this.webSocket.onopen = this.onOpenWebSocket
      this.webSocket.onmessage = this.onMessageWebSocket
      this.webSocket.onerror = this.onErrorWebSocket
      this.webSocket.onclose = this.closeWebSocket
    },
    // Once connected: register this user, then send anything queued while offline
    onOpenWebSocket () {
      console.log('链接建立成功!')
      let wsMsg = this.sendMessageWebSocket(this.action.CONNECT, null, this.my.id, null, null, null)
      this.webSocket.send(JSON.stringify(wsMsg))
      while (this.pendingMessages.length > 0) {
        this.webSocket.send(this.pendingMessages.shift())
      }
    },
    // Reconnect after a failure, with a delay and never after the component is gone
    onErrorWebSocket () {
      if (this.closing) {
        return
      }
      setTimeout(() => {
        if (!this.closing) {
          this.initWebSocket()
        }
      }, RECONNECT_DELAY_MS)
    },
    // Send a chat message
    sendWebSocket (data) {
      if (data === '') {
        this.$message.warning('请输入数据')
        return false
      } else if (data.length > 3000) {
        this.$message.warning('输入数据太长')
        return false
      }
      this.msgFor.push({
        data: data,
        receiveShow: false,
        sendShow: true
      })
      let wsMsg = this.sendMessageWebSocket(this.action.CHAT, this.friend.id, this.my.id, this.friend.id, data, 0)
      console.log(wsMsg)
      let payload = JSON.stringify(wsMsg)
      if (this.isSocketOpen()) {
        this.webSocket.send(payload)
      } else {
        // send() throws while the socket is still connecting, so queue the message;
        // onOpenWebSocket delivers it after the CONNECT registration.
        this.pendingMessages.push(payload)
        this.initWebSocket()
      }
      // Show the latest message
      setTimeout(this.scrollToBottom, SCROLL_DELAY_MS)
      // Clear the input after sending
      this.sendMessage = ''
    },
    // Receive a chat message
    onMessageWebSocket (e) {
      let data
      try {
        data = JSON.parse(e.data).chat
      } catch (err) {
        console.log('无法解析的消息', e.data)
        return
      }
      console.log(data)
      if (!data) {
        return
      }
      this.msgFor.push({
        data: data.message,
        receiveShow: true,
        sendShow: false
      })
      // Show the latest message
      setTimeout(this.scrollToBottom, SCROLL_DELAY_MS)
    },
    // Connection closed
    closeWebSocket (e) {
      console.log('断开连接', e)
    }
  }
}
</script>

<style scoped>
#msg-outer {
  width: 70%;
  margin: 20px auto;
}

.msg {
  padding: 9px;
  height: 300px;
  overflow-y: auto;
}

.msg .msg-left,
.msg .msg-right {
  display: flow-root;
  width: 100%;
  height: 100%;
}

.msg .msg-left > div {
  float: left;
}

.msg .msg-right > div {
  float: right;
}

.msg .head {
  width: 40px;
  height: 40px;
  border-radius: 45%;
}

.msg .msg-left > div,
.msg .msg-right > div {
  position: relative;
}

.msg .msg-left > div span,
.msg .msg-right > div span {
  display: inline-block;
  background-color: #dbdada;
  padding: 5px 25px;
  border-radius: 5px;
  max-width: 550px;
  white-space: pre-wrap;
  word-break: break-all;
  overflow-wrap: break-word;
}

.msg .msg-left > div span {
  margin-left: 10px;
}

.msg .msg-right > div span {
  margin-right: 10px;
}

.msg .msg-left > div .popper-arrow,
.msg .msg-right > div .popper-arrow {
  display: block;
  width: 19px;
  height: 10px;
  background-color: #dbdada;
  border-radius: 500%;
  position: absolute;
  bottom: 9px;
  filter: drop-shadow(0 2px 12px rgba(0,0,0,.03));
  z-index: -1;
}

.msg .msg-left > div .popper-arrow {
  left: 42px;
  transform: rotateZ(20deg);
}

.msg .msg-right > div .popper-arrow {
  right: 42px;
  transform: rotateZ(-20deg);
}

.footer {
  margin-top: 20px;
}

#msg-outer {
  background-color: rgba(0, 0, 0, 0.03);
}

#msg-outer >>> textarea,
#msg-outer >>> button {
  border-radius: 0;
}

#msg-outer >>> button {
  float: right;
}
</style>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。