赞
踩
EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用。EMQX 作为物联网应用开发和物联网平台搭建必须用到的基础设施软件,主要在边缘和云端实现物联网设备互联与设备上云,提供物联网设备接入、协议处理、消息路由、数据存储、流数据处理等核心能力。
访问官网下载安装包:下载 EMQX
解压zip文件得到软件目录
运行EMQX:打开cmd命令窗口,进入软件bin目录,输入 emqx start 命令启动软件。
登录emqx控制台,访问http://127.0.0.1:18083/,默认用户名、密码是admin、public。
<!-- Eclipse Paho MQTT v3 Java client, used by the plain-Java example -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-
- public class App {
- public static void main(String[] args) {
- String subTopic = "testtopic/#";
- String pubTopic = "testtopic/1";
- String content = "Hello World";
- int qos = 2;
- String broker = "tcp://127.0.0.1:1883";
- String clientId = "emqx_test";
- MemoryPersistence persistence = new MemoryPersistence();
-
- try {
- MqttClient client = new MqttClient(broker, clientId, persistence);
-
- // MQTT 连接选项
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setUserName("用户名");
- connOpts.setPassword("密码".toCharArray());
-
- // 保留会话
- connOpts.setCleanSession(true);
-
- MqttCallback callback = new OnMessageCallback();
-
- // 设置回调
- client.setCallback(callback);
-
- // 建立连接
- System.out.println("Connecting to broker: " + broker);
- client.connect(connOpts);
-
- System.out.println("Connected");
- System.out.println("Publishing message: " + content);
-
- // 订阅主题
- client.subscribe(subTopic);
-
- // 消息发布所需参数
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(qos);
-
- // 发布消息
- client.publish(pubTopic, message);
- System.out.println("Message published");
-
- // client.disconnect();
- // System.out.println("Disconnected");
- // client.close();
- // System.exit(0);
- } catch (MqttException me) {
- System.out.println("reason " + me.getReasonCode());
- System.out.println("msg " + me.getMessage());
- System.out.println("loc " + me.getLocalizedMessage());
- System.out.println("cause " + me.getCause());
- System.out.println("excep " + me);
- me.printStackTrace();
- }
- }
- }
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
-
- public class OnMessageCallback implements MqttCallback {
- public void connectionLost(Throwable cause) {
- // 连接丢失后,一般在这里面进行重连
- System.out.println("连接断开,可以做重连");
- }
-
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // subscribe后得到的消息会执行到这里面
- System.out.println("接收消息主题:" + topic);
- System.out.println("接收消息Qos:" + message.getQos());
- System.out.println("接收消息内容:" + new String(message.getPayload()));
- }
-
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete---------" + token.isComplete());
- }
- }
<!-- MQTT via Spring Integration (versions managed by the Spring Boot parent) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

<!-- Eclipse Paho client.
     NOTE(review): spring-integration-mqtt already pulls in a Paho client
     transitively; confirm this explicit pin is still wanted. -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.1</version>
</dependency>

<!-- fastJSON: 1.2.83 is the 1.x release that fixes the autoType
     deserialization vulnerability CVE-2022-25845 affecting earlier versions. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.integration.annotation.IntegrationComponentScan;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.integration.channel.DirectChannel;
- import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.MessageHandler;
-
- @Configuration
- @IntegrationComponentScan
- public class MqttSenderConfig {
-
- /**
- * 发布的bean名称
- */
- public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
-
- /**
- * 客户端与服务器之间的连接意外中断,服务器将发布客户端的"遗嘱"消息
- */
- private static final byte[] WILL_DATA;
- static {
- WILL_DATA = "offline".getBytes();
- }
- private static final String username = "admin";
- private static final String password = "DCDremote@997";
- private static final String url = "tcp://127.0.0.1:1883";
- private static final String clientId = "honeywell-server1";
- private static final String defaultTopic = "default";
-
- // @Value("${mqtt.username}")
- // private String username;
- //
- // @Value("${mqtt.password}")
- // private String password;
- //
- // @Value("${mqtt.url}")
- // private String url;
- //
- // @Value("${mqtt.sender.clientId}")
- // private String clientId;
- //
- // @Value("${mqtt.sender.topic}")
- // private String defaultTopic;
-
- @Bean
- public MqttConnectOptions getMqttConnectOption(){
- MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
- mqttConnectOptions.setCleanSession(true);
- mqttConnectOptions.setConnectionTimeout(10);
- mqttConnectOptions.setKeepAliveInterval(90);
- mqttConnectOptions.setAutomaticReconnect(true);
- mqttConnectOptions.setUserName(username);
- mqttConnectOptions.setPassword(password.toCharArray());
- mqttConnectOptions.setServerURIs(new String[]{url});
- mqttConnectOptions.setKeepAliveInterval(30);
- return mqttConnectOptions;
- }
-
- @Bean
- public MqttPahoClientFactory mqttClientsFactory() {
- DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
- factory.setConnectionOptions(getMqttConnectOption());
- return factory;
- }
-
- @Bean
- @ServiceActivator(inputChannel = "mqttOutboundChannel")
- public MessageHandler mqttOutbound() {
- MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientsFactory());
- messageHandler.setAsync(true);
- messageHandler.setDefaultTopic(defaultTopic);
- messageHandler.setDefaultQos(1);
- return messageHandler;
- }
-
- @Bean
- public MessageChannel mqttOutboundChannel() {
- return new DirectChannel();
- }
- }
- package org.jianying.emqxstudy.mqtt;
-
-
- import com.alibaba.fastjson.JSON;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.integration.channel.DirectChannel;
- import org.springframework.integration.core.MessageProducer;
- import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
- import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.MessageHandler;
- import org.springframework.messaging.MessagingException;
-
- import java.util.Arrays;
- import java.util.List;
- import java.util.Map;
-
-
-
- @Configuration
- public class MqttReceiverConfig {
-
- final static Logger logger = LoggerFactory.getLogger(MqttReceiverConfig.class);
-
-
- /**
- * 订阅的bean名称
- */
- public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
-
- // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
- private static final byte[] WILL_DATA;
-
- static {
- WILL_DATA = "offline".getBytes();
- }
-
- private static final String username = "admin";
- private static final String password = "DCDremote@997";
- private static final String url = "tcp://127.0.0.1:1883";
-
- // 接收消息的客户端id
- private static final String clientId = "test-server";
-
- // 接收的消息主题, $SYS/brokers 表示发送的是系统主题
- private static final String defaultTopic = "$SYS/brokers/+/clients/#,hello/info/faceid/#,hello/server/result/#,info_topic";
-
- // @Value("${mqtt.username}")
- // private String username;
- //
- // @Value("${mqtt.password}")
- // private String password;
- //
- // @Value("${mqtt.url}")
- // private String url;
- //
- // @Value("${mqtt.receiver.clientId}")
- // private String clientId;
- //
- // @Value("${mqtt.receiver.topic}")
- // private String defaultTopic;
-
- @Bean
- public MqttConnectOptions getMqttConnectOptions() {
- MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
- mqttConnectOptions.setCleanSession(true);
- mqttConnectOptions.setConnectionTimeout(10);
- mqttConnectOptions.setKeepAliveInterval(90);
- mqttConnectOptions.setAutomaticReconnect(true);
- mqttConnectOptions.setUserName(username);
- mqttConnectOptions.setPassword(password.toCharArray());
- mqttConnectOptions.setServerURIs(new String[]{url});
- mqttConnectOptions.setKeepAliveInterval(60);
- return mqttConnectOptions;
- }
-
- @Bean
- public MqttPahoClientFactory mqttClientFactory() {
- DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
- factory.setConnectionOptions(getMqttConnectOptions());
- return factory;
- }
-
- //接收通道
- @Bean
- public MessageChannel mqttInputChannel() {
- return new DirectChannel();
- }
-
- @Bean
- public MessageProducer inbound() {
-
- List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
- String[] topics = new String[topicList.size()];
- topicList.toArray(topics);
-
- MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
- topics);
- adapter.setCompletionTimeout(10000);
- adapter.setConverter(new DefaultPahoMessageConverter());
- adapter.setQos(1);
- adapter.setOutputChannel(mqttInputChannel());
- return adapter;
- }
-
- //通过通道获取数据
- @Bean
- @ServiceActivator(inputChannel = "mqttInputChannel")
- public MessageHandler handler() {
- return new MessageHandler() {
- @Override
- public void handleMessage(Message<?> message) throws MessagingException {
- logger.info(("收到消息" + message.getHeaders().get("mqtt_receivedTopic") + message.getPayload()));
- // 主题
- String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
- // 消息体
- Map maps = (Map) JSON.parse(message.getPayload().toString().trim());
- // 判断设备状态
- if (topic.contains("$SYS/brokers") && !topic.contains("faceid-server") && !topic.contains("faceid-mqtt-server")) {
- if (maps.get("clientid").toString().contains("uniwin-mqtt-client")) {
-
- }
- } else if (topic.contains("uniwin/server/result/faceid")) { //结果返回
- if (maps.get("type") != null && !maps.get("type").equals("")) {
-
- }
- } else {
- System.out.println("info...");
- if (maps.get("type") != null && !maps.get("type").equals("")) {
- String type = maps.get("type").toString();
- // 设备心跳检测
- if (type.equals("heart")) {
-
- }
- // 上传打卡记录
- if (type.equals("note")) {
- }
- // 上传设备参数
- if (type.equals("param_upload")) {
- }
- }
- }
-
- }
- };
- }
-
- }
-
- import org.springframework.integration.annotation.MessagingGateway;
- import org.springframework.integration.mqtt.support.MqttHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- @Component
- @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
- public interface MqttGateway {
- /**
- * 发送信息到MQTT服务器
- *
- * @param data 发送的文本
- */
- void sendToMqtt(String data);
-
- /**
- * 发送信息到MQTT服务器
- *
- * @param topic 主题
- * @param payload 消息主体
- */
- void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
- String payload);
-
- /**
- * 发送信息到MQTT服务器
- *
- * @param topic 主题
- * @param qos 对消息处理的几种机制。
- * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
- * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
- * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
- * @param payload 消息主体
- */
- void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
- @Header(MqttHeaders.QOS) int qos,
- String payload);
-
- }
详细介绍可以看官方API文档:HTTP API | EMQX 4.3 文档
下面的代码是以 v4.3 版本为例,以API调用的方式操作EMQX服务。
POST http://localhost:8081/api/v4/mqtt/publish
| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| topic | String | Optional | | 主题,与 topics 至少指定其中之一 |
| topics | String | Optional | | 以 , 分割的多个主题,使用此字段能够同时发布消息到多个主题 |
| clientid | String | Required | | 客户端标识符 |
| payload | String | Required | | 消息正文 |
| encoding | String | Optional | plain | 消息正文使用的编码方式,目前仅支持 plain 与 base64 两种 |
| qos | Integer | Optional | 0 | QoS 等级 |
| retain | Boolean | Optional | false | 是否为保留消息 |
Name | Type | Description |
code | Integer | 0 |
- public void publiceMessage() {
- // 封装请求体参数
- Map<String, Object> map = new HashMap<>();
- map.put("clientid", "clientId");
- map.put("topic", "pubTopic");
- map.put("payload", "{\"iot_type\":10}");
- map.put("qos", 1);
- // json字符串请求体(注意:不转为json字符串,直接拼接字符串容易报错)
- String requestBody = JsonUtil.obj2string(map);
- // emqx路径
- String emqxApiBaseUrl = "http://127.0.0.1:8081"; // EMQ X API 地址
- try {
- // 构建发布消息的 URL
- String publicConnectionUrl = emqxApiBaseUrl + "/api/v4/mqtt/publish";
- // 创建 URL 对象
- URL url = new URL(publicConnectionUrl);
- // 创建 HttpURLConnection
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- // 设置访问权限(使用Authorization方式)
- connection.setRequestProperty("Authorization", "Basic YWRtaW46cHVibGlj");
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setRequestMethod("POST");
- connection.setDoOutput(true);
- // 将请求体写入连接
- try (OutputStream os = connection.getOutputStream()) {
- byte[] input = requestBody.getBytes(StandardCharsets.UTF_8);
- os.write(input, 0, input.length);
- }
- // 获取响应码
- int responseCode = connection.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- System.out.println("Successfully public: " + clientId);
- } else {
- System.out.println("Failed to public. Response Code: " + responseCode);
- }
- // 关闭连接
- connection.disconnect();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
踢除指定客户端。注意踢除客户端操作会将连接与会话一并终结。
DELETE http://localhost:8081/api/v4/clients/{clientid}
Name | Type | Required | Description |
clientid | String | True | ClientID |
Name | Type | Description |
code | Integer | 0 |
- /**
- * 根据客户端id关闭客户端
- * @param clientId
- */
- private void closeClientByClientId(String clientId) {
- String emqxApiBaseUrl = "http://localhost:8081"; // EMQ X API 地址
- try {
- // 对clientId进行编码(避免clientId有特殊字符,比如/)
- String encodedClientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8.toString());
-
- // 构建关闭连接的 URL
- String closeConnectionUrl = emqxApiBaseUrl + "/api/v4/clients/" + encodedClientId;
-
- // 创建 URL 对象
- URL url = new URL(closeConnectionUrl);
-
- // 创建 HttpURLConnection
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-
- // 设置访问权限
- connection.setRequestProperty("authorization", "Basic YWRtaW46cHVibGlj");
-
- // 设置请求方法为 DELETE
- connection.setRequestMethod("DELETE");
-
- // 获取响应码
- int responseCode = connection.getResponseCode();
-
- if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
- System.out.println("Successfully closed client connection with clientId: " + clientId);
- } else {
- System.out.println("Failed to close client connection. Response Code: " + responseCode);
- }
-
- // 关闭连接
- connection.disconnect();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。