赞
踩
目录 |
---|
IM即时通讯系统[SpringBoot+Netty]——梳理(二) |
IM即时通讯系统[SpringBoot+Netty]——梳理(三) |
IM即时通讯系统[SpringBoot+Netty]——梳理(四) |
IM即时通讯系统[SpringBoot+Netty]——梳理(五) |
首先从市面上看im系统(无非就这三种方式):
优点
:可以快速的上手,使用
缺点
:功能缺失,可持续性不强,没有团队做后期的维护和扩展,是否和自己公司的技术栈相匹配
优点
:既不用开发im系统,也不需要运维服务器,大型的服务商技术比较成熟,消息传递的可靠性高,根据服务商官方的sdk和ui库,很容易的给自己的服务加上im功能
缺点
:无法窥探服务商的源码(闭源),定制化的需求很难满足,官方的扩展如果没有解决你的需求,基本上就无解了,还要有信息和数据是重要的资产,放在别人的手里不太好,服务的费用高
优点
:切合公司技术栈进行开发,不用担心后期维护,定制自己的需求,数据安全得到保护
缺点
:需要有特别熟悉im系统的人开发,对技术水平有一定的要求,人力成本增加
这是早期的京东客服实现的技术架构
这个架构会造成资源的浪费,没有消息发送的时候,轮询也不会停止
客户端
:PC端(MAC、WINDOS)、手机端(安卓、苹果)、WEB端
服务层
:
接入层
:im系统的门户,是im系统中较为核心
的模块,维护着客户端和服务端的长链接,消息由客户端发送给接入层,接入层交递给逻辑层进行处理;接入层主要分为四个功能
:第一个是保持长链接、第二个是协议解析、第三个是我们的session维护、第四个是消息推送;当消息处理完成后,也是由接入层投递给客户端;在接入层和客户端中必须
要有协议(应用层协议:文本协议和二进制协议—MQTT、XMPP、HTPP等协议;私有协议)逻辑层
:业务系统的一个又一个的模块:用户、关系链、群组、消息存储层
:MySQL、Redis
长连接在收发消息即时,有消息来可以通过长连接可以直接投递给用户,对比长轮询而言,避免了许多的空循环(可以参考本文:web通讯的四种方式)
接入层和逻辑层可以通过rpc调用
或者mq解耦
逻辑层连接的各大持久层完成持久化工作
接入层
:去维护我们客户端的长连接和消息的收发,协议可以考虑使用TCP协议(可靠的);选择一个合适的应用层协议
(MQTT、XMPP、私有协议);接入层还要做好用户session的维护,接入层和传统的web开发有不同,接入层是有状态的服务,传统的http是无状态的服务
逻辑层
:处理消息收发的核心逻辑,配合接入层和存储层,真正的做到消息的不丢、不漏、不串
存储层:
要有合理的设计,为逻辑层提供数据服务,能够承载海量的聊天记录数据
这里我觉得不错的地方,用导入用户资料的逻辑做示范:
然后这里就是一些增删改查的逻辑,这里就不写了,自己过一遍知道大致意思,后面也一样
为什么要这么说呢?你看微信、QQ为什么地位这么牢固呢?就是因为他们里面有你的好友,如果你换了一款聊天软件这些好友你就都没有了,你说这是不是价值挺高。
弱好友好关系设计:
强好友关系设计:
这里贴一个添加好友的具体逻辑的代码,其他的和这个大致思路差不多
// 添加好友的逻辑 @Transactional public ResponseVO doAddFriend(RequestBase requestBase, String fromId, FriendDto dto, Integer appId){ // A-B // Friend表插入 A 和 B 两条记录 // 查询是否有记录存在,如果存在则判断状态,如果是已经添加,则提示已经添加了,如果是未添加,则修改状态 // 第一条数据的插入 LambdaQueryWrapper<ImFriendShipEntity> lqw = new LambdaQueryWrapper<>(); lqw.eq(ImFriendShipEntity::getAppId, appId); lqw.eq(ImFriendShipEntity::getFromId, fromId); lqw.eq(ImFriendShipEntity::getToId, dto.getToId()); ImFriendShipEntity entity = imFriendShipMapper.selectOne(lqw); long seq = 0L; // 不存在这条消息 if(entity == null){ // 直接添加 entity = new ImFriendShipEntity(); seq = redisSeq.doGetSeq(appId + ":" + Constants.SeqConstants.Friendship); entity.setAppId(appId); entity.setFriendSequence(seq); entity.setFromId(fromId); BeanUtils.copyProperties(dto, entity); entity.setStatus(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()); entity.setCreateTime(System.currentTimeMillis()); int insert = imFriendShipMapper.insert(entity); if(insert != 1){ // TODO 添加好友失败 return ResponseVO.errorResponse(FriendShipErrorCode.ADD_FRIEND_ERROR); } writeUserSeq.writeUserSeq(appId, fromId, Constants.SeqConstants.Friendship, seq); }else{ // 存在这条消息,去根据状态做判断 // 他已经是你的好友了 if(entity.getStatus() == FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()){ // TODO 对方已经是你的好友 return ResponseVO.errorResponse(FriendShipErrorCode.TO_IS_YOUR_FRIEND); }else{ ImFriendShipEntity update = new ImFriendShipEntity(); if(StringUtils.isNotEmpty(dto.getAddSource())){ update.setAddSource(dto.getAddSource()); } if(StringUtils.isNotEmpty(dto.getRemark())){ update.setRemark(dto.getRemark()); } if(StringUtils.isNotEmpty(dto.getExtra())){ update.setExtra(dto.getExtra()); } seq = redisSeq.doGetSeq(appId + ":" + Constants.SeqConstants.Friendship); update.setFriendSequence(seq); update.setStatus(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()); int res = imFriendShipMapper.update(update, lqw); if(res != 1){ // TODO 添加好友失败 return ResponseVO.errorResponse(FriendShipErrorCode.ADD_FRIEND_ERROR); } writeUserSeq.writeUserSeq(appId, fromId, Constants.SeqConstants.Friendship, seq); } } // 第二条数据的插入 LambdaQueryWrapper<ImFriendShipEntity> lqw1 = new LambdaQueryWrapper<>(); lqw1.eq(ImFriendShipEntity::getAppId, appId); lqw1.eq(ImFriendShipEntity::getFromId, dto.getToId()); lqw1.eq(ImFriendShipEntity::getToId, fromId); ImFriendShipEntity entity1 = imFriendShipMapper.selectOne(lqw1); // 不存在就直接添加 if(entity1 == null){ entity1 = new ImFriendShipEntity(); entity1.setAppId(appId); entity1.setFromId(dto.getToId()); BeanUtils.copyProperties(dto, entity1); entity1.setToId(fromId); entity1.setFriendSequence(seq); entity1.setStatus(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()); entity1.setCreateTime(System.currentTimeMillis()); int insert = imFriendShipMapper.insert(entity1); if(insert != 1){ // TODO 添加好友失败 return ResponseVO.errorResponse(FriendShipErrorCode.ADD_FRIEND_ERROR); } writeUserSeq.writeUserSeq(appId, dto.getToId(), Constants.SeqConstants.Friendship, seq); }else{ // 存在就判断状态 if(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode() != entity1.getStatus()){ // TODO 对方已经是你的好友 return ResponseVO.errorResponse(FriendShipErrorCode.TO_IS_YOUR_FRIEND); }else{ ImFriendShipEntity entity2 = new ImFriendShipEntity(); entity2.setFriendSequence(seq); entity2.setStatus(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()); imFriendShipMapper.update(entity2, lqw1); writeUserSeq.writeUserSeq(appId, dto.getToId(), Constants.SeqConstants.Friendship, seq); } } // TODO TCP通知 // A B 添加好友,要把添加好友的信息,发送给除了A其他的端,还要发送给B的所有端 // 发送给from AddFriendPack addFriendPack = new AddFriendPack(); BeanUtils.copyProperties(entity, addFriendPack); addFriendPack.setSequence(seq); if(requestBase != null){ messageProducer.sendToUser(fromId, requestBase.getClientType(), requestBase.getImei(), FriendshipEventCommand.FRIEND_ADD, addFriendPack, requestBase.getAppId()); }else{ messageProducer.sendToUser(fromId, FriendshipEventCommand.FRIEND_ADD, addFriendPack, requestBase.getAppId()); } // 发送给to AddFriendPack addFriendToPack = new AddFriendPack(); BeanUtils.copyProperties(entity1, addFriendToPack); messageProducer.sendToUser(entity1.getFromId(), FriendshipEventCommand.FRIEND_ADD, addFriendToPack, requestBase.getAppId()); // 之后回调 if(appConfig.isDestroyGroupAfterCallback()){ AddFriendAfterCallbackDto addFriendAfterCallbackDto = new AddFriendAfterCallbackDto(); addFriendAfterCallbackDto.setFromId(fromId); addFriendAfterCallbackDto.setToItem(dto); callbackService.callback(appId, Constants.CallbackCommand.AddFriendAfter, JSONObject.toJSONString(addFriendAfterCallbackDto)); } return ResponseVO.successResponse(); }
后面的seq和回调、TCP通知可以先不看
这里的校验好友可以分为两种,一种是单向好友校验,一种是双向好友校验,这里贴出代码
// 校验好友关系 @Override public ResponseVO checkFriendShip(CheckFriendShipReq req) { // 双向校验的修改 // 1、先是把req中的所有的toIds都转化为key为属性,value为0的map Map<String, Integer> result = req.getToIds().stream().collect(Collectors.toMap(Function.identity(), s-> 0)); List<CheckFriendShipResp> resp = new ArrayList<>(); if(req.getCheckType() == CheckFriendShipTypeEnum.SINGLE.getType()){ resp = imFriendShipMapper.checkFriendShip(req); }else{ resp = imFriendShipMapper.checkFriendShipBoth(req); } // 2、将复杂sql查询出来的数据转换为map Map<String, Integer> collect = resp.stream() .collect(Collectors.toMap(CheckFriendShipResp::getToId, CheckFriendShipResp::getStatus)); // 3、最后比对之前result中和collect是否完全相同,collect中没有的话,就将这个数据封装起来放到resp中去 for (String toId : result.keySet()){ if(!collect.containsKey(toId)){ CheckFriendShipResp checkFriendShipResp = new CheckFriendShipResp(); checkFriendShipResp.setFromId(req.getFromId()); checkFriendShipResp.setToId(toId); checkFriendShipResp.setStatus(result.get(toId)); resp.add(checkFriendShipResp); } } return ResponseVO.successResponse(resp); }
这里还要一个点,就是那个result最后和collect 里面的做一下对比,如果我们要校验的用户,不存在于数据库(双向校验在下面出现status=4的情况是,那个用户存在于数据库但是它的status为0),collect就查询不出来,也就要把那个数据也要加到resp中去,此时它的status=0
重要的点就是imFriendShipMapper这里面的两个sql语句
checkFriendShip(单向校验)
@Select("<script>" +
"select from_id as fromId, to_id as toId, if(status = 1, 1, 0) as status from im_friendship where from_id = #{fromId} and to_id in " +
"<foreach collection='toIds' index = 'index' item = 'id' separator = ',' close = ')' open = '('>" +
"#{id}" +
"</foreach>" +
"</script>")
public List<CheckFriendShipResp> checkFriendShip(CheckFriendShipReq req);
也就是我通过fromId和toId只要能查到,就算是校验成功,校验结果再通过if(status = 1, 1, 0) as status 来做判断,最后返回给前面
checkFriendShipBoth(双向校验)
@Select("<script>" + "select a.fromId, a.toId, ( " + "case " + "when a.status = 1 and b.status = 1 then 1 " + "when a.status = 1 and b.status != 1 then 2 " + "when a.status != 1 and b.status = 1 then 3 " + "when a.status != 1 and b.status != 1 then 4 " + "end" + ")" + "as status from " + "(select from_id as fromId, to_id as toId, if(status = 1, 1, 0) as status from im_friendship where app_id = #{appId} and from_id = #{fromId} and to_id in " + "<foreach collection='toIds' index='index' item='id' separator=',' close=')' open='('>" + "#{id}" + "</foreach>" + ") as a inner join" + "(select from_id as fromId, to_id as toId, if(status = 1, 1, 0) as status from im_friendship where app_id = #{appId} and to_id = #{fromId} and from_id in " + "<foreach collection='toIds' index='index' item='id' separator=',' close=')' open='('>" + "#{id}" + "</foreach>" + ") as b " + "on a.fromId = b.toId and a.toId = b.fromId" + "</script>") public List<CheckFriendShipResp> checkFriendShipBoth(CheckFriendShipReq req);
这里的校验黑名单业务和上面的校验好友业务是差不多的,这里也贴一下代码
// 校验黑名单 @Override public ResponseVO checkFriendBlack(CheckFriendShipReq req) { Map<String, Integer> toIdMap = req.getToIds().stream().collect(Collectors.toMap(Function.identity(),s -> 0)); List<CheckFriendShipResp> resp = new ArrayList<>(); if(req.getCheckType() == CheckFriendShipTypeEnum.SINGLE.getType()){ resp = imFriendShipMapper.checkFriendShipBlack(req); }else { resp = imFriendShipMapper.checkFriendShipBlackBoth(req); } Map<String, Integer> collect = resp.stream().collect(Collectors.toMap(CheckFriendShipResp::getToId, CheckFriendShipResp::getStatus)); for (String toId : toIdMap.keySet()) { if(!collect.containsKey(toId)){ CheckFriendShipResp checkFriendShipResp = new CheckFriendShipResp(); checkFriendShipResp.setToId(toId); checkFriendShipResp.setFromId(req.getFromId()); checkFriendShipResp.setStatus(toIdMap.get(toId)); resp.add(checkFriendShipResp); } } return ResponseVO.successResponse(resp); }
checkFriendShipBlack(单向校验)
@Select("<script>" +
" select from_id AS fromId, to_id AS toId , if(black = 1,1,0) as status from im_friendship where app_id = #{appId} and from_id = #{fromId} and to_id in " +
"<foreach collection='toIds' index='index' item='id' separator=',' close=')' open='('>" +
" #{id} " +
"</foreach>" +
"</script>"
)
List<CheckFriendShipResp> checkFriendShipBlack(CheckFriendShipReq req);
checkFriendShipBlackBoth(双向校验)
@Select("<script>" + " select a.fromId,a.toId , ( \n" + " case \n" + " when a.black = 1 and b.black = 1 then 1 \n" + " when a.black = 1 and b.black != 1 then 2 \n" + " when a.black != 1 and b.black = 1 then 3 \n" + " when a.black != 1 and b.black != 1 then 4 \n" + " end \n" + " ) \n " + " as status from "+ " (select from_id AS fromId , to_id AS toId , if(black = 1,1,0) as black from im_friendship where app_id = #{appId} and from_id = #{fromId} AND to_id in " + "<foreach collection='toIds' index='index' item='id' separator=',' close=')' open='('>" + " #{id} " + "</foreach>" + " ) as a INNER join" + " (select from_id AS fromId, to_id AS toId , if(black = 1,1,0) as black from im_friendship where app_id = #{appId} and to_id = #{fromId} AND from_id in " + "<foreach collection='toIds' index='index' item='id' separator=',' close=')' open='('>" + " #{id} " + "</foreach>" + " ) as b " + " on a.fromId = b.toId AND b.fromId = a.toId "+ "</script>" ) List<CheckFriendShipResp> checkFriendShipBlackBoth(CheckFriendShipReq toId);
这里的新建好友申请是在添加好友的业务中实现的,会根据用户的一个字段,是否需要申请才能加好友,代码如下
还有审批申请的代码
// 审批好友请求 @Override @Transactional public ResponseVO approverFriendRequest(ApproverFriendRequestReq req) { ImFriendShipRequestEntity imFriendShipRequestEntity = imFriendShipRequestMapper.selectById(req.getId()); if(imFriendShipRequestEntity == null){ throw new ApplicationException(FriendShipErrorCode. FRIEND_REQUEST_IS_NOT_EXIST); } if(!req.getOperater().equals(imFriendShipRequestEntity.getToId())){ //只能审批发给自己的好友请求 throw new ApplicationException(FriendShipErrorCode.NOT_APPROVER_OTHER_MAN_REQUEST); } long seq = redisSeq.doGetSeq(req.getAppId() + ":" + Constants.SeqConstants.FriendshipRequest); ImFriendShipRequestEntity update = new ImFriendShipRequestEntity(); // 这里审批是指同意或者拒绝,所以要写活 update.setApproveStatus(req.getStatus()); update.setUpdateTime(System.currentTimeMillis()); update.setId(req.getId()); update.setSequence(seq); imFriendShipRequestMapper.updateById(update); writeUserSeq.writeUserSeq(req.getAppId(),req.getOperater(), Constants.SeqConstants.FriendshipRequest,seq); // 如果是统一的话,就可以直接调用添加好友的逻辑了 if(ApproverFriendRequestStatusEnum.AGREE.getCode() == req.getStatus()){ FriendDto dto = new FriendDto(); dto.setAddSource(imFriendShipRequestEntity.getAddSource()); dto.setAddWorking(imFriendShipRequestEntity.getAddWording()); dto.setRemark(imFriendShipRequestEntity.getRemark()); dto.setToId(imFriendShipRequestEntity.getToId()); ResponseVO responseVO = imFriendShipService.doAddFriend(req , imFriendShipRequestEntity.getFromId(), dto, req.getAppId()); if(!responseVO.isOk() && responseVO.getCode() != FriendShipErrorCode.TO_IS_YOUR_FRIEND.getCode()){ return responseVO; } } // TODO TCP通知 // 通知审批人的其他端 ApproverFriendRequestPack approverFriendRequestPack = new ApproverFriendRequestPack(); approverFriendRequestPack.setStatus(req.getStatus()); approverFriendRequestPack.setId(req.getId()); approverFriendRequestPack.setSequence(seq); messageProducer.sendToUser(imFriendShipRequestEntity.getToId(), req.getClientType(), req.getImei(), FriendshipEventCommand.FRIEND_REQUEST_APPROVER, approverFriendRequestPack, req.getAppId()); return ResponseVO.successResponse(); }
上图中左面的是微信的,一个用户可以在多个组内,右面的是qq的,一个用户只能在一个分组内,本系统实现的左边的方式,所以要设计一下数据库
这部分主打的就是一个联合,像好友分组创建需要用到添加成员,删除好友分组,也需要清空组内的成员,添加群组成员的时候,也需要获取群组,耦合性很强
单聊不能像群聊那样聊的热火朝天的,所以我们要实现群聊
下面是腾讯云
本系统实现的是这两种群组类型
这里没啥说的
复杂、耦合度高
这里也没啥,就是查询这个group_member就可以找到用户加入的群了
@Select("select group_id from im_group_member where app_id = #{appId} and member_id = #{memberId}")
List<String> getJoinedGroupId(Integer appId, String memberId);
略
略
略
略
这个可以看我的另一篇文章 IO线程模型
这个东西很大,这里就做一点基础的阐述
官网:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是一个异步事件驱动的网络应用程序框架。用于快速开发可维护的高性能协议服务器和客户端。
官网:
Netty is an NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
Netty是一个NIO客户端-服务器框架,可以快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化和优化了网络编程,如TCP和UDP套接字服务器。
什么应用场景下会用到Netty?
DiscardServerHandler
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { static Set<Channel> channelList = new HashSet<>(); // 有客户端连接进来就触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 通知其他人我上线了 channelList.forEach((e)->{ e.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + "上线了"); }); channelList.add(ctx.channel()); } // 有读写事件发生的时候触发这个方法 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = (String) msg; System.out.println("收到数据: " + message); // // 通知分发给聊天室内所有的客户端 // channelList.forEach((e)->{ // if(e == ctx.channel()){ // e.writeAndFlush("[自己]: " + message); // }else{ // e.writeAndFlush("[客户端]:" + ctx.channel().remoteAddress() + " " + message); // } // }); } /** * channel 处于不活跃的时候会调用 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 通知其他客户端 我下线了 channelList.remove(ctx.channel()); // 通知其他人我上线了 channelList.forEach((e)->{ e.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + "下线了"); }); } }
主要就是写Handler,把复杂的逻辑,用几个API就可以弄好了
DiscardServer
public class DiscardServer { private int port; public DiscardServer(int port){ this.port = port; } public void run(){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 线程池 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. System.out.println("tcp start success"); ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
Starter
public class Starter {
public static void main(String[] args) {
new DiscardServer(8001).run();
}
}
网络调试助手——》操作系统——》网路——》对方操作系统——》找到对应的进程(传过去的不是字符串)
这里使用的是网络调试助手
Netty底层只认ByteBuf,我们不能将字符串直接发送给客户端,所以要在Server中加上一些编解码的代码,然后我们在接受消息的时候,就不用自己去解码了,直接就可以用了
public class DiscardServer { private int port; public DiscardServer(int port){ this.port = port; } public void run(){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 线程池 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { Charset gbk = Charset.forName("GBK"); ch.pipeline().addLast("decoder", new StringDecoder(gbk)); ch.pipeline().addLast("encoder", new StringEncoder(gbk)); ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. System.out.println("tcp start success"); ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
里面的这些Handler都要注意位置
这里启动好聊天室的程序并启动一个python脚本向服务端循环发送消息
python
import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("127.0.0.1",8001))
for i in range(100):
print(i)
string = "hello1哈"
body = bytes(string, 'gbk')
s.sendall(body)
当我们执行这个脚本,就会在服务端的控制台看到信息,我们看到的应该是100条一行一行的hello哈应该才是合理的,但是执行后会发现100条消息都显示在同一行了,第二次有的在同一行,有的各自在一行中
第一次发送
第二次发送
产生这个现象的原因就是TCP发送是流式发送的,有的时候发送的一套完整的,有的时候发送的是一段一段的数据,要怎么解决这个问题
第一种解决方案
可以在server的pipeline中加一些东西,去限制读取的字节数,缺点是可能要考虑数据大小的问题
第二种解决方案
加这个分割符号,这个的缺点是正经要读取的数据中,不能在出现分割的字符串了
这里给出用私有协议去解决,也就是比如6123456
,第一个6是要读取后面6个数字
这里先提到ByteBuf的核心API
public class NettyByteBuf { public static void main(String[] args) { // 创建byteBuf对象,该对象内部包含一个字节数组byte[10] ByteBuf byteBuf = Unpooled.buffer(10); System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 8; i++) { byteBuf.writeByte(i); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5; i++) { System.out.println(byteBuf.getByte(i)); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5; i++) { System.out.println(byteBuf.readByte()); } System.out.println("byteBuf=" + byteBuf); System.out.println(byteBuf.readableBytes()); } }
从上面控制台的结果不难看出,ridx的意思是已经读取到哪里了,widx已经占用了多少了,cap是一共的容量有多少
ridx也就是读索引,widx是写索引
常用API | 作用 |
---|---|
Unpooled.buffer(10) | 创建一个字节数组[10] |
byteBuf.writeByte(i) | 往byteBuf中写入i |
byteBuf.getByte(i) | 获取btyeBuf中第i个字节,读索引不动 |
byteBuf.readByte() | 从开头开始读字节,读索引自动的向后移动 |
byteBuf.readableBytes() | 获取到byteBuf中还没有读取到的字节 |
byteBuf.markReaderIndex() | 记录读索引的位置 |
byteBuf.resetReaderIndex() | 返回记录的读索引的位置 |
// 继承了这个类就可以去 自定义协议了 public class MyDecodecer extends ByteToMessageDecoder { // 数据长度 + 数据 @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // 一个int是4字节,可读长度要大于4才可以继续执行 if(byteBuf.readableBytes() < 4){ return; } // 数据长度 int i = byteBuf.readInt(); if(byteBuf.readableBytes() < i){ byteBuf.resetReaderIndex(); return; } // 开辟一个byte数组去接收数据 byte[] data = new byte[i]; byteBuf.readBytes(data); System.out.println(new String(data)); byteBuf.markReaderIndex(); } }
所以就可以自定义一个私有协议,按照你的规则去读取数据,记得把这个放到pipeline里面哦!
这样就可以解决半包和黏包问题了
可以先了解一下短连接和长连接 HTTP长连接和短连接
public class HeartbeatHandler extends ChannelInboundHandlerAdapter { int readTimeout = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // // IdleStateEven 超时类型 IdleStateEvent event = (IdleStateEvent) evt; // ALL_IDLE : 一段时间内没有数据接收或者发送 // READER_IDLE : 一段时间内没有数据接收 // WRITER_IDLE : 一段时间内没有数据发送 if(event.state() == IdleState.READER_IDLE){ readTimeout++; } if(readTimeout >= 3){ System.out.println("超时超过3次,断开连接"); ctx.close(); } System.out.println("触发了:" + event.state() + "事件"); } }
这个实现的效果就是读超时3秒就会触发一次心跳检测,逻辑是超过三次就会断开连接
==UploadFileDecodecer ==
public class UploadFileDecodecer extends ByteToMessageDecoder { // 数据长度 + 数据 @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // 一个int是4字节,可读长度要大于4才可以继续执行 if(byteBuf.readableBytes() < 8){ return; } // 数据长度 int command = byteBuf.readInt(); FileDto fileDto = new FileDto(); fileDto.setCommand(command); // 文件名长度 int fileNameLen = byteBuf.readInt(); if(byteBuf.readableBytes() < fileNameLen){ byteBuf.resetReaderIndex(); return; } // 开辟一个byte数组去接收数据 byte[] data = new byte[fileNameLen]; byteBuf.readBytes(data); String fileName = new String(data); fileDto.setFileName(fileName); if(command == 2){ int dataLen = byteBuf.readInt(); if(byteBuf.readableBytes() < dataLen){ byteBuf.resetReaderIndex(); return; } byte[] fileData = new byte[dataLen]; byteBuf.readBytes(fileData); fileDto.setBytes(fileData); } byteBuf.markReaderIndex(); list.add(fileDto); } }
把这部分放到pipeline中放到UploadFileHandler前面,这里面通过自定义的协议解析出,文件的命令和文件名,文件的具体数据,然后封装到FileDto中,最后放到pipeline中,后面使用即可
UploadFileHandler
public class UploadFileHandler extends ChannelInboundHandlerAdapter { // 有客户端连接进来就触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } // 有读写事件发生的时候触发这个方法 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof FileDto){ FileDto fileDto = (FileDto) msg; if(fileDto.getCommand() == 1){ // 创建文件 File file = new File("E://" + fileDto.getFileName()); if(!file.exists()){ file.createNewFile(); } }else if(fileDto.getCommand() == 2){ // 写入文件 save2File("E://" + fileDto.getFileName(), fileDto.getBytes()); } } } public static boolean save2File(String fname, byte[] msg){ OutputStream fos = null; try{ File file = new File(fname); File parent = file.getParentFile(); boolean bool; if ((!parent.exists()) & (!parent.mkdirs())) { return false; } fos = new FileOutputStream(file,true); fos.write(msg); fos.flush(); return true; }catch (FileNotFoundException e){ return false; }catch (IOException e){ File parent; return false; } finally{ if (fos != null) { try{ fos.close(); }catch (IOException e) {} } } } }
这里用到从解码的地方拿到的FileDto中,没有就创建,有就写
可以使用下面的python脚本测试
#-*- coding: UTF-8 -*- import socket,os,struct s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(("127.0.0.1",8001)) filepath = "D://txt.txt" if os.path.isfile(filepath): filename = os.path.basename(filepath).encode('utf-8') # 请求传输文件 command = 1 body_len = len(filename) fileNameData = bytes(filename) i = body_len.to_bytes(4, byteorder='big') c = command.to_bytes(4, byteorder='big') s.sendall(c + i + fileNameData) fo = open(filepath,'rb') while True: command = 2; c = command.to_bytes(4, byteorder='big') filedata = fo.read(1024) print(len(filedata)) b = len(filedata).to_bytes(4, byteorder='big') if not filedata: break s.sendall(c + i + fileNameData + b + filedata) fo.close() #s.close() else: print(False)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。