赞
踩
消息队列遥测传输 (MQTT) , 是一种常用的轻量级 "发布-订阅"消息协议 , 非常适合通过互联网连接物联网(LOT) 或者机器对机器 (M2M) 设备与应用.
MQTT可在低带宽或者低功耗环境中高效运行,因此是有众多远程客户端应用的理想之选,适合用于多个行业,包括消费类电子产品,汽车,运输,制造及医疗行业.
HTTP与MQTT都是用于通过互联网传输数据的网络协议,下面我们来看看二者的区别.
MQTT的许多特性让其成为物联网设备(物联网中的"物")和后端系统之间进行消息传递的理想协议,此处,我们将重点介绍以下四个特性
MQTT支持传输控制协议/互联网协议 (TCP/IP)协议,作为其底层传输协议.
虽然TCP/IP协议是最常见的协议,但并非传输MQTT消息是唯一的选择,可可使用UDP和webSocket
官网下载地址 : https://www.emqx.io/docs/zh/v4.3/faq/use-guide.html
[root@lep ~]# docker pull emqx/emqx:5.3.1
v4.0.0: Pulling from emqx/emqx
89d9c30c1d48: Pull complete
d1c907393fbf: Pull complete
4f534f3dfa46: Pull complete
c0044c0a242c: Pull complete
432bcb7ac615: Pull complete
1c89b5520019: Pull complete
e3bf682944db: Downloadin
[root@lep ~]# docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:5.3.1
f8ca59e1a21b040f1c28d4a3f09e908e67f11bf61f7fb11b8fd3576b283a61a0
[root@lep ~]#
第一次进入 ,需要修改密码
控制台连接端口号 18083
客户端连接端口号 1883
<!-- 引入 mqtt 相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
spring.application.name=test
# MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
spring.mqtt.url=tcp://127.0.0.1:1883
# 用户名
spring.mqtt.username=admin
# 密码
spring.mqtt.password=lep-88888888
package com.example.springmaven.controller.conf; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; /** * @Date 2023/11/21 18:31 */ @Configuration public class MqttConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.application.name}") private String applicationName; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init() { this.connect(); } /** * 断开连接 */ @PreDestroy public void disConnect() { try { client.disconnect(); client.close(); } catch (MqttException e) { e.printStackTrace(); } } /** * 客户端连接服务端 */ public void connect() { try { // 创建MQTT客户端对象 client = new MqttClient(hostUrl, applicationName, new MemoryPersistence()); // 连接设置 MqttConnectOptions options = new MqttConnectOptions(); // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 // 设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(true); // 设置连接用户名 options.setUserName(username); // 设置连接密码 options.setPassword(password.toCharArray()); // 设置超时时间,单位为秒 options.setConnectionTimeout(100); // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic", (applicationName + "与服务器断开连接").getBytes(), 0, false); // 设置回调 client.setCallback(new MqttCallBack()); // 连接 client.connect(options); // 订阅主题 (接受此主题的消息) this.subscribe("warn_topic", 2); this.subscribe("warn_topic2", 2); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息 * * @param topic * @param message */ public boolean publish(String topic, String message) { MqttMessage mqttMessage = new MqttMessage(); // 0:最多交付一次,可能丢失消息 // 1:至少交付一次,可能消息重复 // 2:只交付一次,既不丢失也不重复 mqttMessage.setQos(2); // 是否保留最后一条消息 mqttMessage.setRetained(false); // 消息内容 mqttMessage.setPayload(message.getBytes()); // 主题的目的地,用于发布/订阅信息 MqttTopic mqttTopic = client.getTopic(topic); // 提供一种机制来跟踪消息的传递进度 // 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 MqttDeliveryToken token; try { // 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 // 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); return true; } catch (MqttException e) { e.printStackTrace(); } return false; } /** * 订阅主题 */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
package com.example.springmaven.controller.conf; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.context.annotation.Configuration; /** * @Date 2023/11/21 18:32 */ @Configuration public class MqttCallBack implements MqttCallback { /** * 与服务器断开的回调 */ @Override public void connectionLost(Throwable cause) { System.out.println("与服务器断开连接"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) { System.out.println(String.format("接收消息主题 : %s", topic)); System.out.println(String.format("接收消息Qos : %d", message.getQos())); System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b", message.isRetained())); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId() + "发布消息成功!"); } }
@RestController @RequestMapping(value = "/test") @Slf4j public class TestController { @Autowired private MqttConfig mqttConfig; @GetMapping("/sendMessage") public String sendMessage(@RequestParam("topic") String topic, @RequestParam("message") String message) { boolean publish = mqttConfig.publish(topic, message); if (publish) { return "ok"; } return "no"; } }
两个订阅方 , 两个主题方
查看监控
查看控制台输出
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.5.6) 2023-11-22 10:37:29.324 INFO 16044 --- [ main] c.e.springmaven.SpringMavenApplication : Starting SpringMavenApplication using Java 11.0.1 on lep with PID 16044 (D:\Corporation\CZGJ\spring-maven\target\classes started by lep in D:\Corporation\CZGJ\spring-maven) 2023-11-22 10:37:29.324 INFO 16044 --- [ main] c.e.springmaven.SpringMavenApplication : No active profile set, falling back to default profiles: default 2023-11-22 10:37:29.853 INFO 16044 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2023-11-22 10:37:29.861 INFO 16044 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2023-11-22 10:37:29.892 INFO 16044 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2023-11-22 10:37:29.908 INFO 16044 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2023-11-22 10:37:29.908 INFO 16044 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2023-11-22 10:37:30.098 INFO 16044 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http) 2023-11-22 10:37:30.098 INFO 16044 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2023-11-22 10:37:30.114 INFO 16044 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54] 2023-11-22 10:37:30.114 INFO 16044 --- [ main] o.a.catalina.core.AprLifecycleListener : Loaded Apache Tomcat Native library [1.2.31] using APR version [1.7.0]. 2023-11-22 10:37:30.114 INFO 16044 --- [ main] o.a.catalina.core.AprLifecycleListener : APR capabilities: IPv6 [true], sendfile [true], accept filters [false], random [true], UDS [true]. 2023-11-22 10:37:30.114 INFO 16044 --- [ main] o.a.catalina.core.AprLifecycleListener : APR/OpenSSL configuration: useAprConnector [false], useOpenSSL [true] 2023-11-22 10:37:30.114 INFO 16044 --- [ main] o.a.catalina.core.AprLifecycleListener : OpenSSL successfully initialized [OpenSSL 1.1.1l 24 Aug 2021] 2023-11-22 10:37:30.178 INFO 16044 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2023-11-22 10:37:30.178 INFO 16044 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 822 ms 2023-11-22 10:37:30.871 INFO 16044 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2023-11-22 10:37:30.871 INFO 16044 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'test.errorChannel' has 1 subscriber(s). 2023-11-22 10:37:30.871 INFO 16044 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger' 2023-11-22 10:37:30.887 INFO 16044 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2023-11-22 10:37:30.887 INFO 16044 --- [ main] c.e.springmaven.SpringMavenApplication : Started SpringMavenApplication in 1.845 seconds (JVM running for 2.87) 2023-11-22 10:42:34.668 INFO 16044 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2023-11-22 10:42:34.668 INFO 16044 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2023-11-22 10:42:34.669 INFO 16044 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms test发布消息成功! 接收消息主题 : warn_topic 接收消息Qos : 2 接收消息内容 : 我是告警消息 接收消息retained : false
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。