赞
踩
最近在做MQTT对接,然后发送消息,然后参考网上的实战文章进行了一下整理。
文章主要参考自(https://www.codetd.com/article/13550340),然后自己做了些许更改。
SpringBoot:2.2.2.RELEASE
MQTT平台:EMQX4.4.1(Docker运行)
虚拟机服务器:Centos7(192.168.56.102 )
发送端:cloud-mqtt-send8001
接收端:cloud-mqtt-accept8002
导入POM依赖:
- <!-- mqtt -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-integration</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-stream</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-mqtt</artifactId>
- </dependency>
- <!--配置文件报错问题-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.22</version>
- <scope>provided</scope>
- </dependency>

设置YML
- server:
- port: 8001
-
- spring:
- application:
- name: mqtt-send
-
- #mqtt属性配置
- mqtt:
- hostUrl: tcp://192.168.56.102:1883
- username: admin
- password: public
- clientid: mqtt_send_client
- cleanSession: true
- reconnect: true
- #连接超时
- timeout: 1000
- #设置会话心跳时间
- keepalive: 100
- defaultTopic: client:report:1
- isOpen: true
- qos: 1

新建配置类:MqttProperties.java
- @Data
- @Component
- @ConfigurationProperties(prefix = "mqtt")
- public class MqttProperties {
-
- /**
- * 用户名
- */
- private String username;
-
- /**
- * 密码
- */
- private String password;
-
- /**
- * 连接地址
- */
- private String hostUrl;
-
- /**
- * 客户端Id,同一台服务器下,不允许出现重复的客户端id
- */
- private String clientId;
-
- /**
- * 默认连接主题
- */
- private String defaultTopic;
-
- /**
- * 超时时间
- */
- private int timeout;
-
- /**
- * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
- * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- */
- private int keepAlive;
-
- /**
- * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
- * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
- */
- private Boolean cleanSession;
-
- /**
- * 是否断线重连
- */
- private Boolean reconnect;
-
- /**
- * 启动的时候是否关闭mqtt
- */
- private Boolean isOpen;
-
- /**
- * 连接方式
- */
- private Integer qos;
- }

添加MQTT发送客户端:MqttSendClient:
- @Slf4j
- @Component
- public class MqttSendClient {
-
-
- @Autowired
- private MqttSendCallBack mqttSendCallBack;
-
- @Autowired
- private MqttProperties mqttProperties;
-
- private static MqttClient mqttClient;
-
- private static MqttClient getClient() {
-
-
- return mqttClient;
- }
-
- private static void setClient(MqttClient client) {
-
-
- MqttSendClient.mqttClient = client;
- }
-
- /**
- * 客户端连接
- * @return
- */
- public void connect(){
- MqttClient client = null;
-
- try {
-
-
- //String uuid = UUID.randomUUID().toString().replaceAll("-",""); //设置每一个客户端的id
- client = new MqttClient(mqttProperties.getHostUrl(),mqttProperties.getClientId() , new MemoryPersistence());
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(mqttProperties.getUsername());
- options.setPassword(mqttProperties.getPassword().toCharArray());
- options.setConnectionTimeout(mqttProperties.getTimeout());
- options.setKeepAliveInterval(mqttProperties.getKeepAlive());
- options.setCleanSession(true);
- options.setAutomaticReconnect(false);
-
- MqttSendClient.setClient(client);
- try {
-
-
- // 设置回调
- client.setCallback(mqttSendCallBack);
- client.connect(options);
- } catch (Exception e) {
-
-
- e.printStackTrace();
- }
- } catch (Exception e) {
-
- e.printStackTrace();
- }
- }
-
-
-
-
- /**
- * 发布,默认qos为0,非持久化
- *
- * @param topic 主题名
- * @param pushMessage 消息
- */
- public void publish(String topic, String pushMessage) {
- publish(0, false, topic, pushMessage);
- }
-
- /**
- * 发布
- *
- * @param qos
- * @param retained
- * @param topic
- * @param pushMessage
- */
- public void publish(int qos, boolean retained, String topic, String pushMessage) {
- MqttMessage message = new MqttMessage();
- message.setQos(qos);
- message.setRetained(retained);
- message.setPayload(pushMessage.getBytes());
- MqttTopic mTopic = MqttSendClient.getClient().getTopic(topic);
- if (null == mTopic) {
- log.error("主题不存在:{}",mTopic);
- }
- try {
- mTopic.publish(message);
- log.info("消息发送成功");
- } catch (Exception e) {
- log.error("mqtt发送消息异常:",e);
- }
- }
- }

添加MQTT发送客户端回调类:MqttSendCallBack
- @Slf4j
- @Component
- public class MqttSendCallBack implements MqttCallbackExtended {
-
-
- /**
- * 链接EMQ服务器后触发
- * @param reconnect
- * @param serverURI
- */
- @Override
- public void connectComplete(boolean reconnect, String serverURI) {
- log.info("————————————————-ClientID:{}——————————————"+"链接成功");
- }
-
- /**
- * 客户端连接断开后触发
- * 这里可以做重新链接操作
- */
- @Override
- public void connectionLost(Throwable cause) {
- log.error("【MQTT-发送端】链接断开!");
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- log.info("【MQTT-发送端】接收消息主题 : " + topic);
- log.info("【MQTT-发送端】接收消息Qos : " + message.getQos());
- log.info("【MQTT-发送端】接收消息内容 : " + new String(message.getPayload()));
- }
-
- /**
- * 发送消息回调
- * @param token
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
-
- String[] topics = token.getTopics();
-
- if (topics!=null && topics.length>0){
- for (String topic : topics) {
-
-
- log.info("【MQTT-发送端】向主题:" + topic + "发送消息成功!");
- }
- }
-
- try {
-
-
- MqttMessage message = token.getMessage();
- byte[] payload = message.getPayload();
- String s = new String(payload, "UTF-8");
- log.info("【MQTT-发送端】消息的内容是:" + s);
- } catch (MqttException e) {
-
-
- e.printStackTrace();
- } catch (UnsupportedEncodingException e) {
-
-
- e.printStackTrace();
- }
- }
- }
-

添加:MqttCondition
- public class MqttCondition implements Condition {
- @Override
- public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
-
- System.out.println("MqttCondition。。。。");
-
- //1、能获取到ioc使用的beanfactory
- ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
- //2、获取类加载器
- ClassLoader classLoader = context.getClassLoader();
- //3、获取当前环境信息
- Environment environment = context.getEnvironment();
- String isOpen = environment.getProperty("mqtt.isOpen");
- return Boolean.valueOf(isOpen);
- }
- }

添加MQTT配置类:MqttConfig
- @Configuration
- public class MqttConfig {
-
- @Autowired
- private MqttSendClient mqttSendClient;
-
- @Conditional(MqttCondition.class)
- @Bean
- public MqttSendClient getMqttSendClient(){
- mqttSendClient.connect();
- return mqttSendClient;
- }
- }
-
主启动类
- @SpringBootApplication
- public class MqttSendApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(MqttSendApplication.class, args);
- }
- }
启动项目,链接MQTT服务器成功。
项目整体代码结构如下:
导入POM
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <!--MQTT客户端工具-->
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.2.2</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- <optional>true</optional>
- </dependency>
- </dependencies>

设置YML
- server:
- port: 8002
-
- spring:
- application:
- name: mqtt-accept
-
-
- mqtt:
- hostUrl: tcp://192.168.56.102:1883
- username: admin
- password: public
- clientid: mqtt_accept_client
- cleanSession: true
- reconnect: true
- #连接超时
- timeout: 1000
- #设置会话心跳时间
- keepalive: 100
- defaultTopic: client:report:1
- isOpen: true
- qos: 1

属性配置文件:MqttProperties.java
- @Data
- @Component
- @ConfigurationProperties(prefix = "mqtt")
- public class MqttProperties {
-
- /**
- * 用户名
- */
- private String username;
-
- /**
- * 密码
- */
- private String password;
-
- /**
- * 连接地址
- */
- private String hostUrl;
-
- /**
- * 客户端Id,同一台服务器下,不允许出现重复的客户端id
- */
- private String clientId;
-
- /**
- * 默认连接主题
- */
- private String defaultTopic;
-
- /**
- * 超时时间
- */
- private int timeout;
-
- /**
- * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
- * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- */
- private int keepAlive;
-
- /**
- * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
- * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
- */
- private Boolean cleanSession;
-
- /**
- * 是否断线重连
- */
- private Boolean reconnect;
-
- /**
- * 启动的时候是否关闭mqtt
- */
- private Boolean isOpen;
-
- /**
- * 连接方式
- */
- private Integer qos;
- }
-

添加MQTT接收客户端:MqttAcceptClient
- @Slf4j
- @Component
- public class MqttAcceptClient {
-
-
- @Autowired
- private MqttAcceptCallback mqttAcceptCallback;
-
- @Autowired
- private MqttProperties mqttProperties;
-
-
- private static MqttClient mqttClient;
-
- public static MqttClient getMqttClient() {
- return mqttClient;
- }
-
- public static void setMqttClient(MqttClient mqttClient) {
- MqttAcceptClient.mqttClient = mqttClient;
- }
-
-
- /**
- * 客户端连接
- */
- public void connect() {
-
-
- MqttClient client;
- try {
-
-
- client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(mqttProperties.getUsername());
- options.setPassword(mqttProperties.getPassword().toCharArray());
- options.setConnectionTimeout(mqttProperties.getTimeout());
- options.setKeepAliveInterval(mqttProperties.getKeepAlive());
- options.setAutomaticReconnect(mqttProperties.getReconnect());
- options.setCleanSession(mqttProperties.getCleanSession());
- MqttAcceptClient.setMqttClient(client);
- try {
-
-
- // 设置回调
- client.setCallback(mqttAcceptCallback);
- client.connect(options);
- } catch (Exception e) {
-
-
- e.printStackTrace();
- }
- } catch (Exception e) {
-
-
- e.printStackTrace();
- }
- }
-
- /**
- * 重新连接
- */
- public void reconnection() {
-
-
- try {
- mqttClient.connect();
- } catch (MqttException e) {
-
-
- e.printStackTrace();
- }
- }
-
- /**
- * 订阅某个主题
- *
- * @param topic 主题
- * @param qos 连接方式
- */
- public void subscribe(String topic, int qos) {
-
-
- log.info("==============开始订阅主题==============" + topic);
- try {
-
-
- mqttClient.subscribe(topic, qos);
- } catch (MqttException e) {
-
-
- e.printStackTrace();
- }
- }
-
- /**
- * 取消订阅某个主题
- *
- * @param topic
- */
- public void unsubscribe(String topic) {
-
-
- log.info("==============开始取消订阅主题==============" + topic);
- try {
-
-
- mqttClient.unsubscribe(topic);
- } catch (MqttException e) {
-
-
- e.printStackTrace();
- }
- }
- }
-

添加mqtt接受服务的回调类:MqttAcceptCallback
- @Slf4j
- @Component
- public class MqttAcceptCallback implements MqttCallbackExtended {
- @Autowired
- private MqttAcceptClient mqttAcceptClient;
-
- /**
- * 客户端断开后触发
- *
- * @param throwable
- */
- @Override
- public void connectionLost(Throwable throwable) {
-
-
- log.info("【MQTT-消费端】连接断开,可以做重连");
- if (MqttAcceptClient.getMqttClient() == null || !MqttAcceptClient.getMqttClient().isConnected()) {
-
-
- log.info("【MQTT-消费端】emqx重新连接....................................................");
- mqttAcceptClient.reconnection();
- }
- }
-
- /**
- * 客户端收到消息触发
- *
- * @param topic 主题
- * @param mqttMessage 消息
- */
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-
-
- log.info("【MQTT-消费端】接收消息主题 : " + topic);
- log.info("【MQTT-消费端】接收消息Qos : " + mqttMessage.getQos());
- log.info("【MQTT-消费端】接收消息内容 : " + new String(mqttMessage.getPayload()));
- // int i = 1/0;
- }
-
- /**
- * 发布消息成功
- *
- * @param token token
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
-
-
- String[] topics = token.getTopics();
- for (String topic : topics) {
-
-
- log.info("【MQTT-消费端】向主题:" + topic + "发送消息成功!");
- }
- try {
-
-
- MqttMessage message = token.getMessage();
- byte[] payload = message.getPayload();
- String s = new String(payload, "UTF-8");
- log.info("【MQTT-消费端】消息的内容是:" + s);
- } catch (MqttException e) {
-
-
- e.printStackTrace();
- } catch (UnsupportedEncodingException e) {
-
-
- e.printStackTrace();
- }
- }
-
- /**
- * 连接emq服务器后触发
- *
- * @param b
- * @param s
- */
- @Override
- public void connectComplete(boolean b, String s) {
-
- System.out.println("s: " + s);
-
- log.info("--------------------【MQTT-消费端】连接成功!--------------------");
- // 以/#结尾表示订阅所有以test开头的主题
- // 订阅所有机构主题
- mqttAcceptClient.subscribe("test_queue", 0);
- }
- }

MqttCondition
- public class MqttCondition implements Condition {
- @Override
- public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
-
- System.out.println("MqttCondition。。。。");
-
- //1、能获取到ioc使用的beanfactory
- ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
- //2、获取类加载器
- ClassLoader classLoader = context.getClassLoader();
- //3、获取当前环境信息
- Environment environment = context.getEnvironment();
- String isOpen = environment.getProperty("mqtt.isOpen");
- return Boolean.valueOf(isOpen);
- }
- }

MQTT启动配置类:MqttConfig.java
- @Configuration
- public class MqttConfig {
-
- @Autowired
- private MqttAcceptClient mqttAcceptClient;
-
- @Conditional(MqttCondition.class)
- @Bean
- public MqttAcceptClient getMqttAcceptClient(){
-
- mqttAcceptClient.connect();
- //mqttAcceptClient.subscribe("test_queue",0);
- return mqttAcceptClient;
- }
- }
主启动类
- package com.xlh.springcloud;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class MqttAcceptApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(MqttAcceptApplication.class,args);
- }
- }
启动项目,链接MQTT服务器,然后订阅主题:
项目整体结构如下:
4、项目测试
分别启动发送端和接收端以后,EMQX监控平台如下:
然后调用发送端测试接口:http://localhost:8001/send
发送端cloud-mqtt-send8001运行结果如下:
接收端cloud-mqtt-accept8002运行结果如下:
可以看到接收端成功的接收到消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。