当前位置:   article > 正文

EMQX安装和Java订阅、发布mqtt消息_java 订阅 emqx

java 订阅 emqx

一、EMQX介绍

EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用。EMQX 作为物联网应用开发和物联网平台搭建必须用到的基础设施软件,主要在边缘和云端实现物联网设备互联与设备上云,提供物联网设备接入、协议处理、消息路由、数据存储、流数据处理等核心能力。

二、EMQX安装

访问官网下载安装包:下载 EMQX

解压zip文件得到软件目录

运行EMQX,打开cmd命令窗口,进入软件bin目录,输入emqx start命令启动软件

登录emqx控制台,访问http://127.0.0.1:18083/,默认用户名、密码是admin、public。

三、Java实现发送和订阅消息

3.1 创建客户端基础代码
  • 引入pom依赖
  1. <dependency>
  2. <groupId>org.eclipse.paho</groupId>
  3. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4. <version>1.2.2</version>
  5. </dependency>
  • 订阅和发布消息相关代码
  1. import org.eclipse.paho.client.mqttv3.*;
  2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  3. public class App {
  4. public static void main(String[] args) {
  5. String subTopic = "testtopic/#";
  6. String pubTopic = "testtopic/1";
  7. String content = "Hello World";
  8. int qos = 2;
  9. String broker = "tcp://127.0.0.1:1883";
  10. String clientId = "emqx_test";
  11. MemoryPersistence persistence = new MemoryPersistence();
  12. try {
  13. MqttClient client = new MqttClient(broker, clientId, persistence);
  14. // MQTT 连接选项
  15. MqttConnectOptions connOpts = new MqttConnectOptions();
  16. connOpts.setUserName("用户名");
  17. connOpts.setPassword("密码".toCharArray());
  18. // 保留会话
  19. connOpts.setCleanSession(true);
  20. MqttCallback callback = new OnMessageCallback();
  21. // 设置回调
  22. client.setCallback(callback);
  23. // 建立连接
  24. System.out.println("Connecting to broker: " + broker);
  25. client.connect(connOpts);
  26. System.out.println("Connected");
  27. System.out.println("Publishing message: " + content);
  28. // 订阅主题
  29. client.subscribe(subTopic);
  30. // 消息发布所需参数
  31. MqttMessage message = new MqttMessage(content.getBytes());
  32. message.setQos(qos);
  33. // 发布消息
  34. client.publish(pubTopic, message);
  35. System.out.println("Message published");
  36. // client.disconnect();
  37. // System.out.println("Disconnected");
  38. // client.close();
  39. // System.exit(0);
  40. } catch (MqttException me) {
  41. System.out.println("reason " + me.getReasonCode());
  42. System.out.println("msg " + me.getMessage());
  43. System.out.println("loc " + me.getLocalizedMessage());
  44. System.out.println("cause " + me.getCause());
  45. System.out.println("excep " + me);
  46. me.printStackTrace();
  47. }
  48. }
  49. }
  • 接收消息相关代码
  1. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  2. import org.eclipse.paho.client.mqttv3.MqttCallback;
  3. import org.eclipse.paho.client.mqttv3.MqttMessage;
  4. public class OnMessageCallback implements MqttCallback {
  5. public void connectionLost(Throwable cause) {
  6. // 连接丢失后,一般在这里面进行重连
  7. System.out.println("连接断开,可以做重连");
  8. }
  9. public void messageArrived(String topic, MqttMessage message) throws Exception {
  10. // subscribe后得到的消息会执行到这里面
  11. System.out.println("接收消息主题:" + topic);
  12. System.out.println("接收消息Qos:" + message.getQos());
  13. System.out.println("接收消息内容:" + new String(message.getPayload()));
  14. }
  15. public void deliveryComplete(IMqttDeliveryToken token) {
  16. System.out.println("deliveryComplete---------" + token.isComplete());
  17. }
  18. }

3.2 创建客户端进阶代码
  • 引入pom依赖
  <!-- MQTT via Spring Integration -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
  </dependency>
  <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.1</version>
  </dependency>
  <!-- fastjson: 1.2.83 is the first 1.x release containing the fix for the autoType
       deserialization vulnerability; older versions must not parse untrusted payloads. -->
  <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
  </dependency>
  • 定义发送消息客户端的配置
  1. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.integration.annotation.IntegrationComponentScan;
  6. import org.springframework.integration.annotation.ServiceActivator;
  7. import org.springframework.integration.channel.DirectChannel;
  8. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  9. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  10. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  11. import org.springframework.messaging.MessageChannel;
  12. import org.springframework.messaging.MessageHandler;
  13. @Configuration
  14. @IntegrationComponentScan
  15. public class MqttSenderConfig {
  16. /**
  17. * 发布的bean名称
  18. */
  19. public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  20. /**
  21. * 客户端与服务器之间的连接意外中断,服务器将发布客户端的"遗嘱"消息
  22. */
  23. private static final byte[] WILL_DATA;
  24. static {
  25. WILL_DATA = "offline".getBytes();
  26. }
  27. private static final String username = "admin";
  28. private static final String password = "DCDremote@997";
  29. private static final String url = "tcp://127.0.0.1:1883";
  30. private static final String clientId = "honeywell-server1";
  31. private static final String defaultTopic = "default";
  32. // @Value("${mqtt.username}")
  33. // private String username;
  34. //
  35. // @Value("${mqtt.password}")
  36. // private String password;
  37. //
  38. // @Value("${mqtt.url}")
  39. // private String url;
  40. //
  41. // @Value("${mqtt.sender.clientId}")
  42. // private String clientId;
  43. //
  44. // @Value("${mqtt.sender.topic}")
  45. // private String defaultTopic;
  46. @Bean
  47. public MqttConnectOptions getMqttConnectOption(){
  48. MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
  49. mqttConnectOptions.setCleanSession(true);
  50. mqttConnectOptions.setConnectionTimeout(10);
  51. mqttConnectOptions.setKeepAliveInterval(90);
  52. mqttConnectOptions.setAutomaticReconnect(true);
  53. mqttConnectOptions.setUserName(username);
  54. mqttConnectOptions.setPassword(password.toCharArray());
  55. mqttConnectOptions.setServerURIs(new String[]{url});
  56. mqttConnectOptions.setKeepAliveInterval(30);
  57. return mqttConnectOptions;
  58. }
  59. @Bean
  60. public MqttPahoClientFactory mqttClientsFactory() {
  61. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  62. factory.setConnectionOptions(getMqttConnectOption());
  63. return factory;
  64. }
  65. @Bean
  66. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  67. public MessageHandler mqttOutbound() {
  68. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientsFactory());
  69. messageHandler.setAsync(true);
  70. messageHandler.setDefaultTopic(defaultTopic);
  71. messageHandler.setDefaultQos(1);
  72. return messageHandler;
  73. }
  74. @Bean
  75. public MessageChannel mqttOutboundChannel() {
  76. return new DirectChannel();
  77. }
  78. }
  • 编写接收消息的客户端的相关配置
  1. package org.jianying.emqxstudy.mqtt;
  2. import com.alibaba.fastjson.JSON;
  3. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.integration.annotation.ServiceActivator;
  10. import org.springframework.integration.channel.DirectChannel;
  11. import org.springframework.integration.core.MessageProducer;
  12. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  13. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  14. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  15. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  16. import org.springframework.messaging.Message;
  17. import org.springframework.messaging.MessageChannel;
  18. import org.springframework.messaging.MessageHandler;
  19. import org.springframework.messaging.MessagingException;
  20. import java.util.Arrays;
  21. import java.util.List;
  22. import java.util.Map;
  23. @Configuration
  24. public class MqttReceiverConfig {
  25. final static Logger logger = LoggerFactory.getLogger(MqttReceiverConfig.class);
  26. /**
  27. * 订阅的bean名称
  28. */
  29. public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  30. // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
  31. private static final byte[] WILL_DATA;
  32. static {
  33. WILL_DATA = "offline".getBytes();
  34. }
  35. private static final String username = "admin";
  36. private static final String password = "DCDremote@997";
  37. private static final String url = "tcp://127.0.0.1:1883";
  38. // 接收消息的客户端id
  39. private static final String clientId = "test-server";
  40. // 接收的消息主题, $SYS/brokers 表示发送的是系统主题
  41. private static final String defaultTopic = "$SYS/brokers/+/clients/#,hello/info/faceid/#,hello/server/result/#,info_topic";
  42. // @Value("${mqtt.username}")
  43. // private String username;
  44. //
  45. // @Value("${mqtt.password}")
  46. // private String password;
  47. //
  48. // @Value("${mqtt.url}")
  49. // private String url;
  50. //
  51. // @Value("${mqtt.receiver.clientId}")
  52. // private String clientId;
  53. //
  54. // @Value("${mqtt.receiver.topic}")
  55. // private String defaultTopic;
  56. @Bean
  57. public MqttConnectOptions getMqttConnectOptions() {
  58. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  59. mqttConnectOptions.setCleanSession(true);
  60. mqttConnectOptions.setConnectionTimeout(10);
  61. mqttConnectOptions.setKeepAliveInterval(90);
  62. mqttConnectOptions.setAutomaticReconnect(true);
  63. mqttConnectOptions.setUserName(username);
  64. mqttConnectOptions.setPassword(password.toCharArray());
  65. mqttConnectOptions.setServerURIs(new String[]{url});
  66. mqttConnectOptions.setKeepAliveInterval(60);
  67. return mqttConnectOptions;
  68. }
  69. @Bean
  70. public MqttPahoClientFactory mqttClientFactory() {
  71. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  72. factory.setConnectionOptions(getMqttConnectOptions());
  73. return factory;
  74. }
  75. //接收通道
  76. @Bean
  77. public MessageChannel mqttInputChannel() {
  78. return new DirectChannel();
  79. }
  80. @Bean
  81. public MessageProducer inbound() {
  82. List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
  83. String[] topics = new String[topicList.size()];
  84. topicList.toArray(topics);
  85. MqttPahoMessageDrivenChannelAdapter adapter =
  86. new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
  87. topics);
  88. adapter.setCompletionTimeout(10000);
  89. adapter.setConverter(new DefaultPahoMessageConverter());
  90. adapter.setQos(1);
  91. adapter.setOutputChannel(mqttInputChannel());
  92. return adapter;
  93. }
  94. //通过通道获取数据
  95. @Bean
  96. @ServiceActivator(inputChannel = "mqttInputChannel")
  97. public MessageHandler handler() {
  98. return new MessageHandler() {
  99. @Override
  100. public void handleMessage(Message<?> message) throws MessagingException {
  101. logger.info(("收到消息" + message.getHeaders().get("mqtt_receivedTopic") + message.getPayload()));
  102. // 主题
  103. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
  104. // 消息体
  105. Map maps = (Map) JSON.parse(message.getPayload().toString().trim());
  106. // 判断设备状态
  107. if (topic.contains("$SYS/brokers") && !topic.contains("faceid-server") && !topic.contains("faceid-mqtt-server")) {
  108. if (maps.get("clientid").toString().contains("uniwin-mqtt-client")) {
  109. }
  110. } else if (topic.contains("uniwin/server/result/faceid")) { //结果返回
  111. if (maps.get("type") != null && !maps.get("type").equals("")) {
  112. }
  113. } else {
  114. System.out.println("info...");
  115. if (maps.get("type") != null && !maps.get("type").equals("")) {
  116. String type = maps.get("type").toString();
  117. // 设备心跳检测
  118. if (type.equals("heart")) {
  119. }
  120. // 上传打卡记录
  121. if (type.equals("note")) {
  122. }
  123. // 上传设备参数
  124. if (type.equals("param_upload")) {
  125. }
  126. }
  127. }
  128. }
  129. };
  130. }
  131. }
  • 编写发送消息的工具类
  1. import org.springframework.integration.annotation.MessagingGateway;
  2. import org.springframework.integration.mqtt.support.MqttHeaders;
  3. import org.springframework.messaging.handler.annotation.Header;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  7. public interface MqttGateway {
  8. /**
  9. * 发送信息到MQTT服务器
  10. *
  11. * @param data 发送的文本
  12. */
  13. void sendToMqtt(String data);
  14. /**
  15. * 发送信息到MQTT服务器
  16. *
  17. * @param topic 主题
  18. * @param payload 消息主体
  19. */
  20. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  21. String payload);
  22. /**
  23. * 发送信息到MQTT服务器
  24. *
  25. * @param topic 主题
  26. * @param qos 对消息处理的几种机制。
  27. * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
  28. * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
  29. * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
  30. * @param payload 消息主体
  31. */
  32. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  33. @Header(MqttHeaders.QOS) int qos,
  34. String payload);
  35. }
3.3 EMQX API使用示例
  • EMQX 提供了 HTTP API 以实现与外部系统的集成,例如查询客户端信息、发布消息和创建规则等。
  • EMQX 的 HTTP API 服务默认监听 8081 端口,可通过 etc/plugins/emqx_management.conf 配置文件修改监听端口,或启用 HTTPS 监听。EMQX 4.0.0 以后的所有 API 调用均以 api/v4 开头。

详细介绍可以看官方API文档:HTTP API | EMQX 4.3 文档

下面的代码是以 v4.3 版本为例,以API调用的方式操作EMQX服务。

3.3.1 消息发布
  • 请求方式

POST http://localhost:8081/api/v4/mqtt/publish

  • 请求参数(json数据)

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| topic | String | Optional | | 主题,与 topics 至少指定其中之一 |
| topics | String | Optional | | 以 , 分割的多个主题,使用此字段能够同时发布消息到多个主题 |
| clientid | String | Required | | 客户端标识符 |
| payload | String | Required | | 消息正文 |
| encoding | String | Optional | plain | 消息正文使用的编码方式,目前仅支持 plain 与 base64 两种 |
| qos | Integer | Optional | 0 | QoS 等级 |
| retain | Boolean | Optional | false | 是否为保留消息 |

  • 请求成功返回结果

| Name | Type | Description |
| --- | --- | --- |
| code | Integer | 0 |

  • 代码示例
  1. public void publiceMessage() {
  2. // 封装请求体参数
  3. Map<String, Object> map = new HashMap<>();
  4. map.put("clientid", "clientId");
  5. map.put("topic", "pubTopic");
  6. map.put("payload", "{\"iot_type\":10}");
  7. map.put("qos", 1);
  8. // json字符串请求体(注意:不转为json字符串,直接拼接字符串容易报错)
  9. String requestBody = JsonUtil.obj2string(map);
  10. // emqx路径
  11. String emqxApiBaseUrl = "http://127.0.0.1:8081"; // EMQ X API 地址
  12. try {
  13. // 构建发布消息的 URL
  14. String publicConnectionUrl = emqxApiBaseUrl + "/api/v4/mqtt/publish";
  15. // 创建 URL 对象
  16. URL url = new URL(publicConnectionUrl);
  17. // 创建 HttpURLConnection
  18. HttpURLConnection connection = (HttpURLConnection) url.openConnection();
  19. // 设置访问权限(使用Authorization方式)
  20. connection.setRequestProperty("Authorization", "Basic YWRtaW46cHVibGlj");
  21. connection.setRequestProperty("Content-Type", "application/json");
  22. connection.setRequestMethod("POST");
  23. connection.setDoOutput(true);
  24. // 将请求体写入连接
  25. try (OutputStream os = connection.getOutputStream()) {
  26. byte[] input = requestBody.getBytes(StandardCharsets.UTF_8);
  27. os.write(input, 0, input.length);
  28. }
  29. // 获取响应码
  30. int responseCode = connection.getResponseCode();
  31. if (responseCode == HttpURLConnection.HTTP_OK) {
  32. System.out.println("Successfully public: " + clientId);
  33. } else {
  34. System.out.println("Failed to public. Response Code: " + responseCode);
  35. }
  36. // 关闭连接
  37. connection.disconnect();
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. }
  41. }
3.3.2 根据clientid断开并踢除客户端

踢除指定客户端。注意踢除客户端操作会将连接与会话一并终结。

  • 请求方式

DELETE http://localhost:8081/api/v4/clients/{clientid}

  • 请求参数(Path Parameters)

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| clientid | String | True | ClientID |

  • 请求成功结果

| Name | Type | Description |
| --- | --- | --- |
| code | Integer | 0 |

  • 代码示例
  1. /**
  2. * 根据客户端id关闭客户端
  3. * @param clientId
  4. */
  5. private void closeClientByClientId(String clientId) {
  6. String emqxApiBaseUrl = "http://localhost:8081"; // EMQ X API 地址
  7. try {
  8. // 对clientId进行编码(避免clientId有特殊字符,比如/
  9. String encodedClientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8.toString());
  10. // 构建关闭连接的 URL
  11. String closeConnectionUrl = emqxApiBaseUrl + "/api/v4/clients/" + encodedClientId;
  12. // 创建 URL 对象
  13. URL url = new URL(closeConnectionUrl);
  14. // 创建 HttpURLConnection
  15. HttpURLConnection connection = (HttpURLConnection) url.openConnection();
  16. // 设置访问权限
  17. connection.setRequestProperty("authorization", "Basic YWRtaW46cHVibGlj");
  18. // 设置请求方法为 DELETE
  19. connection.setRequestMethod("DELETE");
  20. // 获取响应码
  21. int responseCode = connection.getResponseCode();
  22. if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
  23. System.out.println("Successfully closed client connection with clientId: " + clientId);
  24. } else {
  25. System.out.println("Failed to close client connection. Response Code: " + responseCode);
  26. }
  27. // 关闭连接
  28. connection.disconnect();
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/188701
推荐阅读
相关标签
  

闽ICP备14008679号