赞
踩
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。
EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:
官方文档:https://docs.emqx.cn/broker/v4.3/getting-started/install.html
MQTT全称消息队列遥测传输 (Message Queuing Telemetry Transport)。其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
官方网站:https://www.emqx.cn/
下载地址:https://www.emqx.cn/downloads#broker
解压程序包
启动 EMQ X Broker
进入到emqx解压后目录,进入bin目录,执行其下的命令脚本
#启动emqx
emqx start
#查看emqx状态
emqx status
#停止 EMQ X Broker
emqx stop
卸载 EMQ X Broker
直接删除 EMQ X 目录即可
Emqx自带dashboard插件:通过Dashboard,你可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。
除此之外,EMQ X Dashboard 还提供了规则引擎的可视化操作界面,同时集成了一个简易的 MQTT 客户端工具供用户测试使用。
当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是admin,密码是 public。
QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
QoS 1:消息传递至少 1 次。
QoS 2:消息仅传送一次。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
package cn.kt.mtqqdemo.mqtt; /** * Created by tao. * Date: 2021/4/12 13:57 * 描述: */ import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.UUID; public class App { public static void main(String[] args) { try { //apollo地址 String HOST = "tcp://127.0.0.1:1883"; //要订阅的主题 String TOPIC1 = "ceshi"; //指你Apollo中的用户名密码 String userName = "admin"; String pwd = "123456"; String clientid = UUID.randomUUID().toString().replace("-", ""); MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接对象 MqttConnectOptions options = new MqttConnectOptions(); //设置连接参数 //清除session回话 options.setCleanSession(false); options.setUserName(userName); options.setPassword(pwd.toCharArray()); //超时设置 options.setConnectionTimeout(10); //心跳保持时间 options.setKeepAliveInterval(20); //遗嘱:当该客户端端口连接时,会向whb主题发布一条信息 options.setWill("nick", "我挂了,你加油".getBytes(), 1, true); //监听对象:自己创建 client.setCallback(new PushCallback()); //打开连接 client.connect(options); //设置消息级别 int[] Qos = {1}; //订阅主题 String[] topics = {TOPIC1}; client.subscribe(topics, Qos); } catch (MqttException e) { e.printStackTrace(); } } }
package cn.kt.mtqqdemo.mqtt; /** * Created by tao. * Date: 2021/4/12 14:09 * 描述: */ import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.io.UnsupportedEncodingException; import java.util.Scanner; public class SendOut { //tcp://MQTT安装的服务器地址:MQTT定义的端口号 String HOST = "tcp://127.0.0.1:1883"; //定义一个主题 public static final String TOPIC = "ceshi"; // public static final String TOPIC = "abc"; //定义MQTT的ID,可以在MQTT服务配置中指定 private static final String clientid = "server1"; private MqttMessage message; public static final String TOPIC1 = "topic1"; public static final String userName = "admin"; public static final String pwd = "123456"; public MqttClient client; private MqttTopic topic; public SendOut() { try { client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } catch (MqttException e) { e.printStackTrace(); } } //发布消息 public void publish(MqttTopic topic, MqttMessage message) throws MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); //打印发送状态 System.out.println("message is published completely!" + token.isComplete()); } //建立连接:参数与订阅端相似 private void connect() throws MqttException { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(pwd.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); client.setCallback(new PushCallback()); client.connect(options); } public static void main(String[] args) throws MqttException, UnsupportedEncodingException { SendOut service = new SendOut(); Scanner sc = new Scanner(System.in); service.topic = service.client.getTopic(TOPIC); service.message = new MqttMessage(); //确保被收到一次 service.message.setQos(1); service.message.setPayload("干嘛这么想不开,要在脸上贴个输字".getBytes("UTF-8")); service.publish(service.topic, service.message); } }
package cn.kt.mtqqdemo.mqtt; /** * Created by tao. * Date: 2021/4/12 13:58 * 描述: */ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class OnMessageCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息内容:" + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
package cn.kt.mtqqdemo.mqtt; /** * Created by tao. * Date: 2021/4/12 14:01 * 描述: */ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class PushCallback implements MqttCallback { //连接丢失:一般用与重连 public void connectionLost(Throwable throwable) { System.out.println("丢失连接"); } //消息到达:指收到消息 public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { //(发布)publish后会执行到这里,发送状态 System.out.println("deliveryComplete---------" + token.isComplete()); } }
可以下载:
链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw
提取码:siwg
也可以用对应的cdn 地址
<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
<!DOCTYPE html > <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http - equiv="X-UA-Compatible" content="ie=edge"> <title> Document </title> <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet"> <script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script> <script src="./js/mqttws31.js" type="text/javascript"></script> <style> #contentList li { word-break: break-all; word-wrap: break-word; } </style> </head> <body> <div style="width: 900px;margin: 50px auto;"> <div class="form-group"> <label>评论人:</label> <input type="text" class="form-control" id="user"> </div> <div class="form-group"> <label>评论内容:</label> <textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea> </div> <div class="form-group"> <input type="button" value="发表评论" class="btn btn-primary" onclick="send()"> </div> <div> <ul id="contentList" class="list-group"> <!-- <li class="list-group-item"> <span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }} </li> --> </ul> </div> </div> <script> // http://192.168.3.181/ var hostname = '192.168.3.181', port = 8083, clientId = 'client-' + generateUUID(), timeout = 1000, keepAlive = 2000, cleanSession = false, ssl = false, userName = 'Nick', password = '12356', topic = 'ceshi'; client = new Paho.MQTT.Client(hostname, port, clientId); //建立客户端实例 var options = { invocationContext: { host: hostname, port: port, path: client.path, clientId: clientId }, timeout: timeout, keepAliveInterval: keepAlive, cleanSession: cleanSession, useSSL: ssl, userName: userName, password: password, onSuccess: onConnect, onFailure: function(e) { console.log(e); } }; client.connect(options); //连接服务器并注册连接成功处理事件 function onConnect() { console.log("onConnected"); client.subscribe(topic); } client.onConnectionLost = onConnectionLost; //注册连接断开处理事件 client.onMessageArrived = onMessageArrived; //注册消息接收处理事件 function onConnectionLost(responseObject) { console.log(responseObject); if (responseObject.errorCode !== 0) { console.log("onConnectionLost:" + responseObject.errorMessage); console.log("连接已断开"); } } //收到消息时处理事件 function onMessageArrived(message) { var msg = message.payloadString; var obj = JSON.parse(msg); console.log("收到消息:" + obj); /* <li class="list-group-item"> <span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }} </li> */ $('#contentList').append($(`<li class="list-group-item" > <span class="badge">评论人:` + obj.name + `,时间:` + obj.time + `</span>` + obj.content + `</li>`)); } //点击发送按钮事件 function send() { var name = document.getElementById("user").value; var content = document.getElementById("content").value; console.log('name :>> ', name); console.log('content :>> ', content); var time = new Date().Format("yyyy-MM-dd hh:mm:ss"); var getConment = { name: name, content: content, time: time, } if (name) { var str = getConment; message = new Paho.MQTT.Message(JSON.stringify(str)); message.destinationName = topic; client.send(message); document.getElementById("content").value = ""; document.getElementById("user").value = ""; } } //生成UUID function generateUUID() { var d = new Date().getTime(); if (window.performance && typeof window.performance.now === "function") { d += performance.now(); //use high-precision timer if available } var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { var r = (d + Math.random() * 16) % 16 | 0; d = Math.floor(d / 16); return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16); }); return uuid; } //date时间格式化 Date.prototype.Format = function(fmt) { var o = { "M+": this.getMonth() + 1, //月份 "d+": this.getDate(), //日 "h+": this.getHours(), //小时 "m+": this.getMinutes(), //分 "s+": this.getSeconds(), //秒 "q+": Math.floor((this.getMonth() + 3) / 3), //季度 "S": this.getMilliseconds() //毫秒 }; if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length)); for (var k in o) if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length))); return fmt; } </script> </body> </html>
页面效果
java 连接mqtt订阅者收到消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。