当前位置:   article > 正文

基于SpringBoot整合Netty开发MQTT服务端_netty mqtt

netty mqtt

Netty认知

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,相比传统Socket,在并发性方面有着很大的提升。关于NIO,BIO,AIO之间的区别,可以参考这篇博客:NIO 、 BIO与AIO之间的区别_谁念西风独自凉-CSDN博客

 MQTT服务端实现

首先我们启动一个tcp服务,这里我用到了Redis与RabbitMQ,主要是与分布式WEB平台之间好对接

  1. @Component
  2. public class ApplicationEventListener implements CommandLineRunner {
  3. @Value("${spring.application.name}")
  4. private String nodeName;
  5. @Value("${gnss.mqttserver.tcpPort}")
  6. private int tcpPort;
  7. @Override
  8. public void run(String... args) throws Exception {
  9. //启动TCP服务
  10. startTcpServer();
  11. //清除Redis所有此节点的在线终端
  12. RedisService redisService = SpringBeanService.getBean(RedisService.class);
  13. redisService.deleteAllOnlineTerminals(nodeName);
  14. //将所有此节点的终端设置为离线
  15. RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
  16. messageSender.noticeAllOffline(nodeName);
  17. }
  18. /**
  19. * 启动TCP服务
  20. *
  21. * @throws Exception
  22. */
  23. private void startTcpServer() throws Exception {
  24. //计数器,必须等到所有服务启动成功才能进行后续的操作
  25. final CountDownLatch countDownLatch = new CountDownLatch(1);
  26. //启动TCP服务
  27. TcpServer tcpServer = new TcpServer(tcpPort, ProtocolEnum.MqttCommon, countDownLatch);
  28. tcpServer.start();
  29. //等待启动完成
  30. countDownLatch.await();
  31. }
  32. }

接下来我们编写一个TcpServer类实现TCP服务

  1. @Slf4j
  2. public class TcpServer extends Thread{
  3. private int port;
  4. private ProtocolEnum protocolType;
  5. private EventLoopGroup bossGroup;
  6. private EventLoopGroup workerGroup;
  7. private ServerBootstrap serverBootstrap = new ServerBootstrap();
  8. private CountDownLatch countDownLatch;
  9. public TcpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) {
  10. this.port = port;
  11. this.protocolType = protocolType;
  12. this.countDownLatch = countDownLatch;
  13. bossGroup = new NioEventLoopGroup(1);
  14. workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class);
  15. final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class);
  16. serverBootstrap.group(bossGroup, workerGroup)
  17. .channel(NioServerSocketChannel.class)
  18. .option(ChannelOption.SO_BACKLOG, 1024)
  19. .childOption(ChannelOption.SO_KEEPALIVE, true)
  20. .childOption(ChannelOption.TCP_NODELAY, true)
  21. .childHandler(new ChannelInitializer<SocketChannel>() {
  22. @Override
  23. protected void initChannel(SocketChannel ch) throws Exception {
  24. ch.pipeline().addLast(new IdleStateHandler(MqttConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
  25. ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
  26. ch.pipeline().addLast("decoder", new MqttDecoder());
  27. ch.pipeline().addLast(executorGroup, MqttBusinessHandler.INSTANCE);
  28. }
  29. });
  30. }
  31. @Override
  32. public void run() {
  33. bind();
  34. }
  35. /**
  36. * 绑定端口启动服务
  37. */
  38. private void bind() {
  39. serverBootstrap.bind(port).addListener(future -> {
  40. if (future.isSuccess()) {
  41. log.info("{} MQTT服务器启动,端口:{}", protocolType, port);
  42. countDownLatch.countDown();
  43. } else {
  44. log.error("{} MQTT服务器启动失败,端口:{}", protocolType, port, future.cause());
  45. System.exit(-1);
  46. }
  47. });
  48. }
  49. /**
  50. * 关闭服务端
  51. */
  52. public void shutdown() {
  53. workerGroup.shutdownGracefully();
  54. bossGroup.shutdownGracefully();
  55. log.info("{} TCP服务器关闭,端口:{}", protocolType, port);
  56. }
  57. }

编写一个解码器MqttBusinessHandler,实现对MQTT消息接收与处理

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class MqttBusinessHandler extends SimpleChannelInboundHandler<Object> {
  4. public static final MqttBusinessHandler INSTANCE = new MqttBusinessHandler();
  5. private MqttMsgBack mqttMsgBack;
  6. private MqttBusinessHandler() {
  7. mqttMsgBack= MqttMsgBack.INSTANCE;
  8. }
  9. /**
  10. * 接收到消息后处理
  11. * @param ctx
  12. * @param msg
  13. * @throws Exception
  14. */
  15. @Override
  16. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  17. if (null != msg) {
  18. MqttMessage mqttMessage = (MqttMessage) msg;
  19. MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
  20. Channel channel = ctx.channel();
  21. if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
  22. //在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
  23. //建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
  24. mqttMsgBack.connectionAck(ctx, mqttMessage);
  25. }
  26. switch (mqttFixedHeader.messageType()){
  27. //客户端发布消息
  28. case PUBLISH:
  29. mqttMsgBack.publishAck(ctx, mqttMessage);
  30. break;
  31. //发布释放
  32. case PUBREL:
  33. mqttMsgBack.publishComp(ctx, mqttMessage);
  34. break;
  35. //订阅主题
  36. case SUBSCRIBE:
  37. mqttMsgBack.subscribeAck(ctx, mqttMessage);
  38. break;
  39. //取消订阅主题
  40. case UNSUBSCRIBE:
  41. mqttMsgBack.unsubscribeAck(ctx, mqttMessage);
  42. break;
  43. //客户端发送心跳报文
  44. case PINGREQ:
  45. mqttMsgBack.pingResp(ctx, mqttMessage);
  46. break;
  47. //客户端主动断开连接
  48. case DISCONNECT:
  49. break;
  50. default:
  51. break;
  52. }
  53. }
  54. }
  55. @Override
  56. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  57. log.info("终端关闭连接,IP信息:{}", CommonUtil.getClientAddress(ctx));
  58. }
  59. @Override
  60. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  61. ctx.close();
  62. log.error("终端连接异常,IP信息:{}", CommonUtil.getClientAddress(ctx), cause);
  63. }
  64. /**
  65. * 服务端当读超时时会调用这个方法
  66. */
  67. @Override
  68. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
  69. ctx.close();
  70. log.error("读超时,IP信息:{}", CommonUtil.getClientAddress(ctx), evt);
  71. }

我们对接收到的消息进行业务处理

  1. @Slf4j
  2. public class MqttMsgBack {
  3. public static final MqttMsgBack INSTANCE = new MqttMsgBack();
  4. private RedisService redisService;
  5. private RabbitMessageSender messageSender;
  6. private Environment environment;
  7. private MessageServiceProvider messageServiceProvider;
  8. private MqttMsgBack() {
  9. redisService = SpringBeanService.getBean(RedisService.class);
  10. messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
  11. environment = SpringBeanService.getBean(Environment.class);
  12. messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class);
  13. }
  14. /**
  15. * 确认连接请求
  16. * @param ctx
  17. * @param mqttMessage
  18. */
  19. public void connectionAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
  20. MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
  21. MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
  22. MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
  23. //构建返回报文, 可变报头
  24. MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
  25. //构建返回报文, 固定报头
  26. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  27. //构建连接回复消息体
  28. MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
  29. ctx.writeAndFlush(connAck);
  30. //获取连接者的ClientId
  31. String clientIdentifier = mqttConnectMessage.payload().clientIdentifier();
  32. //查询终端号码有无在平台注册
  33. TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(clientIdentifier);
  34. if (terminalInfo == null) {
  35. log.error("终端登录失败,未找到终端信息,终端号:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx));
  36. ctx.close();
  37. return;
  38. }
  39. //设置节点名
  40. terminalInfo.setNodeName(environment.getProperty("spring.application.name"));
  41. //保存终端信息和消息流水号到上下文属性中
  42. Session session = new Session(terminalInfo);
  43. ChannelHandlerContext oldCtx = SessionUtil.bindSession(session, ctx);
  44. if (oldCtx == null) {
  45. log.info("终端登录成功,终端ID:{},终端号:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx));
  46. } else {
  47. log.info("终端重复登录关闭上一个连接,终端ID:{},终端号:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx));
  48. oldCtx.close();
  49. }
  50. //通知上线
  51. messageSender.noticeOnline(terminalInfo);
  52. log.info("终端登录成功,终端号:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx));
  53. }
  54. /**
  55. * 根据qos发布确认
  56. * @param ctx
  57. * @param mqttMessage
  58. */
  59. public void publishAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
  60. MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
  61. MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
  62. MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
  63. //得到主题
  64. String topicName = mqttPublishMessage.variableHeader().topicName();
  65. //获取消息体
  66. ByteBuf msgBodyBuf = mqttPublishMessage.payload();
  67. log.info("收到:{}", ByteBufUtil.hexDump(msgBodyBuf));
  68. MqttCommonMessage msg=new MqttCommonMessage();
  69. msg.setTerminalNum(SessionUtil.getTerminalInfo(ctx).getTerminalNum());
  70. msg.setStrMsgId(topicName);
  71. //根据主题获取对应的主题消息处理器
  72. BaseMessageService messageService = messageServiceProvider.getMessageService(topicName);
  73. try {
  74. Object result = messageService.process(ctx, msg, msgBodyBuf);
  75. log.info("收到{}({}),终端ID:{},内容:{}", messageService.getDesc(), topicName,msg.getTerminalNum(), msg.getMsgBodyItems());
  76. } catch (Exception e) {
  77. log.error("收到{}({}),消息异常,终端ID:{},消息体:{}", messageService.getDesc(), topicName,msg.getTerminalNum(),ByteBufUtil.hexDump(msgBodyBuf), e);
  78. }
  79. switch (qos) {
  80. //至多一次
  81. case AT_MOST_ONCE:
  82. break;
  83. //至少一次
  84. case AT_LEAST_ONCE:
  85. //构建返回报文, 可变报头
  86. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
  87. //构建返回报文, 固定报头
  88. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  89. //构建PUBACK消息体
  90. MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
  91. log.info("Qos:AT_LEAST_ONCE:{}",pubAck.toString());
  92. ctx.writeAndFlush(pubAck);
  93. break;
  94. //刚好一次
  95. case EXACTLY_ONCE:
  96. //构建返回报文,固定报头
  97. MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
  98. //构建返回报文,可变报头
  99. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
  100. MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
  101. log.info("Qos:EXACTLY_ONCE回复:{}"+mqttMessageBack.toString());
  102. ctx.writeAndFlush(mqttMessageBack);
  103. break;
  104. default:
  105. break;
  106. }
  107. }
  108. /**
  109. * 发布完成 qos2
  110. * @param ctx
  111. * @param mqttMessage
  112. */
  113. public void publishComp (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
  114. MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
  115. //构建返回报文, 固定报头
  116. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
  117. //构建返回报文, 可变报头
  118. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  119. MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
  120. log.info("发布完成回复:{}"+mqttMessageBack.toString());
  121. ctx.writeAndFlush(mqttMessageBack);
  122. }
  123. /**
  124. * 订阅确认
  125. * @param ctx
  126. * @param mqttMessage
  127. */
  128. public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
  129. MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
  130. MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
  131. //构建返回报文, 可变报头
  132. MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  133. Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
  134. List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
  135. for (int i = 0; i < topics.size(); i++) {
  136. grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
  137. }
  138. // 构建返回报文 有效负载
  139. MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
  140. // 构建返回报文 固定报头
  141. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
  142. // 构建返回报文 订阅确认
  143. MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
  144. log.info("订阅回复:{}", subAck.toString());
  145. ctx.writeAndFlush(subAck);
  146. }
  147. /**
  148. * 取消订阅确认
  149. * @param ctx
  150. * @param mqttMessage
  151. */
  152. public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
  153. MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
  154. // 构建返回报文 可变报头
  155. MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  156. // 构建返回报文 固定报头
  157. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
  158. // 构建返回报文 取消订阅确认
  159. MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
  160. log.info("取消订阅回复:{}",unSubAck.toString());
  161. ctx.writeAndFlush(unSubAck);
  162. }
  163. /**
  164. * 心跳响应
  165. * @param ctx
  166. * @param mqttMessage
  167. */
  168. public void pingResp (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
  169. MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
  170. MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
  171. log.info("心跳回复:{}", mqttMessageBack.toString());
  172. ctx.writeAndFlush(mqttMessageBack);
  173. }
  174. }

我们可以根据客户端发布消息的主题匹配不同的处理器

 最后,我们在对应的处理器里面实现对主题消息的处理逻辑,比如:定位消息,指令消息等等,比如简单实现对定位数据Location主题的消息处理

  1. @Slf4j
  2. @MessageService(strMessageId = "Location", desc = "定位")
  3. public class LocationMessageService extends BaseMessageService<MqttCommonMessage> {
  4. @Autowired
  5. private RabbitMessageSender messageSender;
  6. @Override
  7. public Object process(ChannelHandlerContext ctx, MqttCommonMessage msg, ByteBuf msgBodyBuf) throws Exception {
  8. byte[] msgByteArr = new byte[msgBodyBuf.readableBytes()];
  9. msgBodyBuf.readBytes(msgByteArr);
  10. String data = new String(msgByteArr);
  11. msg.putMessageBodyItem("位置", data);
  12. return null;
  13. }
  14. }

 后续

目前仅仅是实现MQTT服务端消息接收与消息回复,后续可以根据接入的物联网设备进行对应主题消息的业务处理

有物联网兴趣的同学可以对加我微信一起交流学习

请添加图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/201291?site
推荐阅读
相关标签
  

闽ICP备14008679号