消息队列遥测传输 (MQTT) , 是一种常用的轻量级 "发布-订阅"消息协议 , 非常适合通过互联网连接物联网(LOT) 或者机器对机器 (M2M) 设备与应用.
MQTT支持传输控制协议/互联网协议 (TCP/IP)协议,作为其底层传输协议.
官网下载地址 : https://www.emqx.io/docs/zh/v4.3/faq/use-guide.html
[root@lep ~]# docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:5.3.1
[root@lep ~]#
第一次进入 ,需要修改密码
控制台连接端口号 18083
客户端连接端口号 1883
<!-- 引入 mqtt 相关依赖-->
# MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
# 用户名
# 密码
package com.example.springmaven.controller.conf; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; /** * @Date 2023/11/21 18:31 */ @Configuration public class MqttConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.application.name}") private String applicationName; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init() { this.connect(); } /** * 断开连接 */ @PreDestroy public void disConnect() { try { client.disconnect(); client.close(); } catch (MqttException e) { e.printStackTrace(); } } /** * 客户端连接服务端 */ public void connect() { try { // 创建MQTT客户端对象 client = new MqttClient(hostUrl, applicationName, new MemoryPersistence()); // 连接设置 MqttConnectOptions options = new MqttConnectOptions(); // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 // 设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(true); // 设置连接用户名 options.setUserName(username); // 设置连接密码 options.setPassword(password.toCharArray()); // 设置超时时间,单位为秒 options.setConnectionTimeout(100); // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic", (applicationName + "与服务器断开连接").getBytes(), 0, false); // 设置回调 client.setCallback(new MqttCallBack()); // 连接 client.connect(options); // 订阅主题 (接受此主题的消息) this.subscribe("warn_topic", 2); this.subscribe("warn_topic2", 2); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息 * * @param topic * @param message */ public boolean publish(String topic, String message) { MqttMessage mqttMessage = new MqttMessage(); // 0:最多交付一次,可能丢失消息 // 1:至少交付一次,可能消息重复 // 2:只交付一次,既不丢失也不重复 mqttMessage.setQos(2); // 是否保留最后一条消息 mqttMessage.setRetained(false); // 消息内容 mqttMessage.setPayload(message.getBytes()); // 主题的目的地,用于发布/订阅信息 MqttTopic mqttTopic = client.getTopic(topic); // 提供一种机制来跟踪消息的传递进度 // 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 MqttDeliveryToken token; try { // 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 // 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); return true; } catch (MqttException e) { e.printStackTrace(); } return false; } /** * 订阅主题 */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
package com.example.springmaven.controller.conf; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.context.annotation.Configuration; /** * @Date 2023/11/21 18:32 */ @Configuration public class MqttCallBack implements MqttCallback { /** * 与服务器断开的回调 */ @Override public void connectionLost(Throwable cause) { System.out.println("与服务器断开连接"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) { System.out.println(String.format("接收消息主题 : %s", topic)); System.out.println(String.format("接收消息Qos : %d", message.getQos())); System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b", message.isRetained())); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId() + "发布消息成功!"); } }
@RestController @RequestMapping(value = "/test") @Slf4j public class TestController { @Autowired private MqttConfig mqttConfig; @GetMapping("/sendMessage") public String sendMessage(@RequestParam("topic") String topic, @RequestParam("message") String message) { boolean publish = mqttConfig.publish(topic, message); if (publish) { return "ok"; } return "no"; } }
两个订阅方 , 两个主题方
test发布消息成功! 接收消息主题 : warn_topic 接收消息Qos : 2 接收消息内容 : 我是告警消息 接收消息retained : false
