当前位置:   article > 正文

【开源物联网】MQTT物联网网关Broker与Java开源实现_mqtt物联网协议java实现

mqtt物联网协议java实现

1、概述

MQTT是一种面向物联网通信的主流协议,源和目标之间通过中间代理实现面向主题的发布/订阅通信方式,如下图所示:

发布者发布某个主题的消息,通过MQTT代理Broker处理,已订阅该主题的接收者即可收到消息。为了控制消息的转发流程,需要对MQTT代理Broker逻辑进行定制。因此,需要研究MQTT代理Broker的工作原理。本项目研发了一套开源框架奇辰Open-API,物联网网关系统作为其中一个组件集成了MQTT代理Broker功能,实现MQTT开箱即用。

2、实现原理

MQTT底层是通过TCP/IP协议实现的,采用Java的非阻塞的通信框架Netty库实现。Netty开发的基础概念是Channel通道,Channel封装隐藏好了底层Socket的工作。物联网网关在实现MQTT代理Broker时为接入Broker的发布者、订阅者建立Channel,通过对Channel的管理及Channel消息内容的处理实现应用所需的网关业务功能。

3、开源实现

3.1程序入口

基于Java Netty库实现的物联网网关入口如下:

  1. class ChThread extends Thread {
  2. private Server server;
  3. public ChThread(Server server) {
  4. this.server = server;
  5. }
  6. public void run() {
  7. server.startup();
  8. }
  9. }
  10. @SpringBootApplication
  11. public class GatewayApplication {
  12. public static void main(String[] args) throws InterruptedException {
  13. SpringApplication.run(GatewayApplication.class, args);
  14. MqttBroker mqttBroker = new MqttBroker();
  15. ChThread mqtt_t = new ChThread(mqttBroker);
  16. mqtt_t.start();
  17. WSServer wSServer = new WSServer();
  18. ChThread ws_t = new ChThread(wSServer);
  19. ws_t.start();
  20. }
  21. }

考虑到物联网设备可能采用多种协议接入物联网网关,为不同协议开启不同线程进行运行,如第18-20行和第22-24行分别开启了原生MQTT协议服务和基于WebSocket的MQTT协议服务。本文重点介绍原生MQTT协议Broker实现。

3.2Broker启动

Broker的启动如下:

  1. public class MqttBroker extends Server {
  2. private int port = 1883;
  3. private NioEventLoopGroup bossGroup;
  4. private NioEventLoopGroup workGroup;
  5. /**
  6. * 启动服务
  7. *
  8. * @throws InterruptedException
  9. */
  10. public void startup() {
  11. try {
  12. bossGroup = new NioEventLoopGroup(1);
  13. workGroup = new NioEventLoopGroup();
  14. ServerBootstrap bootstrap = new ServerBootstrap();
  15. bootstrap.group(bossGroup, workGroup);
  16. bootstrap.channel(NioServerSocketChannel.class);
  17. bootstrap.option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_BACKLOG, 1024)
  18. .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  19. .option(ChannelOption.SO_RCVBUF, 10485760);
  20. bootstrap.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)
  21. .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  22. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  23. protected void initChannel(SocketChannel ch) {
  24. ChannelPipeline channelPipeline = ch.pipeline();
  25. // 设置读写空闲超时时间
  26. channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
  27. channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
  28. channelPipeline.addLast("decoder", new MqttDecoder());
  29. channelPipeline.addLast(new MqttHandler());
  30. }
  31. });
  32. ChannelFuture f = bootstrap.bind(port).sync();
  33. f.channel().closeFuture().sync();
  34. } catch (Exception e) {
  35. System.out.println("start exception" + e.toString());
  36. }
  37. }
  38. /**
  39. * 关闭服务
  40. */
  41. public void shutdown() throws InterruptedException {
  42. if (workGroup != null && bossGroup != null) {
  43. bossGroup.shutdownGracefully();
  44. workGroup.shutdownGracefully();
  45. System.out.println("shutdown success");
  46. }
  47. }
  48. }

关键属性

Broker的运行实例化了Netty的ServerBootstrap服务,服务的两个关键属性bossGroup和workGroup分别管理了Broker的核心线程组与工作线程组。其中:

  • bossGroup核心线程组负责Broker的核心业务,包括监听客户端的连接并进行相应处理;
  • workGroup承载了Broker为每个客户端连接建立的Channel的运行。

参数设置

 调用ServerBootstrap的option和childOption分别为Channel创建时的属性进行设置。

3.3Channel消息处理

Broker在处理Channel的消息时采用Pipeline管道的方式,Pipeline由一系列Handler处理器构成,Handler分为Input输入和Output处理器,分别继承Netty的ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。顾名思义,当消息流入网关Broker是依次调用Input Handler进行处理;当消息从网关Broker流出时依次调用Output Handler进行处理。

MQTT协议处理

为了实现对MQTT协议的处理,为Broker添加MQTT解码MqttDecoder和MQTT编码MqttEncoder 分别作用于输入MQTT消息和以MQTT协议输出的内容,见Broker启动代码的第35、36行。

3.4MQTT网关业务

前面工作基于Netty框架实现了基础的消息处理框架,针对不同场景需要自定义网关处理逻辑。在MQTT的Channel处理Pipeline里添加自定义MqttHandler,见Broker启动代码第37行。

MqttHandler的具体处理逻辑如下:

  1. public class MqttHandler extends ChannelInboundHandlerAdapter {
  2. private Logger log = LoggerFactory.getLogger(this.getClass());
  3. private ChannelSupervise channelSupervise = ChannelSupervise.getBean(ChannelSupervise.class);
  4. private RestTemplate restTemplate = new RestTemplate();
  5. private ReceiveHandler receiveHandler = new ReceiveHandler();
  6. // @Autowired
  7. // KafkaProducer kafkaProducer;
  8. /**
  9. * 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
  10. */
  11. @Override
  12. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  13. super.channelRegistered(ctx);
  14. channelSupervise.addChannel(ctx.channel());
  15. }
  16. /**
  17. * 客户端与服务端 断连时执行 channelInactive方法之后执行
  18. */
  19. @Override
  20. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  21. super.channelUnregistered(ctx);
  22. channelSupervise.removeChannel(ctx.channel());
  23. }
  24. /**
  25. * 从客户端收到新的数据时,这个方法会在收到消息时被调用
  26. */
  27. @Override
  28. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
  29. MqttMessage mqttMessage = (MqttMessage) msg;
  30. log.info("info--" + mqttMessage.toString());
  31. MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
  32. Channel channel = ctx.channel();
  33. if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {
  34. // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
  35. // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
  36. MqttConnectPayload payload = (MqttConnectPayload) mqttMessage.payload();
  37. IotChannel iotChannel = channelSupervise.findChannel(channel.id().asLongText());
  38. if (iotChannel != null) {
  39. iotChannel.set_connected(true);
  40. iotChannel.setClient_id(payload.clientIdentifier());
  41. iotChannel.setUsername(payload.userName());
  42. iotChannel.setPassword(payload.passwordInBytes());
  43. try {
  44. // RestTemplate restTemplate = RestTemplate.getBean("restTemplate");
  45. // kafkaProducer.send(mqttMessage);
  46. ResponseEntity<String> responseEntity = restTemplate.getForEntity(
  47. "https://iot.lokei.cn/auth/device",
  48. String.class);
  49. HttpStatus statusCode = responseEntity.getStatusCode(); // 获取响应码
  50. if (statusCode == HttpStatus.OK) {
  51. iotChannel.set_registered(true);
  52. } else {
  53. iotChannel.set_registered(true);
  54. }
  55. } catch (Exception e) {
  56. iotChannel.set_registered(false);
  57. }
  58. }
  59. MqttMsgBack.connack(channel, mqttMessage);
  60. }
  61. switch (mqttFixedHeader.messageType()) {
  62. case PUBLISH: // 客户端发布消息
  63. // PUBACK报文是对QoS 1等级的PUBLISH报文的响应
  64. // System.out.println("123");
  65. // KafkaProducer kafkaProducer = KafkaProducer.getBean("kafkaProducer");
  66. // kafkaProducer.send(mqttMessage);
  67. receiveHandler.receiveMessage(mqttMessage);
  68. MqttMsgBack.puback(channel, mqttMessage);
  69. break;
  70. case PUBREL: // 发布释放
  71. // PUBREL报文是对PUBREC报文的响应
  72. // to do
  73. MqttMsgBack.pubcomp(channel, mqttMessage);
  74. break;
  75. case SUBSCRIBE: // 客户端订阅主题
  76. // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
  77. // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
  78. // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
  79. // to do
  80. MqttSubscribePayload payload = (MqttSubscribePayload) mqttMessage.payload();
  81. List<MqttTopicSubscription> topicSubscriptions = payload.topicSubscriptions();
  82. for (MqttTopicSubscription mqttTopicSubscription : topicSubscriptions) {
  83. channelSupervise.subTopic(mqttTopicSubscription.topicName(), channel);
  84. }
  85. MqttMsgBack.suback(channel, mqttMessage);
  86. break;
  87. case UNSUBSCRIBE: // 客户端取消订阅
  88. // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
  89. // to do
  90. MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage;
  91. channelSupervise.unsubTopic(mqttUnsubscribeMessage.payload().topics(), channel);
  92. MqttMsgBack.unsuback(channel, mqttMessage);
  93. break;
  94. case PINGREQ: // 客户端发起心跳
  95. // 客户端发送PINGREQ报文给服务端的
  96. // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
  97. // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
  98. MqttMsgBack.pingresp(channel, mqttMessage);
  99. break;
  100. case DISCONNECT: // 客户端主动断开连接
  101. // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
  102. // to do
  103. break;
  104. default:
  105. break;
  106. }
  107. }
  108. /**
  109. * 从客户端收到新的数据、读取完成时调用
  110. */
  111. @Override
  112. public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
  113. }
  114. /**
  115. * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
  116. */
  117. @Override
  118. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  119. super.exceptionCaught(ctx, cause);
  120. ctx.close();
  121. }
  122. /**
  123. * 客户端与服务端第一次建立连接时执行
  124. */
  125. @Override
  126. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  127. super.channelActive(ctx);
  128. }
  129. /**
  130. * 客户端与服务端 断连时执行
  131. */
  132. @Override
  133. public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
  134. super.channelInactive(ctx);
  135. }
  136. /**
  137. * 服务端 当读超时时 会调用这个方法
  138. */
  139. @Override
  140. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
  141. super.userEventTriggered(ctx, evt);
  142. ctx.close();
  143. }
  144. @Override
  145. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  146. super.channelWritabilityChanged(ctx);
  147. }
  148. }

MqttHandler继承自ChannelInboundHandlerAdapter实现对输入Mqtt消息进行自定义处理。核心业务代码见第36行,当检测到Channel有数据后读取Mqtt消息内容类型进行相应处理。定义MqttMsgBack和ReceiveHandler分别对收到Mqtt消费进行消息反馈和消息进一步处理。

Mqtt消息订阅

收到Mqtt消息识别出消息主题后,为了将消息转发给订阅者,在ReceiveHandler里面实现此功能如下:

  1. public class ReceiveHandler {
  2. private ChannelSupervise channelSupervise = ChannelSupervise.getBean(ChannelSupervise.class);
  3. public void receiveMessage(MqttMessage mqttMessage) {
  4. MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
  5. String topic = mqttPublishMessage.variableHeader().topicName();
  6. ChannelGroup channelGroup = channelSupervise.subChannels(topic);
  7. if ((!(channelGroup == null)) && channelGroup.size() > 0) {
  8. MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false,
  9. mqttPublishMessage.fixedHeader().qosLevel(), false,
  10. mqttPublishMessage.fixedHeader().remainingLength());
  11. // 构建返回报文, 可变报头
  12. MqttPublishVariableHeader mqttMessageIdVariableHeader = new MqttPublishVariableHeader(topic,
  13. mqttPublishMessage.variableHeader().packetId());
  14. MqttPublishMessage mqttMessagePublish = new MqttPublishMessage(mqttFixedHeader, mqttMessageIdVariableHeader,
  15. mqttPublishMessage.payload());
  16. channelGroup.writeAndFlush(mqttMessagePublish);
  17. }
  18. }
  19. }

在第8行获取所有订阅过当前处理主题的Channel,将消息已Mqtt协议方法发送出去。

4、更多

开源项目:Open-Api

 更多信息:www.lokei.cn 

5、后续完善

 到此已实现一个开源MQTT网关基础框架。后续可以继续完善的地方主要有:考虑到前端技术大量使用http、web方式,研究在Websocket之上如何传输MQTT消息,这样可以打通前端主流技术和底层设备的MQTT接入。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/850677
推荐阅读
相关标签
  

闽ICP备14008679号