赞
踩
MQTT
全称(Message Queue Telemetry Transport
):一种基于发布/订阅(publish/subscribe
)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing
)中的一个标准传输协议。
MQTT
是一种基于发布/订阅
模式的轻量级通讯协议,该协议构建在TCP/IP
协议上。 MQTT
最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽
占用的即时通讯协议,MQTT
在物联网、小型设备、移动应用等方面有广泛应用。
MQTT
协议将消息的发布者(publisher
)与订阅者(subscriber
)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ
有点类似。
TCP
协议位于传输层,MQTT
协议位于应用层,MQTT
协议构建于TCP/IP
协议上,也就是说只要支持TCP/IP
协议栈的地方,都可以使用MQTT
协议。
特点:
TCP/IP
网络连接,提供有序,无损,双向连接
QoS
支持,可靠传输保证应用:
MQTT
协议为什么在物联网(IOT
)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP
协议呢?
首先HTTP
协议它是一种同步协议
,客户端请求后需要等待服务器的响应。而在物联网(IOT
)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信
不稳定等,显然异步消息协议更为适合IOT
应用程序。
HTTP
是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT
)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP
要实现这样的功能不但很困难,而且成本极高。
Publisher
(发布者):消息的发出者,负责发送消息Subscriber
(订阅者):消息的订阅者,负责接收并处理消息Broker
(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT
协议的消息中间件都可以充当Topic
(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。Payload
(负载);可以理解为发送消息的内容。QoS
(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2
三个等级,下面分别介绍下:
QoS 0
(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;QoS 1
(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;QoS 2
(Exactly Once):只有一次,确保消息只到达一次MQTT
通过交换一些预定义的MQTT
控制报文来工作,每条MQTT
命令消息的消息头都包含一个固定的报头
,有些消息会携带一个可变报文头
和一个负荷
。消息格式如下:
固定包头,存在于所有MQTT控制包
可变包头,存在于某些MQTT控制包
载荷,存在于某些MQTT控制包
MQTT
固定报文头最少有两个字节
,第一个字节包含消息类型(Message Type)
和QoS级别
等标志位。第二个字节开始是剩余长度
字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节。
剩余长度
使用了一种可变长度的结构来编码,这种结构使用单一字节
表示0-127
的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。
例如十进制的数字64可以被编码成一个单独的字节,十进制为64,八进制为0x40。十进制数字321(=65+2×128)被编码为两个字节,低位在前。第一个字节是65+128 = 193。注意最高位的128表示后面至少还有一个字节。第二个字节是2,表示2*127。(注:321 = 11000001 00000010,第一个字节是“标识符后面还有一个字节”+65,第二个字节是“标识符后面没有字节了”+256)。
可变报文头主要包含协议名
、协议版本
、连接标志(Connect Flags)
、心跳间隔时间(Keep Alive timer)
、连接返回码(Connect Return Code)
、主题名(Topic Name)
等
有效负荷(Payload),可以理解为消息主题(body)
当 MQTT
发送的消息类型是 CONNECT
(连接)、PUBLISH
(发布)、SUBSCRIBE
(订阅)、SUBACK
(订阅确认)、则会带有负荷。
各种类型消息的控制报文参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html
MQTT
的消息类型(Message Type)(控制报文类型)
名字 | 值 | 报文流动方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | 禁止 | 保留 |
消息质量(QoS
):
QoS 0
:最多分发一次。消息的传递完全依赖于底层的TCP/IP
协议,协议里没有定义应答和重试,消息要么只会到达服务端一次,要么根本没有到达。QoS 1
:至少分发一次。服务器的消息接收由PUBACK
消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。QoS 2
:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。通过下面的例子可以更深刻的理解上面三个传输质量等级。
比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0
质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。
之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1
质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。
最后用户用完单车后,需要提交付款表单,可以使用QoS level 2
质量消息,这样确保只传递一次数据,否则用户就会多付钱了。
在Linux上搭建MQTT服务
打开EMQ官网:https://www.emqx.io/cn/products/broker
点击开始试用
选择服务器对应版本
复制下载命令到ssh工具中执行
下载完成
下载完成后执行安装命令
安装成功后执行命令:
sudo emqx start
出现以下信息表示启动成功
浏览器访问ip:18083
进入管理界面,默认账号为admin,密码为public
点击了解 RabbitMQ 搭建
搭建完 RabbitMQ
后,启用 RabbitMQ
的 MQTT
插件了,默认是不启用的,使用如下命令开启即可:rabbitmq-plugins enable rabbitmq_mqtt
开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。
如果使用 RabbitMQ与Web
端交互,那么底层使用的是 WebSocket
,所以需要开启 RabbitMQ
的 MQTT WEB
支持,使用如下命令开启即可:rabbitmq-plugins enable rabbitmq_web_mqtt
开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;
<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
在application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;
rabbitmq:
mqtt:
url: tcp://localhost:1883
username: guest
password: guest
defaultTopic: testTopic
编写一个Java配置类从配置文件中读取配置便于使用;
@Data @EqualsAndHashCode(callSuper = false) @Component @ConfigurationProperties(prefix = "rabbitmq.mqtt") public class MqttConfig { /** * RabbitMQ连接用户名 */ private String username; /** * RabbitMQ连接密码 */ private String password; /** * RabbitMQ的MQTT默认topic */ private String defaultTopic; /** * RabbitMQ的MQTT连接地址 */ private String url; }
添加MQTT消息订阅者相关配置,使用@ServiceActivator
注解声明一个服务激活器,通过MessageHandler来处理订阅消息;
/** * MQTT消息订阅者相关配置 */ @Slf4j @Configuration public class MqttInboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //设置消息质量:0->至多一次;1->至少一次;2->只有一次 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //处理订阅消息 log.info("handleMessage : {}",message.getPayload()); } }; } }
添加MQTT消息发布者相关配置;
/** * MQTT消息发布者相关配置 */ @Configuration public class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttConfig.getUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisherClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
添加MQTT网关,用于向主题中发送消息;
/** * MQTT网关,通过接口将数据传递到集成流 */ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 发送消息到默认topic */ void sendToMqtt(String payload); /** * 发送消息到指定topic */ void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); /** * 发送消息到指定topic并设置QOS */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;
/** * MQTT测试接口 */ @Api(tags = "MqttController", description = "MQTT测试接口") @RestController @RequestMapping("/mqtt") public class MqttController { @Autowired private MqttGateway mqttGateway; @PostMapping("/sendToDefaultTopic") @ApiOperation("向默认主题发送消息") public CommonResult sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload); return CommonResult.success(null); } @PostMapping("/sendToTopic") @ApiOperation("向指定主题发送消息") public CommonResult sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic); return CommonResult.success(null); } }
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
spring: application: name: provider #MQTT配置信息 mqtt: #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883 url: tcp://ip:1883 #用户名 username: admin #密码 password: public #客户端id(不能重复) client: id: provider-id #MQTT默认的消息推送主题,实际可在调用接口时指定 default: topic: topic server: port: 8081
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j public class MqttProviderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客户端对象 */ private MqttClient client; /** * 客户端连接服务端 */ public void connect(){ try { //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接到服务端都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttProviderCallBack()); client.connect(options); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息 * @param qos 服务质量等级 * 0 只会发送一次,不管成不成功 * 1 未成功会继续发送,直到成功,可能会收到多次 * 2 未成功会继续发送,但会保证只收到一次 * @param retained 保留标志 * 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订阅这个主题时,会把消息推送给这个订阅者 * 但服务端对同一个主题只会保留一条retained消息(最后收到的那条) */ public void publish(int qos,boolean retained,String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主题目的地,用于发布/订阅消息 MqttTopic mqttTopic = client.getTopic(topic); //提供一种机制来跟踪消息的传递进度。 //用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度 MqttDeliveryToken token; try { //将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。 //一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } }
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; @Configuration public class MqttProviderCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 与服务器断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { System.out.println(clientId + "与服务器断开连接"); } /** * 消息到达的回调 */ @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { IMqttAsyncClient client = iMqttDeliveryToken.getClient(); System.out.println(client.getClientId() + "发布消息成功!"); } }
import com.xdemo.mqttprovider.mqtt.MqttProviderConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class SendController { @Autowired private MqttProviderConfig providerClient; @RequestMapping("/sendMessage") @ResponseBody public String sendMessage(int qos,boolean retained,String topic,String message){ try { providerClient.publish(qos,retained,topic,message); return "发送成功"; }catch (Exception e){ e.printStackTrace(); return "发送失败"; } } }
在父工程下创建一个Springboot项目作为消息消费者,导入以下依赖
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
spring: application: name: consumer #MQTT配置信息 mqtt: #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883 url: tcp://ip:1883 #用户名 username: admin #密码 password: public #客户端id(不能重复) client: id: consumer-id #MQTT默认的消息推送主题,实际可在调用接口时指定 default: topic: topic server: port: 8082
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration public class MqttConsumerConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init(){ connect(); } /** * 客户端连接服务端 */ public void connect(){ try { //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接到服务端都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttConsumerCallBack()); client.connect(options); //订阅主题 //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 int[] qos = {1,1}; //主题 String[] topics = {"topic1","topic2"}; //订阅主题 client.subscribe(topics,qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 断开连接 */ public void disConnect(){ try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅主题 */ public void subscribe(String topic,int qos){ try { client.subscribe(topic,qos); } catch (MqttException e) { e.printStackTrace(); } } }
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttConsumerCallBack implements MqttCallback { /** * 客户端断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { System.out.println("与服务器断开连接,可重连"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class TestController { @Autowired private MqttConsumerConfig client; @Value("${spring.mqtt.client.id}") private String clientId; @RequestMapping("connect") @ResponseBody public String connect(){ client.connect(); return clientId + "连接到服务器"; } @RequestMapping("disConnect") @ResponseBody public String disConnect(){ client.disConnect(); return clientId + "与服务器断开连接"; } }
可以使用 MQTT
客户端来测试 MQTT
的即时通讯功能,这里使用的是 MQTTBox
这个客户端工具。
首先下载并安装好 MQTTBox
,下载地址:http://workswithweb.com/mqttbox.html
点击Create MQTT Client按钮来创建一个MQTT客户端;
接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;
再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;
发布者向主题中发布消息,订阅者可以实时接收到
分别启动两个项目,可以在管理界面看到创建的两个客户端
调用发布消息接口发布消息
消费者控制台打印
客户端断线消息恢复
把消费者与服务端断开连接
再调用发布消息接口发送两条消息到topic1,然后再把消费者连接到服务端
控制台没有东西打印
修改消费者客户端配置,把setCleanSession改为false
重启项目,把消费者客户端断开连接,调用发布消息接口发布两条消息,再把消费者和服务端连接上
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。