当前位置:   article > 正文

基于Netty的UDP服务端开发_netty udp

netty udp

1.前言

之前基于Netty做了一套TCP与MQTT的服务端,随着系统接入的终端类型越来越多,出现了UDP通讯的设备,虽然这样的设备并非主流,而且通讯机制存在问题,为了考虑系统的兼容性,只能将整套服务做全。

2.UDP通讯优缺点

UDP 是一种面向非连接的协议,面向非连接指的是在正式通信前不必与对方先建立连接,不管对方状态就直接发送数据。至于对方是否可以接收到这些数据,UDP 协议无法控制,所以说 UDP 是一种不可靠的协议。

UDP 协议适用于一次只传送少量数据、对可靠性要求不高的应用环境。

与TCP 协议一样,UDP 协议直接位于 IP 协议之上。实际上,IP 协议属于 OSI 参考模型的网络层协议,而 UDP 协议和 TCP 协议都属于传输层协议。

因为 UDP 是面向非连接的协议,没有建立连接的过程,因此它的通信效率很高,但也正因为如此,它的可靠性不如 TCP 协议。

UDP 协议的主要作用是完成网络数据流和数据报之间的转换在信息的发送端,UDP 协议将网络数据流封装成数据报,然后将数据报发送出去;在信息的接收端,UDP 协议将数据报转换成实际数据内容。

可以认为 UDP 协议的 socket 类似于码头,数据报则类似于集装箱。码头的作用就是负责友送、接收集装箱,而 socket 的作用则是发送、接收数据报。因此,对于基于 UDP 协议的通信双方而言,没有所谓的客户端和服务器端的概念。

3.源码示例

UDP的源码包含了我这边的整个系统架构层面的内容,并非完整的源码,可以根据自己的系统结构进行调整

3.1.启动UDP服务

  1. /**
  2. * 网关数据监听
  3. * @author lenny
  4. * @date 20220314
  5. */
  6. @Component
  7. public class ApplicationEventListener implements CommandLineRunner {
  8. //因为我的网关服务是按终端协议节点进行区分的,当终端上下行数据都需要指定这个节点名称然后给到MQ
  9. @Value("${spring.application.name}")
  10. private String nodeName;
  11. //UDP服务的监听端口
  12. @Value("${gnss.udpserver.udpPort}")
  13. private int udpPort;
  14. @Override
  15. public void run(String... args) throws Exception {
  16. //启动UDP服务
  17. startUdpServer();
  18. //清除Redis所有此节点的在线终端(业务需要,Demo可以不用)
  19. RedisService redisService = SpringBeanService.getBean(RedisService.class);
  20. redisService.deleteAllOnlineTerminals(nodeName);
  21. //将所有此节点的终端设置为离线(业务需要,Demo可以不用)
  22. RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
  23. messageSender.noticeAllOffline(nodeName);
  24. }
  25. /**
  26. * 启动udp服务
  27. *
  28. * @throws Exception
  29. */
  30. private void startUdpServer() throws Exception {
  31. //计数器,必须等到所有服务启动成功才能进行后续的操作
  32. final CountDownLatch countDownLatch = new CountDownLatch(1);
  33. //启动UDP服务
  34. UdpServer udpServer = new UdpServer(udpPort, ProtocolEnum.UDP, countDownLatch);
  35. udpServer.start();
  36. //等待启动完成
  37. countDownLatch.await();
  38. }
  39. }

3.2.UdpServer类

  1. /**
  2. * UDP服务类
  3. * @author lenny
  4. * @date 20220316
  5. */
  6. @Slf4j
  7. public class UdpServer extends Thread{
  8. private int port;
  9. private ProtocolEnum protocolType;
  10. private EventLoopGroup workerGroup;
  11. private Bootstrap bootstrap = new Bootstrap();
  12. private CountDownLatch countDownLatch;
  13. public UdpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) {
  14. this.port = port;
  15. this.protocolType = protocolType;
  16. this.countDownLatch = countDownLatch;
  17. final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class);
  18. workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class);
  19. bootstrap.group( workerGroup)
  20. .channel(NioDatagramChannel.class)
  21. .option(ChannelOption.SO_BROADCAST, true)
  22. .handler(new ChannelInitializer<NioDatagramChannel>(){
  23. @Override
  24. protected void initChannel(NioDatagramChannel ch) throws Exception {
  25. ch.pipeline().addLast(new IdleStateHandler(UdpConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
  26. //数据初步解析,并进行组包拆包处理
  27. ch.pipeline().addLast(new ProtocolDecoder(protocolType));
  28. //登录鉴权业务
  29. ch.pipeline().addLast(UdpLoginHandler.INSTANCE);
  30. //数据完全解码,并根据数据进行业务处理
  31. ch.pipeline().addLast(executorGroup, UdpBusinessHandler.INSTANCE);
  32. }
  33. });
  34. }
  35. @Override
  36. public void run() {
  37. bind();
  38. }
  39. private void bind(){
  40. bootstrap.bind(port).addListener(future -> {
  41. if (future.isSuccess()) {
  42. log.info("UDP服务器启动,端口:{}" , port);
  43. countDownLatch.countDown();
  44. } else {
  45. log.error("UDP服务器启动失败,端口:{}", port, future.cause());
  46. System.exit(-1);
  47. }
  48. });
  49. }
  50. }

3.3.ProtocolDecoder数据预处理

  1. /**
  2. * <p>Description: 常规消息体初步解析解释器</p>
  3. *
  4. * @author lenny
  5. * @version 1.0.1
  6. * @date 2022-03-16
  7. */
  8. @Slf4j
  9. public class ProtocolDecoder extends MessageToMessageDecoder<DatagramPacket> {
  10. /**
  11. * 协议类型
  12. */
  13. private ProtocolEnum protocolType;
  14. public ProtocolDecoder(ProtocolEnum protocolType) {
  15. this.protocolType=protocolType;
  16. }
  17. @Override
  18. protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
  19. ByteBuf in=datagramPacket.content();
  20. log.info("收到:{}", ByteBufUtil.hexDump(in));
  21. Object decoded = null;
  22.         //decodePacket数据粘包与半包处理
  23. Object obj= PacketUtil.decodePacket(in);
  24. if(obj!=null) {
  25.             //按协议进行数据初步解析,得到对应的设备ID与消息体,通过设备ID可以进行鉴权处理
  26. decoded = decodeMessage((ByteBuf) obj);
  27. } else {
  28. return;
  29. }
  30. if (decoded != null) {
  31. out.add(decoded);
  32. }
  33. }
  34. /**
  35. * 预处理包(使用的是博实结的一款UDP通讯的产品)
  36. * @param frame
  37. * @return
  38. */
  39. private Object decodeMessage(ByteBuf frame) {
  40. //将完整包输出为16进制字符串
  41. String rowData=ByteBufUtil.hexDump(frame);
  42. frame.readShort();
  43. //信令
  44. int msgId=frame.readUnsignedByte();
  45. //消息长度(包长字节位置后的第一字节开始直到包尾的长度)
  46. int bodyLen=frame.readUnsignedShort();
  47. //伪IP
  48. byte[] terminalNumArr = new byte[4];
  49. frame.readBytes(terminalNumArr);
  50. //终端ID
  51. String terminalNum= CommonUtil.parseTerminalNum(terminalNumArr);
  52. //剩余消息体
  53. byte[] msgBodyArr = new byte[bodyLen-6];
  54. frame.readBytes(msgBodyArr);
  55. //校验—+包尾
  56. frame.readShort();
  57. //处理成统一标准给到处理器去处理后续的解析
  58. UdpMessage message=new UdpMessage();
  59. message.setTerminalNumArr(terminalNumArr);
  60. message.setTerminalNum(terminalNum);
  61. message.setMsgId(msgId);
  62. message.setProtocolType(protocolType);
  63. message.setMsgBodyArr(msgBodyArr);
  64. message.setMsgBody(Unpooled.wrappedBuffer(msgBodyArr));
  65. return message;
  66. }
  67. }

3.4.登录鉴权

我这边是通过当前设备是否在系统注册进行鉴权处理的

  1. /**
  2. * 登录业务处理
  3. * @author Lenny
  4. * @date 20200314
  5. */
  6. @Slf4j
  7. @ChannelHandler.Sharable
  8. public class UdpLoginHandler extends SimpleChannelInboundHandler<UdpMessage> {
  9. public static final UdpLoginHandler INSTANCE = new UdpLoginHandler();
  10. private RedisService redisService;
  11. private RabbitMessageSender messageSender;
  12. private Environment environment;
  13. private UdpLoginHandler() {
  14. redisService = SpringBeanService.getBean(RedisService.class);
  15. messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
  16. environment = SpringBeanService.getBean(Environment.class);
  17. }
  18. @Override
  19. protected void channelRead0(ChannelHandlerContext ctx, UdpMessage msg) throws Exception {
  20. //协议类型
  21. ProtocolEnum protocolType = msg.getProtocolType();
  22. //查询终端号码有无在平台注册
  23. String terminalNum = msg.getTerminalNum();
  24. TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(terminalNum);
  25. if (terminalInfo == null) {
  26. log.error("终端登录失败,未找到终端信息,协议:{},终端号:{},消息:{}", protocolType, terminalNum, msg);
  27. if (msg.getMsgBody() != null) {
  28. ReferenceCountUtil.release(msg.getMsgBody());
  29. }
  30. ctx.close();
  31. return;
  32. }
  33. //设置协议类型
  34. terminalInfo.setProtocolType(protocolType);
  35. //设置节点名
  36. terminalInfo.setNodeName(environment.getProperty("spring.application.name"));
  37. //保存终端信息和消息流水号到上下文属性中
  38. Session session = new Session(terminalInfo);
  39. ChannelHandlerContext oldCtx = SessionUtil.bindSession(session, ctx);
  40. if (oldCtx == null) {
  41. log.info("终端登录成功,协议:{},终端ID:{},终端号:{}", protocolType, terminalInfo.getTerminalStrId(), terminalNum);
  42. } else {
  43. log.info("终端重复登录关闭上一个连接,协议:{},终端ID:{},终端号:{}", protocolType, terminalInfo.getTerminalStrId(), terminalNum);
  44. oldCtx.close();
  45. }
  46. //通知上线
  47. messageSender.noticeOnline(terminalInfo);
  48. //登录验证通过后移除此handler
  49. ctx.pipeline().remove(this);
  50. ctx.fireChannelRead(msg);
  51. }
  52. @Override
  53. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  54. }
  55. @Override
  56. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  57. ctx.close();
  58. }
  59. }

3.5.业务处理

  1. /**
  2. * 业务处理
  3. * @author lenny
  4. * @date 20220314
  5. */
  6. @Slf4j
  7. @Sharable
  8. public class UdpBusinessHandler extends SimpleChannelInboundHandler<UdpMessage> {
  9. public static final UdpBusinessHandler INSTANCE = new UdpBusinessHandler();
  10. private RabbitMessageSender messageSender;
  11. private MessageServiceProvider messageServiceProvider;
  12. private UdpBusinessHandler() {
  13. messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
  14. messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class);
  15. }
  16. @Override
  17. protected void channelRead0(ChannelHandlerContext ctx, UdpMessage msg) throws Exception {
  18. //根据消息类型获取对应的消息处理器
  19. int msgId = msg.getMsgId();
  20. //找到对应的业务处理器
  21. BaseMessageService messageService = messageServiceProvider.getMessageService(msgId);
  22. ByteBuf msgBody = msg.getMsgBody();
  23. try {
  24. //通过业务处理器进行处理
  25. messageService.process(ctx, msg, msgBody);
  26. } catch (Exception e) {
  27. printExceptionLog(msg, messageService, e);
  28. } finally {
  29. ReferenceCountUtil.release(msgBody);
  30. }
  31. }
  32. /**
  33. * 打印异常日志
  34. *
  35. * @param msg
  36. * @param messageService
  37. * @param e
  38. */
  39. private void printExceptionLog(UdpMessage msg, BaseMessageService messageService, Exception e) {
  40. byte[] msgBodyArr = msg.getMsgBodyArr();
  41. log.error("收到{}({}),消息异常,协议:{},终端号码:{},消息体:{}", messageService.getDesc(), NumberUtil.formatMessageId(msg.getMsgId())
  42. , msg.getProtocolType(), msg.getTerminalNum(), ByteBufUtil.hexDump(msgBodyArr), e);
  43. }
  44. @Override
  45. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  46. Session session = SessionUtil.getSession(ctx.channel());
  47. TerminalProto terminalInfo = session.getTerminalInfo();
  48. boolean unbindResult = SessionUtil.unbindSession(ctx);
  49. if (unbindResult) {
  50. //通知离线
  51. messageSender.noticeOffline(terminalInfo);
  52. }
  53. }
  54. @Override
  55. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  56. TerminalProto terminalInfo = SessionUtil.getTerminalInfo(ctx);
  57. log.error("终端连接异常,终端ID:{},终端号码:{}", terminalInfo.getTerminalStrId(), terminalInfo.getTerminalNum(), cause);
  58. ctx.close();
  59. }
  60. @Override
  61. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  62. if (evt instanceof IdleStateEvent) {
  63. TerminalProto terminalInfo = SessionUtil.getTerminalInfo(ctx);
  64. if (terminalInfo == null) {
  65. log.info("{}秒未读到数据,关闭连接", UdpConstant.READER_IDLE_TIME);
  66. } else {
  67. log.info("{}秒未读到数据,关闭连接,终端ID:{},终端号码:{}", UdpConstant.READER_IDLE_TIME, terminalInfo.getTerminalStrId()
  68. , terminalInfo.getTerminalNum());
  69. }
  70. ctx.close();
  71. }
  72. }
  73. }

3.6.业务处理器举例

比如我们收到一个消息ID为0x81的数据

  1. /**
  2. * 点名查看
  3. * @author Lenny
  4. * @date 20220314
  5. */
  6. @Slf4j
  7. @MessageService(messageId = 0x81, desc = "点名查看")
  8. public class Message81Service extends BaseMessageService<UdpMessage> {
  9. @Autowired
  10. private RabbitMessageSender messageSender;
  11. @Override
  12. public Object process(ChannelHandlerContext ctx, UdpMessage msg, ByteBuf msgBodyBuf) throws Exception {
  13. //这里需要对业务进行实际解码,目前并未处理,目的是先验证整个流程的通畅性
  14. log.info(JSON.toJSONString(msg));
  15. return null;
  16. }
  17. }

4.说明

目前我这边已经利用了Netty完成了TCP、MQTT、UDP的通讯服务层,集成了近百款终端协议产品,包含了部标全套的服务,以及第三方企业标准标准几十家,有兴趣的朋友可以联系我。

请添加图片描述

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/201244
推荐阅读
相关标签
  

闽ICP备14008679号