当前位置:   article > 正文

【MQTT】SpringBoot整合MQTT(EMQX)_springboot emqx

springboot emqx

最近在做MQTT对接,然后发送消息,然后参考网上的实战文章进行了一下整理。

文章主要参考自(https://www.codetd.com/article/13550340),然后自己做了些许更改。

1、整合准备

SpringBoot:2.2.2.RELEASE

MQTT平台:EMQX4.4.1(Docker运行)

虚拟机服务器:Centos7(192.168.56.102 )

发送端:cloud-mqtt-send8001

接收端:cloud-mqtt-accept8002

2、发送端:cloud-mqtt-send8002

导入POM依赖:

  1. <!-- mqtt -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-integration</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.integration</groupId>
  8. <artifactId>spring-integration-stream</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.integration</groupId>
  12. <artifactId>spring-integration-mqtt</artifactId>
  13. </dependency>
  14. <!--配置文件报错问题-->
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-configuration-processor</artifactId>
  18. <optional>true</optional>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.projectlombok</groupId>
  22. <artifactId>lombok</artifactId>
  23. <version>1.18.22</version>
  24. <scope>provided</scope>
  25. </dependency>

设置YML

  1. server:
  2. port: 8001
  3. spring:
  4. application:
  5. name: mqtt-send
  6. #mqtt属性配置
  7. mqtt:
  8. hostUrl: tcp://192.168.56.102:1883
  9. username: admin
  10. password: public
  11. clientid: mqtt_send_client
  12. cleanSession: true
  13. reconnect: true
  14. #连接超时
  15. timeout: 1000
  16. #设置会话心跳时间
  17. keepalive: 100
  18. defaultTopic: client:report:1
  19. isOpen: true
  20. qos: 1

新建配置类:MqttProperties.java

  1. @Data
  2. @Component
  3. @ConfigurationProperties(prefix = "mqtt")
  4. public class MqttProperties {
  5. /**
  6. * 用户名
  7. */
  8. private String username;
  9. /**
  10. * 密码
  11. */
  12. private String password;
  13. /**
  14. * 连接地址
  15. */
  16. private String hostUrl;
  17. /**
  18. * 客户端Id,同一台服务器下,不允许出现重复的客户端id
  19. */
  20. private String clientId;
  21. /**
  22. * 默认连接主题
  23. */
  24. private String defaultTopic;
  25. /**
  26. * 超时时间
  27. */
  28. private int timeout;
  29. /**
  30. * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
  31. * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  32. */
  33. private int keepAlive;
  34. /**
  35. * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
  36. * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  37. */
  38. private Boolean cleanSession;
  39. /**
  40. * 是否断线重连
  41. */
  42. private Boolean reconnect;
  43. /**
  44. * 启动的时候是否关闭mqtt
  45. */
  46. private Boolean isOpen;
  47. /**
  48. * 连接方式
  49. */
  50. private Integer qos;
  51. }

添加MQTT发送客户端:MqttSendClient:

  1. @Slf4j
  2. @Component
  3. public class MqttSendClient {
  4. @Autowired
  5. private MqttSendCallBack mqttSendCallBack;
  6. @Autowired
  7. private MqttProperties mqttProperties;
  8. private static MqttClient mqttClient;
  9. private static MqttClient getClient() {
  10. return mqttClient;
  11. }
  12. private static void setClient(MqttClient client) {
  13. MqttSendClient.mqttClient = client;
  14. }
  15. /**
  16. * 客户端连接
  17. * @return
  18. */
  19. public void connect(){
  20. MqttClient client = null;
  21. try {
  22. //String uuid = UUID.randomUUID().toString().replaceAll("-",""); //设置每一个客户端的id
  23. client = new MqttClient(mqttProperties.getHostUrl(),mqttProperties.getClientId() , new MemoryPersistence());
  24. MqttConnectOptions options = new MqttConnectOptions();
  25. options.setUserName(mqttProperties.getUsername());
  26. options.setPassword(mqttProperties.getPassword().toCharArray());
  27. options.setConnectionTimeout(mqttProperties.getTimeout());
  28. options.setKeepAliveInterval(mqttProperties.getKeepAlive());
  29. options.setCleanSession(true);
  30. options.setAutomaticReconnect(false);
  31. MqttSendClient.setClient(client);
  32. try {
  33. // 设置回调
  34. client.setCallback(mqttSendCallBack);
  35. client.connect(options);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. /**
  44. * 发布,默认qos为0,非持久化
  45. *
  46. * @param topic 主题名
  47. * @param pushMessage 消息
  48. */
  49. public void publish(String topic, String pushMessage) {
  50. publish(0, false, topic, pushMessage);
  51. }
  52. /**
  53. * 发布
  54. *
  55. * @param qos
  56. * @param retained
  57. * @param topic
  58. * @param pushMessage
  59. */
  60. public void publish(int qos, boolean retained, String topic, String pushMessage) {
  61. MqttMessage message = new MqttMessage();
  62. message.setQos(qos);
  63. message.setRetained(retained);
  64. message.setPayload(pushMessage.getBytes());
  65. MqttTopic mTopic = MqttSendClient.getClient().getTopic(topic);
  66. if (null == mTopic) {
  67. log.error("主题不存在:{}",mTopic);
  68. }
  69. try {
  70. mTopic.publish(message);
  71. log.info("消息发送成功");
  72. } catch (Exception e) {
  73. log.error("mqtt发送消息异常:",e);
  74. }
  75. }
  76. }

添加MQTT发送客户端回调类:MqttSendCallBack

  1. @Slf4j
  2. @Component
  3. public class MqttSendCallBack implements MqttCallbackExtended {
  4. /**
  5. * 链接EMQ服务器后触发
  6. * @param reconnect
  7. * @param serverURI
  8. */
  9. @Override
  10. public void connectComplete(boolean reconnect, String serverURI) {
  11. log.info("————————————————-ClientID:{}——————————————"+"链接成功");
  12. }
  13. /**
  14. * 客户端连接断开后触发
  15. * 这里可以做重新链接操作
  16. */
  17. @Override
  18. public void connectionLost(Throwable cause) {
  19. log.error("【MQTT-发送端】链接断开!");
  20. }
  21. @Override
  22. public void messageArrived(String topic, MqttMessage message) throws Exception {
  23. log.info("【MQTT-发送端】接收消息主题 : " + topic);
  24. log.info("【MQTT-发送端】接收消息Qos : " + message.getQos());
  25. log.info("【MQTT-发送端】接收消息内容 : " + new String(message.getPayload()));
  26. }
  27. /**
  28. * 发送消息回调
  29. * @param token
  30. */
  31. @Override
  32. public void deliveryComplete(IMqttDeliveryToken token) {
  33. String[] topics = token.getTopics();
  34. if (topics!=null && topics.length>0){
  35. for (String topic : topics) {
  36. log.info("【MQTT-发送端】向主题:" + topic + "发送消息成功!");
  37. }
  38. }
  39. try {
  40. MqttMessage message = token.getMessage();
  41. byte[] payload = message.getPayload();
  42. String s = new String(payload, "UTF-8");
  43. log.info("【MQTT-发送端】消息的内容是:" + s);
  44. } catch (MqttException e) {
  45. e.printStackTrace();
  46. } catch (UnsupportedEncodingException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. }

添加:MqttCondition

  1. public class MqttCondition implements Condition {
  2. @Override
  3. public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
  4. System.out.println("MqttCondition。。。。");
  5. //1、能获取到ioc使用的beanfactory
  6. ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
  7. //2、获取类加载器
  8. ClassLoader classLoader = context.getClassLoader();
  9. //3、获取当前环境信息
  10. Environment environment = context.getEnvironment();
  11. String isOpen = environment.getProperty("mqtt.isOpen");
  12. return Boolean.valueOf(isOpen);
  13. }
  14. }

添加MQTT配置类:MqttConfig

  1. @Configuration
  2. public class MqttConfig {
  3. @Autowired
  4. private MqttSendClient mqttSendClient;
  5. @Conditional(MqttCondition.class)
  6. @Bean
  7. public MqttSendClient getMqttSendClient(){
  8. mqttSendClient.connect();
  9. return mqttSendClient;
  10. }
  11. }

主启动类

  1. @SpringBootApplication
  2. public class MqttSendApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(MqttSendApplication.class, args);
  5. }
  6. }

启动项目,链接MQTT服务器成功。

项目整体代码结构如下:

 

3、接收端:cloud-mqtt-accept8002

导入POM

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-actuator</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-devtools</artifactId>
  13. <scope>runtime</scope>
  14. <optional>true</optional>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.projectlombok</groupId>
  18. <artifactId>lombok</artifactId>
  19. <optional>true</optional>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-test</artifactId>
  24. <scope>test</scope>
  25. </dependency>
  26. <!--MQTT客户端工具-->
  27. <dependency>
  28. <groupId>org.eclipse.paho</groupId>
  29. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  30. <version>1.2.2</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-configuration-processor</artifactId>
  35. <optional>true</optional>
  36. </dependency>
  37. </dependencies>

设置YML

  1. server:
  2. port: 8002
  3. spring:
  4. application:
  5. name: mqtt-accept
  6. mqtt:
  7. hostUrl: tcp://192.168.56.102:1883
  8. username: admin
  9. password: public
  10. clientid: mqtt_accept_client
  11. cleanSession: true
  12. reconnect: true
  13. #连接超时
  14. timeout: 1000
  15. #设置会话心跳时间
  16. keepalive: 100
  17. defaultTopic: client:report:1
  18. isOpen: true
  19. qos: 1

属性配置文件:MqttProperties.java

  1. @Data
  2. @Component
  3. @ConfigurationProperties(prefix = "mqtt")
  4. public class MqttProperties {
  5. /**
  6. * 用户名
  7. */
  8. private String username;
  9. /**
  10. * 密码
  11. */
  12. private String password;
  13. /**
  14. * 连接地址
  15. */
  16. private String hostUrl;
  17. /**
  18. * 客户端Id,同一台服务器下,不允许出现重复的客户端id
  19. */
  20. private String clientId;
  21. /**
  22. * 默认连接主题
  23. */
  24. private String defaultTopic;
  25. /**
  26. * 超时时间
  27. */
  28. private int timeout;
  29. /**
  30. * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
  31. * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  32. */
  33. private int keepAlive;
  34. /**
  35. * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
  36. * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  37. */
  38. private Boolean cleanSession;
  39. /**
  40. * 是否断线重连
  41. */
  42. private Boolean reconnect;
  43. /**
  44. * 启动的时候是否关闭mqtt
  45. */
  46. private Boolean isOpen;
  47. /**
  48. * 连接方式
  49. */
  50. private Integer qos;
  51. }

添加MQTT接收客户端:MqttAcceptClient

  1. @Slf4j
  2. @Component
  3. public class MqttAcceptClient {
  4. @Autowired
  5. private MqttAcceptCallback mqttAcceptCallback;
  6. @Autowired
  7. private MqttProperties mqttProperties;
  8. private static MqttClient mqttClient;
  9. public static MqttClient getMqttClient() {
  10. return mqttClient;
  11. }
  12. public static void setMqttClient(MqttClient mqttClient) {
  13. MqttAcceptClient.mqttClient = mqttClient;
  14. }
  15. /**
  16. * 客户端连接
  17. */
  18. public void connect() {
  19. MqttClient client;
  20. try {
  21. client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
  22. MqttConnectOptions options = new MqttConnectOptions();
  23. options.setUserName(mqttProperties.getUsername());
  24. options.setPassword(mqttProperties.getPassword().toCharArray());
  25. options.setConnectionTimeout(mqttProperties.getTimeout());
  26. options.setKeepAliveInterval(mqttProperties.getKeepAlive());
  27. options.setAutomaticReconnect(mqttProperties.getReconnect());
  28. options.setCleanSession(mqttProperties.getCleanSession());
  29. MqttAcceptClient.setMqttClient(client);
  30. try {
  31. // 设置回调
  32. client.setCallback(mqttAcceptCallback);
  33. client.connect(options);
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. /**
  42. * 重新连接
  43. */
  44. public void reconnection() {
  45. try {
  46. mqttClient.connect();
  47. } catch (MqttException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. /**
  52. * 订阅某个主题
  53. *
  54. * @param topic 主题
  55. * @param qos 连接方式
  56. */
  57. public void subscribe(String topic, int qos) {
  58. log.info("==============开始订阅主题==============" + topic);
  59. try {
  60. mqttClient.subscribe(topic, qos);
  61. } catch (MqttException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. /**
  66. * 取消订阅某个主题
  67. *
  68. * @param topic
  69. */
  70. public void unsubscribe(String topic) {
  71. log.info("==============开始取消订阅主题==============" + topic);
  72. try {
  73. mqttClient.unsubscribe(topic);
  74. } catch (MqttException e) {
  75. e.printStackTrace();
  76. }
  77. }
  78. }

添加mqtt接受服务的回调类:MqttAcceptCallback

  1. @Slf4j
  2. @Component
  3. public class MqttAcceptCallback implements MqttCallbackExtended {
  4. @Autowired
  5. private MqttAcceptClient mqttAcceptClient;
  6. /**
  7. * 客户端断开后触发
  8. *
  9. * @param throwable
  10. */
  11. @Override
  12. public void connectionLost(Throwable throwable) {
  13. log.info("【MQTT-消费端】连接断开,可以做重连");
  14. if (MqttAcceptClient.getMqttClient() == null || !MqttAcceptClient.getMqttClient().isConnected()) {
  15. log.info("【MQTT-消费端】emqx重新连接....................................................");
  16. mqttAcceptClient.reconnection();
  17. }
  18. }
  19. /**
  20. * 客户端收到消息触发
  21. *
  22. * @param topic 主题
  23. * @param mqttMessage 消息
  24. */
  25. @Override
  26. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  27. log.info("【MQTT-消费端】接收消息主题 : " + topic);
  28. log.info("【MQTT-消费端】接收消息Qos : " + mqttMessage.getQos());
  29. log.info("【MQTT-消费端】接收消息内容 : " + new String(mqttMessage.getPayload()));
  30. // int i = 1/0;
  31. }
  32. /**
  33. * 发布消息成功
  34. *
  35. * @param token token
  36. */
  37. @Override
  38. public void deliveryComplete(IMqttDeliveryToken token) {
  39. String[] topics = token.getTopics();
  40. for (String topic : topics) {
  41. log.info("【MQTT-消费端】向主题:" + topic + "发送消息成功!");
  42. }
  43. try {
  44. MqttMessage message = token.getMessage();
  45. byte[] payload = message.getPayload();
  46. String s = new String(payload, "UTF-8");
  47. log.info("【MQTT-消费端】消息的内容是:" + s);
  48. } catch (MqttException e) {
  49. e.printStackTrace();
  50. } catch (UnsupportedEncodingException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. /**
  55. * 连接emq服务器后触发
  56. *
  57. * @param b
  58. * @param s
  59. */
  60. @Override
  61. public void connectComplete(boolean b, String s) {
  62. System.out.println("s: " + s);
  63. log.info("--------------------【MQTT-消费端】连接成功!--------------------");
  64. ///#结尾表示订阅所有以test开头的主题
  65. // 订阅所有机构主题
  66. mqttAcceptClient.subscribe("test_queue", 0);
  67. }
  68. }

MqttCondition

  1. public class MqttCondition implements Condition {
  2. @Override
  3. public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
  4. System.out.println("MqttCondition。。。。");
  5. //1、能获取到ioc使用的beanfactory
  6. ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
  7. //2、获取类加载器
  8. ClassLoader classLoader = context.getClassLoader();
  9. //3、获取当前环境信息
  10. Environment environment = context.getEnvironment();
  11. String isOpen = environment.getProperty("mqtt.isOpen");
  12. return Boolean.valueOf(isOpen);
  13. }
  14. }

MQTT启动配置类:MqttConfig.java

  1. @Configuration
  2. public class MqttConfig {
  3. @Autowired
  4. private MqttAcceptClient mqttAcceptClient;
  5. @Conditional(MqttCondition.class)
  6. @Bean
  7. public MqttAcceptClient getMqttAcceptClient(){
  8. mqttAcceptClient.connect();
  9. //mqttAcceptClient.subscribe("test_queue",0);
  10. return mqttAcceptClient;
  11. }
  12. }

主启动类

  1. package com.xlh.springcloud;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class MqttAcceptApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(MqttAcceptApplication.class,args);
  8. }
  9. }

启动项目,链接MQTT服务器,然后订阅主题:

 

项目整体结构如下:

 

4、项目测试

分别启动发送端和接收端以后,EMQX监控平台如下:

 

然后调用发送端测试接口:http://localhost:8001/send

发送端cloud-mqtt-send8001运行结果如下:

 

接收端cloud-mqtt-accept8002运行结果如下:

可以看到接收端成功的接收到消息

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/748088
推荐阅读
相关标签
  

闽ICP备14008679号