当前位置:   article > 正文

一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布_springboot mqtt

springboot mqtt

一、MQTT介绍

1.1 什么是MQTT?

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

图片

MQTT具有协议简洁、轻巧、可扩展性强、低开销、低带宽占用等优点,已经有PHP,JAVA,Python,C,C#,Go等多个语言版本,基本可以使用在任何平台上。在物联网、小型设备、移动应用等方面有较广泛的应用,特别适合用来当做物联网的通信协议。

1.2 MQTT特点

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

MQTT协议是为硬件性能有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  • 1.使用发布/订阅消息模式,提供多对多的消息发布,解除应用程序耦合;

  • 2.对负载内容屏蔽的消息传输;

  • 3.使用TCP/IP 提供网络连接;

  • 4.支持三种消息发布服务质量(QoS):

    • QoS 0(最多一次):消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这个级别可用于如下情况,环境传感器数据,丢失一次数据无所谓,因为不久后还会有第二次发送。

    • QoS 1(至少一次):确保消息到达,但消息重复可能会发生。

    • QoS 2(只有一次):确保消息到达一次。这个级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

  • 5.传输数据小,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;(用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。)

1.3 MQTT应用场景

MQTT作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有着广泛的应用。MQTT服务只负责消息的接收和传递,应用系统连接到MQTT服务器后,可以实现采集数据接收、解析、业务处理、存储入库、数据展示等功能。常见的应用场景主要有以下几个方面:

(1)消息推送: 如PC端的推送公告,比如安卓的推送服务,还有一些即时通信软件如微信、易信等也是采用的推送技术。

(2)智能点餐: 通过MQTT消息队列产品,消费者可在餐桌上扫码点餐,并与商家后端系统连接实现自助下单、支付。

(3)信息更新: 实现商场超市等场所的电子标签、公共场所的多媒体屏幕的显示更新管理。

(4)扫码出站: 最常见的停车场扫码缴费,自动起竿;地铁闸口扫码进出站。

二、MQTT的角色组成

2.1 MQTT的客户端和服务端

2.1.1 服务端(Broker)

EMQX就是一个MQTT的Broker,emqx只是基于erlang语言开发的软件而已,其它的MQ还有ActiveMQ、RabbitMQ、HiveMQ等等。

EMQX服务端:https://www.emqx.io/zh/downloads?os=Windows

2.1.2 客户端(发布/订阅)

EMQX客户端:https://mqttx.app/zh

这个是用来测试验证的客户端,实际项目是通过代码来实现我们消息的生产者和消费者。

2.2 MQTT中的几个概念

相比RabbitMQ等消息队列,MQTT要相对简单一些,只有Broker、Topic、发布者、订阅者等几部分构成。接下来我们先简单整理下MQTT日常使用中最常见的几个概念:

  • 1.Topic主题:MQTT消息的主要传播途径, 我们向主题发布消息, 订阅主题, 从主题中读取消息并进行.业务逻辑处理, 主题是消息的通道

  • 2.生产者:MQTT消息的发送者, 他们向主题发送消息

  • 3.消费者:MQTT消息的接收者, 他们订阅自己需要的主题, 并从中获取消息

  • 4.broker服务:消息转发器, 消息是通过它来承载的, EMQX就是我们的broker, 在使用中我们不用关心它的具体实现

其实, MQTT的使用流程就是: 生产者给broker的某个topic发消息->broker通过topic进行消息的传递->订阅该主题的消费者拿到消息并进行相应的业务逻辑

三、EMQX的安装和使用

下面以Windows为例,演示Windows下如何安装和使用EXQX。

step 1:下载EMQ安装包,配置EMQ环境

EMQX服务端:https://www.emqx.io/zh/downloads?os=Windows

step 2:下载压缩包解压,cmd进入bin文件夹

图片

step 3:启动EMQX服务

在命令行输入:emqx start 启动服务,打卡浏览器输入:http://localhost:18083/ 进入登录页面。默认用户名密码 admin/public 。登录成功后,会进入emqx的后台管理页面,如下图所示:

图片

四、使用SpringBoot整合MQTT协议

前面介绍了MQTT协议以及如何安装和启动MQTT服务。接下来演示如何在SpringBoot项目中整合MQTT实现消息的订阅和发布。

4.1 创建工程

首先,创建spring-boot-starter-mqtt父工程,在父工程下分别创建消息的提供者spring-boot-starter-mqtt-provider 模块和消息的消费者spring-boot-starter-mqtt-consumer模块。

图片

4.2 实现生产者

接下来,修改生产者模块spring-boot-starter-mqtt-provider 相关的代码,实现消息发布的功能模块。

4.2.1 导入依赖包

修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. <scope>test</scope>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.integration</groupId>
  17. <artifactId>spring-integration-mqtt</artifactId>
  18. <version>5.3.2.RELEASE</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.projectlombok</groupId>
  22. <artifactId>lombok</artifactId>
  23. <version>1.18.4</version>
  24. </dependency>
  25. </dependencies>
4.2.2 修改配置文件

修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示:

  1. spring:
  2. application:
  3. name: provider
  4. #MQTT配置信息
  5. mqtt:
  6. #MQTT服务地址,端口号默认11883,如果有多个,用逗号隔开
  7. url: tcp://127.0.0.1:11883
  8. #用户名
  9. username: admin
  10. #密码
  11. password: public
  12. #客户端id(不能重复)
  13. client:
  14. id: provider-id
  15. #MQTT默认的消息推送主题,实际可在调用接口是指定
  16. default:
  17. topic: topic
  18. server:
  19. port: 8080

4.2.3 消息生产者客户端配置

创建MqttProviderConfig配置类,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示:

  1. import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.*;
  2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Configuration;
  5. import javax.annotation.PostConstruct;
  6. @Configuration
  7. @Slf4j
  8. public class MqttProviderConfig {
  9. @Value("${spring.mqtt.username}")
  10. private String username;
  11. @Value("${spring.mqtt.password}")
  12. private String password;
  13. @Value("${spring.mqtt.url}")
  14. private String hostUrl;
  15. @Value("${spring.mqtt.client.id}")
  16. private String clientId;
  17. @Value("${spring.mqtt.default.topic}")
  18. private String defaultTopic;
  19. /**
  20. * 客户端对象
  21. */
  22. private MqttClient client;
  23. /**
  24. * 在bean初始化后连接到服务器
  25. */
  26. @PostConstruct
  27. public void init(){
  28. connect();
  29. }
  30. /**
  31. * 客户端连接服务端
  32. */
  33. public void connect(){
  34. try{
  35. //创建MQTT客户端对象
  36. client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
  37. //连接设置
  38. MqttConnectOptions options = new MqttConnectOptions();
  39. //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
  40. //设置为true表示每次连接服务器都是以新的身份
  41. options.setCleanSession(true);
  42. //设置连接用户名
  43. options.setUserName(username);
  44. //设置连接密码
  45. options.setPassword(password.toCharArray());
  46. //设置超时时间,单位为秒
  47. options.setConnectionTimeout(100);
  48. //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
  49. options.setKeepAliveInterval(20);
  50. //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
  51. options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
  52. //设置回调
  53. client.setCallback(new MqttProviderCallBack());
  54. client.connect(options);
  55. } catch(MqttException e){
  56. e.printStackTrace();
  57. }
  58. }
  59. public void publish(int qos,boolean retained,String topic,String message){
  60. MqttMessage mqttMessage = new MqttMessage();
  61. mqttMessage.setQos(qos);
  62. mqttMessage.setRetained(retained);
  63. mqttMessage.setPayload(message.getBytes());
  64. //主题的目的地,用于发布/订阅信息
  65. MqttTopic mqttTopic = client.getTopic(topic);
  66. //提供一种机制来跟踪消息的传递进度
  67. //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
  68. MqttDeliveryToken token;
  69. try {
  70. //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
  71. //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
  72. token = mqttTopic.publish(mqttMessage);
  73. token.waitForCompletion();
  74. } catch (MqttException e) {
  75. e.printStackTrace();
  76. }
  77. }
  78. }
4.2.4 生产者客户端消息回调

创建MqttProviderCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示:

  1. package com.xct.mqttprovider.mqtt;
  2. import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
  3. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  4. import org.eclipse.paho.client.mqttv3.MqttCallback;
  5. import org.eclipse.paho.client.mqttv3.MqttMessage;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.stereotype.Component;
  9. @Configuration
  10. public class MqttProviderCallBack implements MqttCallback {
  11. @Value("${spring.mqtt.client.id}")
  12. private String clientId;
  13. /**
  14. * 与服务器断开连接的回调
  15. */
  16. @Override
  17. public void connectionLost(Throwable throwable) {
  18. System.out.println(clientId + "与服务器断开连接");
  19. }
  20. /**
  21. * 消息到达的回调
  22. */
  23. @Override
  24. public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
  25. }
  26. /**
  27. * 消息发布成功的回调
  28. */
  29. @Override
  30. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  31. IMqttAsyncClient client = iMqttDeliveryToken.getClient();
  32. System.out.println(client.getClientId() + "发布消息成功!");
  33. }
  34. }

4.2.5 创建Controller控制器实现消息发布功能

创建SendController控制器类,实现消息的发送功能,示例代码如下所示:

  1. @Controllerpublic
  2. public class SendController {
  3. @Autowired
  4. private MqttProviderConfig providerClient;
  5. @RequestMapping("/sendMessage")
  6. @ResponseBody
  7. public String sendMessage(int qos,boolean retained,String topic,String message){
  8. try {
  9. providerClient.publish(qos, retained, topic, message);
  10. return "发送成功";
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. return "发送失败";
  14. }
  15. }
  16. }

4.3 实现消费者

前面完成了生成者消息发布的模块,接下来修改消费者模块spring-boot-starter-mqtt-consumer实现消息订阅、处理的功能。

4.3.1 导入依赖包

修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. <scope>test</scope>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.integration</groupId>
  17. <artifactId>spring-integration-mqtt</artifactId>
  18. <version>5.3.2.RELEASE</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.projectlombok</groupId>
  22. <artifactId>lombok</artifactId>
  23. <version>1.18.4</version>
  24. </dependency>
  25. </dependencies>
4.3.2 修改配置文件

修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示:

  1. spring:
  2. application:
  3. name: consumer
  4. #MQTT配置信息
  5. mqtt:
  6. #MQTT服务端地址,端口默认为11883,如果有多个,用逗号隔开
  7. url: tcp://127.0.0.1:11883
  8. #用户名
  9. username: admin
  10. #密码
  11. password: public
  12. #客户端id(不能重复)
  13. client:
  14. id: consumer-id
  15. #MQTT默认的消息推送主题,实际可在调用接口时指定
  16. default:
  17. topic: topic
  18. server:
  19. port: 8085
4.3.3 消费者客户端配置

创建消费者客户端配置类MqttConsumerConfig,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示:

  1. import javax.annotation.PostConstruct;
  2. import org.eclipse.paho.client.mqttv3.MqttClient;
  3. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  4. import org.eclipse.paho.client.mqttv3.MqttException;
  5. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class MqttConsumerConfig {
  10. @Value("${spring.mqtt.username}")
  11. private String username;
  12. @Value("${spring.mqtt.password}")
  13. private String password;
  14. @Value("${spring.mqtt.url}")
  15. private String hostUrl;
  16. @Value("${spring.mqtt.client.id}")
  17. private String clientId;
  18. @Value("${spring.mqtt.default.topic}")
  19. private String defaultTopic;
  20. /**
  21. * 客户端对象
  22. */
  23. private MqttClient client;
  24. /**
  25. * 在bean初始化后连接到服务器
  26. */
  27. @PostConstruct
  28. public void init(){
  29. connect();
  30. }
  31. /**
  32. * 客户端连接服务端
  33. */
  34. public void connect(){
  35. try {
  36. //创建MQTT客户端对象
  37. client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
  38. //连接设置
  39. MqttConnectOptions options = new MqttConnectOptions();
  40. //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
  41. //设置为true表示每次连接到服务端都是以新的身份
  42. options.setCleanSession(true);
  43. //设置连接用户名
  44. options.setUserName(username);
  45. //设置连接密码
  46. options.setPassword(password.toCharArray());
  47. //设置超时时间,单位为秒
  48. options.setConnectionTimeout(100);
  49. //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
  50. options.setKeepAliveInterval(20);
  51. //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
  52. options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
  53. //设置回调
  54. client.setCallback(new MqttConsumerCallBack());
  55. client.connect(options);
  56. //订阅主题
  57. //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
  58. int[] qos = {1,1};
  59. //主题
  60. String[] topics = {"topic1","topic2"};
  61. //订阅主题
  62. client.subscribe(topics,qos);
  63. } catch (MqttException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. /**
  68. * 断开连接
  69. */
  70. public void disConnect(){
  71. try {
  72. client.disconnect();
  73. } catch (MqttException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. /**
  78. * 订阅主题
  79. */
  80. public void subscribe(String topic,int qos){
  81. try {
  82. client.subscribe(topic,qos);
  83. } catch (MqttException e) {
  84. e.printStackTrace();
  85. }
  86. }
  87. }
4.3.4 消费者客户端消息回调

创建MqttConsumerCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示:

  1. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  2. import org.eclipse.paho.client.mqttv3.MqttCallback;
  3. import org.eclipse.paho.client.mqttv3.MqttMessage;
  4. public class MqttConsumerCallBack implements MqttCallback{
  5. /**
  6. * 客户端断开连接的回调
  7. */
  8. @Override
  9. public void connectionLost(Throwable throwable) {
  10. System.out.println("与服务器断开连接,可重连");
  11. }
  12. /**
  13. * 消息到达的回调
  14. */
  15. @Override
  16. public void messageArrived(String topic, MqttMessage message) throws Exception {
  17. System.out.println(String.format("接收消息主题 : %s",topic));
  18. System.out.println(String.format("接收消息Qos : %d",message.getQos()));
  19. System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
  20. System.out.println(String.format("接收消息retained : %b",message.isRetained()));
  21. }
  22. /**
  23. * 消息发布成功的回调
  24. */
  25. @Override
  26. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  27. System.out.println(String.format("接收消息成功"));
  28. }
  29. }
4.3.5 创建Controller控制器,实现MQTT连接的建立和断开

接下来,创建Controller控制器MqttController,并实现MQTT连接的建立和断开等方法。示例代码如下所示:

  1. import com.weiz.mqtt.config.MqttConsumerConfig;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.stereotype.Controller;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.ResponseBody;
  7. @Controller
  8. public class MqttController {
  9. @Autowired
  10. private MqttConsumerConfig client;
  11. @Value("${spring.mqtt.client.id}")
  12. private String clientId;
  13. @RequestMapping("/connect")
  14. @ResponseBody
  15. public String connect(){
  16. client.connect();
  17. return clientId + "连接到服务器";
  18. }
  19. @RequestMapping("/disConnect")
  20. @ResponseBody
  21. public String disConnect(){
  22. client.disConnect();
  23. return clientId + "与服务器断开连接";
  24. }
  25. }

4.4 测试验证

首先,分别启动生产者spring-boot-starter-mqtt-provider 和消费者spring-boot-starter-mqtt-consumer两个项目,打开浏览器,输入地址http://localhost:18083/,在EMQX管理界面可以看到连接上来的两个客户端。如下图所示:

图片

接下来,调用生产者的消息发布接口验证消息发布是否成功。使用Pomstman调用消息发送接口:http://localhost:8080/sendMessage ,如下图所示:

图片

通过上图可以发现,生产者模块已经把消息发送成功。接下来查看消费者模块,验证消息是否处理成功。如下图所示:

图片

通过日志输出可以发现,消费者已经成功接收到生产者发送的消息,说明我们成功实现在Spring Boot项目中整合MQTT实现了消息的发布和订阅的功能。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/715931
推荐阅读
相关标签
  

闽ICP备14008679号