赞
踩
MQTT是一种面向物联网通信的主流协议,源和目标之间通过中间代理实现面向主题的发布/订阅通信方式,如下图所示:
发布者发布某个主题的消息,通过MQTT代理Broker处理,已订阅该主题的接收者即可收到消息。为了控制消息的转发流程,需要对MQTT代理Broker逻辑进行定制。因此,需要研究MQTT代理Broker的工作原理。本项目研发了一套开源框架奇辰Open-API,物联网网关系统作为其中一个组件集成了MQTT代理Broker功能,实现MQTT开箱即用。
MQTT底层是通过TCP/IP协议实现的,采用Java的非阻塞的通信框架Netty库实现。Netty开发的基础概念是Channel通道,Channel封装隐藏好了底层Socket的工作。物联网网关在实现MQTT代理Broker时为接入Broker的发布者、订阅者建立Channel,通过对Channel的管理及Channel消息内容的处理实现应用所需的网关业务功能。
基于Java Netty库实现的物联网网关入口如下:
- class ChThread extends Thread {
- private Server server;
-
- public ChThread(Server server) {
- this.server = server;
- }
-
- public void run() {
- server.startup();
- }
- }
-
- @SpringBootApplication
- public class GatewayApplication {
-
- public static void main(String[] args) throws InterruptedException {
- SpringApplication.run(GatewayApplication.class, args);
- MqttBroker mqttBroker = new MqttBroker();
- ChThread mqtt_t = new ChThread(mqttBroker);
- mqtt_t.start();
-
- WSServer wSServer = new WSServer();
- ChThread ws_t = new ChThread(wSServer);
- ws_t.start();
- }
-
- }

考虑到物联网设备可能采用多种协议接入物联网网关,为不同协议开启不同线程进行运行,如第18-20行和第22-24行分别开启了原生MQTT协议服务和基于WebSocket的MQTT协议服务。本文重点介绍原生MQTT协议Broker实现。
Broker的启动如下:
- public class MqttBroker extends Server {
- private int port = 1883;
-
- private NioEventLoopGroup bossGroup;
-
- private NioEventLoopGroup workGroup;
-
- /**
- * 启动服务
- *
- * @throws InterruptedException
- */
- public void startup() {
-
- try {
- bossGroup = new NioEventLoopGroup(1);
- workGroup = new NioEventLoopGroup();
-
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup, workGroup);
- bootstrap.channel(NioServerSocketChannel.class);
-
- bootstrap.option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_BACKLOG, 1024)
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.SO_RCVBUF, 10485760);
-
- bootstrap.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
- bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline channelPipeline = ch.pipeline();
- // 设置读写空闲超时时间
- channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
- channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
- channelPipeline.addLast("decoder", new MqttDecoder());
- channelPipeline.addLast(new MqttHandler());
- }
- });
- ChannelFuture f = bootstrap.bind(port).sync();
- f.channel().closeFuture().sync();
-
- } catch (Exception e) {
- System.out.println("start exception" + e.toString());
- }
-
- }
-
- /**
- * 关闭服务
- */
- public void shutdown() throws InterruptedException {
- if (workGroup != null && bossGroup != null) {
- bossGroup.shutdownGracefully();
- workGroup.shutdownGracefully();
- System.out.println("shutdown success");
- }
- }
- }

关键属性
Broker的运行实例化了Netty的ServerBootstrap服务,服务的两个关键属性bossGroup和workGroup分别管理了Broker的核心线程组与工作线程组。其中:
参数设置
调用ServerBootstrap的option和childOption分别为Channel创建时的属性进行设置。
Broker在处理Channel的消息时采用Pipeline管道的方式,Pipeline由一系列Handler处理器构成,Handler分为Input输入和Output处理器,分别继承Netty的ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。顾名思义,当消息流入网关Broker是依次调用Input Handler进行处理;当消息从网关Broker流出时依次调用Output Handler进行处理。
MQTT协议处理
为了实现对MQTT协议的处理,为Broker添加MQTT解码MqttDecoder和MQTT编码MqttEncoder 分别作用于输入MQTT消息和以MQTT协议输出的内容,见Broker启动代码的第35、36行。
前面工作基于Netty框架实现了基础的消息处理框架,针对不同场景需要自定义网关处理逻辑。在MQTT的Channel处理Pipeline里添加自定义MqttHandler,见Broker启动代码第37行。
MqttHandler的具体处理逻辑如下:
- public class MqttHandler extends ChannelInboundHandlerAdapter {
-
- private Logger log = LoggerFactory.getLogger(this.getClass());
-
- private ChannelSupervise channelSupervise = ChannelSupervise.getBean(ChannelSupervise.class);
-
- private RestTemplate restTemplate = new RestTemplate();
-
- private ReceiveHandler receiveHandler = new ReceiveHandler();
-
- // @Autowired
- // KafkaProducer kafkaProducer;
-
- /**
- * 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
- */
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- super.channelRegistered(ctx);
- channelSupervise.addChannel(ctx.channel());
- }
-
- /**
- * 客户端与服务端 断连时执行 channelInactive方法之后执行
- */
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- super.channelUnregistered(ctx);
- channelSupervise.removeChannel(ctx.channel());
- }
-
- /**
- * 从客户端收到新的数据时,这个方法会在收到消息时被调用
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
- MqttMessage mqttMessage = (MqttMessage) msg;
- log.info("info--" + mqttMessage.toString());
- MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
- Channel channel = ctx.channel();
-
- if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {
- // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
- // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
- MqttConnectPayload payload = (MqttConnectPayload) mqttMessage.payload();
- IotChannel iotChannel = channelSupervise.findChannel(channel.id().asLongText());
- if (iotChannel != null) {
- iotChannel.set_connected(true);
- iotChannel.setClient_id(payload.clientIdentifier());
- iotChannel.setUsername(payload.userName());
- iotChannel.setPassword(payload.passwordInBytes());
- try {
- // RestTemplate restTemplate = RestTemplate.getBean("restTemplate");
- // kafkaProducer.send(mqttMessage);
- ResponseEntity<String> responseEntity = restTemplate.getForEntity(
- "https://iot.lokei.cn/auth/device",
- String.class);
- HttpStatus statusCode = responseEntity.getStatusCode(); // 获取响应码
- if (statusCode == HttpStatus.OK) {
- iotChannel.set_registered(true);
- } else {
- iotChannel.set_registered(true);
- }
- } catch (Exception e) {
- iotChannel.set_registered(false);
- }
- }
- MqttMsgBack.connack(channel, mqttMessage);
- }
-
- switch (mqttFixedHeader.messageType()) {
- case PUBLISH: // 客户端发布消息
- // PUBACK报文是对QoS 1等级的PUBLISH报文的响应
- // System.out.println("123");
- // KafkaProducer kafkaProducer = KafkaProducer.getBean("kafkaProducer");
- // kafkaProducer.send(mqttMessage);
- receiveHandler.receiveMessage(mqttMessage);
- MqttMsgBack.puback(channel, mqttMessage);
- break;
- case PUBREL: // 发布释放
- // PUBREL报文是对PUBREC报文的响应
- // to do
- MqttMsgBack.pubcomp(channel, mqttMessage);
- break;
- case SUBSCRIBE: // 客户端订阅主题
- // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
- // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
- // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
- // to do
- MqttSubscribePayload payload = (MqttSubscribePayload) mqttMessage.payload();
- List<MqttTopicSubscription> topicSubscriptions = payload.topicSubscriptions();
- for (MqttTopicSubscription mqttTopicSubscription : topicSubscriptions) {
- channelSupervise.subTopic(mqttTopicSubscription.topicName(), channel);
- }
- MqttMsgBack.suback(channel, mqttMessage);
- break;
- case UNSUBSCRIBE: // 客户端取消订阅
- // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
- // to do
- MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage;
- channelSupervise.unsubTopic(mqttUnsubscribeMessage.payload().topics(), channel);
- MqttMsgBack.unsuback(channel, mqttMessage);
- break;
- case PINGREQ: // 客户端发起心跳
- // 客户端发送PINGREQ报文给服务端的
- // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
- // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
- MqttMsgBack.pingresp(channel, mqttMessage);
- break;
- case DISCONNECT: // 客户端主动断开连接
- // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
- // to do
- break;
- default:
- break;
- }
- }
-
- /**
- * 从客户端收到新的数据、读取完成时调用
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
- }
-
- /**
- * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- super.exceptionCaught(ctx, cause);
- ctx.close();
- }
-
- /**
- * 客户端与服务端第一次建立连接时执行
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- }
-
- /**
- * 客户端与服务端 断连时执行
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
- super.channelInactive(ctx);
- }
-
- /**
- * 服务端 当读超时时 会调用这个方法
- */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
- super.userEventTriggered(ctx, evt);
- ctx.close();
- }
-
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- super.channelWritabilityChanged(ctx);
- }
- }

MqttHandler继承自ChannelInboundHandlerAdapter实现对输入Mqtt消息进行自定义处理。核心业务代码见第36行,当检测到Channel有数据后读取Mqtt消息内容类型进行相应处理。定义MqttMsgBack和ReceiveHandler分别对收到Mqtt消费进行消息反馈和消息进一步处理。
Mqtt消息订阅
收到Mqtt消息识别出消息主题后,为了将消息转发给订阅者,在ReceiveHandler里面实现此功能如下:
- public class ReceiveHandler {
-
- private ChannelSupervise channelSupervise = ChannelSupervise.getBean(ChannelSupervise.class);
-
- public void receiveMessage(MqttMessage mqttMessage) {
- MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
- String topic = mqttPublishMessage.variableHeader().topicName();
- ChannelGroup channelGroup = channelSupervise.subChannels(topic);
- if ((!(channelGroup == null)) && channelGroup.size() > 0) {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false,
- mqttPublishMessage.fixedHeader().qosLevel(), false,
- mqttPublishMessage.fixedHeader().remainingLength());
- // 构建返回报文, 可变报头
- MqttPublishVariableHeader mqttMessageIdVariableHeader = new MqttPublishVariableHeader(topic,
- mqttPublishMessage.variableHeader().packetId());
- MqttPublishMessage mqttMessagePublish = new MqttPublishMessage(mqttFixedHeader, mqttMessageIdVariableHeader,
- mqttPublishMessage.payload());
- channelGroup.writeAndFlush(mqttMessagePublish);
- }
- }
- }

在第8行获取所有订阅过当前处理主题的Channel,将消息已Mqtt协议方法发送出去。
开源项目:Open-Api
更多信息:www.lokei.cn
到此已实现一个开源MQTT网关基础框架。后续可以继续完善的地方主要有:考虑到前端技术大量使用http、web方式,研究在Websocket之上如何传输MQTT消息,这样可以打通前端主流技术和底层设备的MQTT接入。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。