当前位置:   article > 正文

MQTT介绍,服务器(EMQ X)搭建,客户端(mqtt-spy,安卓)使用,java编程示例_java mqtt emqx

java mqtt emqx

1.MQTT简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一个轻量的发布/订阅模式消息传输协议,是专门针对低带宽和不稳定网络环境的物联网应用设计的。

https://img-blog.csdnimg.cn/20210923185727239.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_16,color_FFFFFF,t_70,g_se,x_16

1.1.MQTT协议主要特性

MQTT协议工作在低带宽、不可靠的网络远程传感器和控制设备通讯而设计的协议,它具有一些主要特性:

1.开放消息协议,简单实现

2.使用发布/订阅模式,提供一对多的消息发布,解除应用程序耦合

3.对负载(协议携带的应用数据)内容屏蔽的消息传输

4.基于TCP/IP网络连接,提供有序、无损的双向连接

主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT—SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了

5.消息服务质量(Qos)支持,可靠传输保证:有三种发布服务质量

QoS0:“至多一次”,消息发布完全依赖底层TCP/IP‘网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久还会有第二次发送,这种方式主要普通的APP的推送,倘若您的智能设备在消息推送未联网,推送过去没收到,再次联网也就收不到了

QoS1:“至少一次”,确保消息到达,但消息重复可能会发生

QoS2:“只有一次”,确保消息到达一次。在一些重要比较严格的计费系统中,可以使用此级别,在计费系统中,消息重复或丢失导致不正确的结果,这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用于只会收到一次

6.1字节固定报头,2字节心跳报文,最小传输开销和协议交换,有效减少网络流量

这就是为什么在介绍里说他非常适合“在物联网领域,传感器与服务器的通信,信息的收集,要知道嵌入式设备的运算能力和带宽都相对薄弱”,使用这种协议来传递消息在适合不过了

7.在线状态感知:使用Last Will和Testament特性和通知有关各方客户端异常终端的机制

Last Will:遗言机制,用于通知同一主题下的其他设备,发送遗言的设备已经断开了连接

Testament: 遗嘱机制,功能类似于Last Will

1.2.应用

MQTT协议广泛应用于物联网、移动物联网、智能硬件、车联网、电力能源等领域

物联网M2M通信,物联网大数据采集

Android消息的推送,Web消息推送

移动即时消息,例如Facebook Messager

智能硬件、智能家居、智能电器

车联网通信,电动车站桩采集

智慧城市、远程医疗、远程教育

电力、石油与能源等行业市场

1.3.发布/订阅、主题、会话

MQTT是基于发布(Publish)/订阅(Subscribe)模式来进行通信及数据交换的,与Http的请求(Request)/应答(Response)的模式有本质的不同。

订阅者会向消息服务器订阅一个主题。成功订阅后,消息服务器会将该主题下的消息转发给所有的订阅者。

主题(Topic)以“/”为分隔符区分不同的层级。包含通配符“+”或“#”的主题又称为主题过滤器;不含通配符的称为主题名。

1.4.MQTT协议原理

https://img-blog.csdnimg.cn/20210923190818100.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_20,color_FFFFFF,t_70,g_se,x_16

实现MQTT 协议需要客户端和服务器端建立 TCP 连接,在通讯过程中, MQTT 协议中有三种身份 :发布者( Publish )、代理( Broker )(服务器)、订阅者( Subscribe )。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT 传输的消息分为 :主题( Topic )和负载( payload )

(1)Topic,可以理解为消息的类型,订阅者订阅( Subscribe )后,就会收到该主题的 消息内容( payload );

(2)payload可以理解为消息的内容,是指订阅者具体要使用的内容。

MQTT协议服务器:

        MQTT服务器以称为“消息代理”(Broker),它是位于消息发布者和订阅者之间,服务器可以:

        (1)接受来自客户端的网络连接;

        (2)接受客户端发布的信息;

        (3)处理来自客户端的订阅和退订请求;

        (4)向订阅的客户转发消息。

MQTT协议客户端:

        一个使用MQTT协议设备,它总是建立到服务器的网络连接,客户端可以:

        (1)发布其他客户端可能会订阅的信息;

        (2)订阅其它客户端发布的消息;

        (3)退订或删除消息;

        (4)断开与服务器连接。

MQTT协议栈

https://img-blog.csdnimg.cn/20210923191142235.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_20,color_FFFFFF,t_70,g_se,x_16

1.5.MQTT协议中的方法

MQTT协议中定义了一些方法(又称为动作),对于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源只服务器上的文件或输出。主要方法:

CONNECT: 客户端连接到服务器

CONNACT: 连接确认

PUBLISH: 发布消息

PUBACK: 发布确认

PUBREC: 发布的消息已接受

PUBREL: 发布的消息已释放

PUBCOMP: 发布完成

SUBSCRIBE: 订阅请求

SUBACK: 订阅确认

UNSUBSCRIBE: 取消订阅

UNSUBACK: 取消订阅确认

PINGREQ:客户端发送心跳

PINGRESP:客户端心跳响应

DISCONNECT: 断开连接

AUTI: 认证

1.6.MQTT接入流程

1.6.1.连接流程

https://img-blog.csdnimg.cn/20210923192736935.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_10,color_FFFFFF,t_70,g_se,x_16

•设备向平台发起 CONNECT 请求, CONNECT 中携带鉴权信息, 平台拿到鉴权信息进行鉴权。

•鉴权通过后,如果 CLEANSESSION=0, 平台将会加载保存的设备的一些信息, 如果 CLEANSESSION=1, 设备没有保存信息在平台,则不加载设备相关信息。

•返回鉴权结果 CONNACK。

1.6.2.发布消息流程

https://img-blog.csdnimg.cn/20210923192805438.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_11,color_FFFFFF,t_70,g_se,x_16

•设备发布 Qos0 消息;

•平台收到上报数据点后保存起来

QoS0:“至多一次”,消息发布完全依赖底层TCP/IP‘网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久还会有第二次发送,这种方式主要普通的APP的推送,倘若您的智能设备在消息推送未联网,推送过去没收到,再次联网也就收不到了(参见1.7. MQTT消息QOS

https://img-blog.csdnimg.cn/20210923192851987.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_11,color_FFFFFF,t_70,g_se,x_16

•设备发布 Qos1 消息

•平台收到上报数据点后保存起来

•平台给设备回复相应的 PUBACK

QoS1:“至少一次”,确保消息到达,但消息重复可能会发生(参见1.7.    MQTT消息QOS

https://img-blog.csdnimg.cn/20210923192916394.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_13,color_FFFFFF,t_70,g_se,x_16

•设备发布 Qos2 消息

•平台收到上报数据点后保存起来

•平台给设备回复相应的 PubRec 报文

•设备需回复平台 PubRel 报文,如超时不回平台则会断开相应连接

•平台给设备回复 PubComp 报文

QoS2:“只有一次”,确保消息到达一次。在一些重要比较严格的计费系统中,可以使用此级别,在计费系统中,消息重复或丢失导致不正确的结果,这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用于只会收到一次(参见1.7. MQTT消息QOS

1.6.3.订阅流程

https://img-blog.csdnimg.cn/20210923192954607.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_15,color_FFFFFF,t_70,g_se,x_16

设备发起订阅请求;

平台收到请求后更新topic列表;

平台给设备回复SubAck;

subscribe的requestqos级别可以为0、1、2。(参见1.7. MQTT消息QOS

https://img-blog.csdnimg.cn/20210923193027496.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_15,color_FFFFFF,t_70,g_se,x_16

•设备发起取消订阅请求。

•平台收到请求后更新 topic 列表。

•平台给设备回复 UnSubAck。

1.6.4.连接保活流程

https://img-blog.csdnimg.cn/20210923193057831.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_13,color_FFFFFF,t_70,g_se,x_16

客户端发送 PINGREQ 报文给服务端的,服务端发送 PINGRESP 报文响应客户端的 PINGREQ 报文,表示服务端还活着。

1.6.5.断开连接流程

https://img-blog.csdnimg.cn/20210923193124915.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_13,color_FFFFFF,t_70,g_se,x_16

DISCONNECT 报文是客户端发给服务端的最后一个控制报文,表示客户端正常断开连接。

1.7.MQTT消息QOS

级别0:最多一次,消息发送者会想尽办法发送消息,但是遇到意外并不会重试,这一级别会发生消息丢失。

https://img-blog.csdnimg.cn/2021092319320532.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_16,color_FFFFFF,t_70,g_se,x_16

级别 1 :至少一次。消息接收者如果 没有知会或者知会本身 丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,这一级别会保证消息到达,可能造成消息重复。

https://img-blog.csdnimg.cn/20210923193304151.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_17,color_FFFFFF,t_70,g_se,x_16

级别 2 :恰好一 次,确保消息只有一次到达。

https://img-blog.csdnimg.cn/20210923193327507.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_16,color_FFFFFF,t_70,g_se,x_16

MQTT发布消息QoS保证不是端到端的,是客户端与服务器之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS。

https://img-blog.csdnimg.cn/20210923193357183.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBASmVseS5ndWFu,size_20,color_FFFFFF,t_70,g_se,x_16

2.MQTT服务器搭建

MQTT是一种消息传输协议,和我们常用的RabbitMq比较类似,不过MQTT我们基本都是用于在物联网(比如说连接边缘计算机采集PLC数据)。

MQTT通讯模式看下边这张图应该就可以明白。发布者和订阅者提前约定一个主题,当发布者在这个主题下发布任何消息,订阅者就自动接收到了。

windows搭建MQTT服务器,网上大多资料都是说的客户端。我在这里说下我的模式,本地上搭建MQTT服务端,同时本地跑一个客户端,用来测试订阅其他客户端给我服务器发布的内容。

服务端常用的有EMQ X,还有apache apolle。 我用的是EMQ X。创建方式如下:

2.1.服务器(EMQ X)搭建

2.1.1.官网下载

登录官网:EMQ: 面向物联网的现代数据基础设施

找到EMQ X下载-->选择最新开源版本和你的电脑系统--> 然后点击 免费下载

2.1.2.解压压缩包

将下载好的压缩包(emqx-5.0.4-windows-amd64.tar.gz),解压到不含中文的路径

下载好后,解压目录如下:

https://img-blog.csdnimg.cn/2020070111044573.png

2.1.3.进入 bin目录下

2.1.4.直接在当前目录下输入cmd (不区分大小写)

2.1.5.启动 EMQ X

EMQ X 基本命令

emqx start 启动

emqx_ctl status 检查运行状态

emqx uninstall 服务卸载

emqx install 服务安装

emqx stop 停止

2.1.6.重置密码

emqx_ctl admins passwd admin 密码

2.1.7.输入账号密码进入MQTT服务器控制台

EMQ X已正常运行后,可在浏览器中输入:http://127.0.0.1:18083

或者 http://localhost:18083/

2.1.8.正常登录的界面如下所示

查看服务是否启动(两种方式):

1、使用dos命令查看:emqx_ctl status,如出现以下提示则启动成功

2、EMQ X还提供了强大的控制台服务,进入控制台可进行各种配置。

 2.2.登录EMQ X控制台

浏览器地址栏中直接输入:http://127.0.0.1:18083

或者 http://localhost:18083/

正常情况下会看到以下登陆界面:

https://img-blog.csdnimg.cn/20200701110444998.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3l1Zm0=,size_16,color_FFFFFF,t_70

可用 用户名:admin   密码:public 进行登陆

正常登陆后界面如下(未更改语言显示设置):

2.3.更改EMQ X控制台语言显示为中文

2.4.EMQ X端口的配置

在安装以后,EMQ X 默认会使用以下端口

1883: MQTT 协议端口

8883: MQTT/SSL 端口

8083: MQTT/WebSocket 端口

8080: HTTP API 端口

18083: Dashboard 管理控制台端口

'etc/emqx.conf'    //修改端口

listener.tcp.external = 0.0.0.0:1883

listener.ssl.external = 8883

listener.ws.external = 8083

修改 HTTP API 端口请

'etc/plugins/emqx_management.conf'

management.listener.http = 8080

修改 Dashboard 管理控制台端口请编辑 emqx_dashboard 插件的配置文件

'etc/plugins/emqx_dashboard.conf', 找到下述行,并按需修改端口号: 

 dashboard.listener.http = 18083

2.5.插件的启用和停止

插件是 EMQ X 的重要部分,EMQ X 的扩展功能基本都是通过插件实现的。包括 Dashbard 也是插件实现。您可以通过随软件附带的命令行工具 emqx_ctl 来启动和停止各个插件,进到bin文件下运行。

emqx_ctl plugins load plugin_name   //启动插件

emqx_ctl plugins unload plugin_name   //停止插件

2.6.修改 Erlang 虚拟机启动参数etc/emqx.conf’中有两个限定了虚拟机允许的最大连接数

node.process_limit //Erlang 虚拟机允许的最大进程数,EMQ X 一个连接会消耗 2 个 Erlang 进程;

node.max_ports     //Erlang虚拟机允许的最大 Port 数量,EMQ X 一个连接消耗 1 个 Port

//Erlang虚拟机允许的最大 Port 数量,EMQ X 一个连接消耗 1 个 Port
这连个参数可以设置为:

node.process_limit: 大于最大允许连接数

node.max_ports: 大于最大允许连接数

2.7.1个报错解决

dashboard总提示“URL not found”,开启服务时emqx console中报错如下:

Failed to start Ranch listener 'http:management' in ranch_tcp:listen([{cacerts,'...'},{key,'...'},{cert,'...'},{port,8080},{nodelay,true},{send_timeout_close,true},{send_timeout,15000},{backlog,512}]) for reason eacces (permission denied)

[Plugins] Load plugin emqx_management failed, cannot start plugin emqx_management for {...}

其原因为8080端口被占用,解决办法:

方法1.找出占用8080端口的程序,结束进程,然后重启EMQ。

# 在命令提示符中输入下面命令,查看端口8080是否被占用

netstat -aon|findstr 8080

# 结果:此处8080端口被PID 5744占用,监听5744端口中

#  TCP    0.0.0.0:8080           0.0.0.0:0              LISTENING       5744

#  TCP    [::]:8080              [::]:0                 LISTENING       5744

# 输入下面命令,查看时哪个进程或程序占用了某个端口;或者直接进入任务管理器查看详细信息中PID为5744的进程。

tasklist|findstr 5744

tasklist|findstr 5744

# 用以下命令关闭xxx.exe;或者直接在任务管理器中关闭相关进程即可

taskkill /f /t /im xxx.exe

# 再次检查8080端口是否被占用,如果没有信息返回,表示没有被占用,可放心使用啦

netstat -aon|findstr 8080

方法2.位于etc\plugins下,修改emqx_management.conf文件中端口为没有被占用的端口,然后重启EMQ

3.mqtt-spy下载

mqtt-spy开源的实用工具,用来帮助你监控MQTT主题活动,处理大量的信息。

mqtt-spy是EclipsePaho和EclipseIoT的一部分,它通过直接启动JAR文件在Java8和JavaFX之上运行,mqtt-spy有一种很好的交互方式来展现基本的MQTT发布/订阅机制。

mqtt-spy是一个JavaFX应用,所以在理论上应该可以在任意安装了Java8的操作系统上运行。

mqtt-spy没有提供独立的安装包,由于mqtt-spy是基于java的,所以需要在java环境下运行,使用前需要用户自行安装Java8运行环境Javajdk版本过高可能无法运行mqtt-spy。

出现下面报错:

Startupfailed:java.lang.NoClassDefFoundError:javax/xml/bind/ValidationEventHandler

这个问题是JDK版本过高,我之前是18版本,可以使用8版本。

以下是本文所使用的java版本信息:

因此若是JAVA版本不是8的,需要自行百度下载版本8并进行安装。

mqtt-spy下载地址:Releases · eclipse/paho.mqtt-spy · GitHub

下载包含依赖包的版本:mqtt-spy-1.0.1-beta-b18-jar-with-dependencies.jar。

4.mqtt-spy安装

mqtt-spy没有提供独立的安装包,本身无需安装。

只是由于mqtt-spy是基于java的,所以需要在java环境下运行,使用前需要用户自行安装Java8运行环境Javajdk版本过高可能无法运行mqtt-spy。

4.1.java-jdk8下载及安装

参考连接:java-jdk8下载及安装_m0_59619990的博客-CSDN博客_jdk8下载

1、下载JDK

a、直接官网下载(不推荐):http://www.oracle.com/;

b、或百度网盘:(推荐

链接:百度网盘 请输入提取码

提取码:mrxa

C、或其他路径下载

2、双击安装程序

点击下一步

https://img-blog.csdnimg.cn/img_convert/e5509827d687263a6d51975d9c32222a.png

安装目录若不修改,可直接下一步操作

https://img-blog.csdnimg.cn/img_convert/05f6fbd282232d00072c3442dd7d0604.png

等待安装完成

https://img-blog.csdnimg.cn/img_convert/dc0d7749cf3ad147e981f43b21359a7e.png     https://img-blog.csdnimg.cn/img_convert/c67254722db9d7d411c9e6a463326e73.png

https://img-blog.csdnimg.cn/img_convert/83c2862254585f5706d8c7316167409e.png

点击关闭,安装完毕。

3、程序安装完毕后,进行环境变量的配置

Java程序开发会使用JDK的两个命令:javac.exe、java.exe,路径是:C:\ProgramFiles\Java\jdk1.8.XXX\bin,由于这些命令不属于Windows本身命令,所以需要进行路径配置才可以使用。(XXX根据实际安装的版本填写)

a、右键单击计算机->属性->高级系统设置,选择“环境变量”。在“系统变量”栏下单击“新建”,创建新的系统环境变量。

https://img-blog.csdnimg.cn/img_convert/2e23dc278be0c72ecc4fa9f0a75aa60c.png    https://img-blog.csdnimg.cn/img_convert/d3b8b3fdfd26b9900e8a65a51dc7f79a.png

b、点击新建:变量名"JAVA_HOME",变量值"C:\ProgramFiles\Java\jdk1.8.XXX"(XXX根据实际安装的版本填写),点击确定新建

https://img-blog.csdnimg.cn/img_convert/fc781d86aa1dbc8aab8fe5951d606ce5.png

c、编辑->变量名"Path",在原变量值的最后面加上

";%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;"

https://img-blog.csdnimg.cn/img_convert/90d40697688ec68d4e5930e55b544b42.png

完成环境变量的配置。

4、使用java命令确认环境是否配置成功

win+R打开运行,输入CMD打开命令提示符窗口

https://img-blog.csdnimg.cn/img_convert/c7ea2157eeaa8cc6e6518eed6d365a84.png

分别输入javac,java,javac-version,java-version指令查看是否可看到JDK的编译器信息;

javac指令:

java指令:

javac-version,java-version指令:

javac-version和java-version中的版本号要一致,否则下一步会出错;若版本号不一致,需要自行百度如何卸载。

5、在C盘上创建一个文件夹,用记事本编写一个“HelloWorld”小程序来验证,保存为HelloWorld.java

public class HelloWorld {

    public static void main(String[] args){

        System.out.println("Hello World!");

    }

}

先输入javac HelloWorld.java进行编译,再输入java HelloWorld运行。如下图

至此安装成功。

5.mqtt-spy使用

mqtt-spy使用前,请确保服务器(此处是EMQ X)已经启动,处于运行中。

1、打开mqtt-spy软件开始编辑

双击mqtt-spy-1.0.1-beta-b18-jar-with-dependencies.jar运行

或者到jar包所在文件夹下输入CMD,然后在CMD中输入以下内容打开mqtt-spy软件:

java -jar mqtt-spy-1.0.1-beta-b18-jar-with-dependencies.jar

2、填写MQTT服务器ip端口信息(127.0.0.1:1883)

3.填写用户名密码(非必填)

如果服务端开启了ssl证书,则需要增加客户端TLS证书配置;否则不用开启

   

4.退出编辑并连接服务端

可在服务器(这里是EMQ X)中看到客户端的相关信息

5.创建消息订阅者

6.发布消息

5.1.保存mqtt-spy配置时出错的解决方法

无法保存每次编辑的客户端信息,并且会弹出提示框:   

解决方案,如右图:   

 6.消息订阅/发布的手机客户端

手机客户端使用前,请确保服务器(此处是EMQ X)已经启动,处于运行中。

6.1.安卓版(MQTT调试器) 

6.1.1.手机搜索并下载安装MQTT调试器

手机应用商店搜索MQTT调试器,下载并安装它安装

6.1.2.查询电脑端MQTT服务器(EMQ X)的IP地址

需要查一下自己电脑的ip地址,可以在cmd窗口中输入 ipconfig 查看,(我的是192.168.0.106. 一般局域网内都是192.168.0.***之类的,因此手机和电脑需要连接在同一个wifi下面,否则手机MQTT调试器会连接不上电脑的服务器)

6.1.3.打开手机上的MQTT调试器软件

6.1.4.手机上的MQTT调试器软件输入ip端口信息

tcp://192.168.0.106:1883 中需要将192.168.0.106替换成ipconfig查到的电脑ip。点击保存,配置客户端完成。

6.1.5.手机上的MQTT调试器软件连接服务器

6.1.6.手机上的MQTT调试器软件订阅主题

       

6.1.7.手机上的MQTT调试器软件发布消息

     

这样,手机和电脑端的网页就能互通消息了,当然也可以手机端也订阅(Subscribe)一个主题,电脑端发送这个主题的消息,手机端也可以收到,细节这里就不再写了,下面给出我的测试截图。

6.1.8.手机上的MQTT调试器软件订阅主题/发布消息记录

      

      

6.2.苹果版(未实现)

下述内容仅供参考,本人没有苹果手机,无法测试。

手机应用商店搜索MQTT调试器,下载安装,安装完并进入软件,这时你需要查一下自己电脑的ip地址,可以双击电脑右下角的wifi符号,然后进入属性查看。我的是10.0.0.252,大家的和我的肯定不一样哦,自己查!一般局域网内都是192.168.0.***之类的,我的比较特殊,不要介意。

然后打开手机上的软件

Host就是你电脑的ip地址,Port填1883,然后点击Connect,连接成功后再选择下方的“Publish”

Topic就输入我们网页里之前订阅的"r”,Message就随便填了,然后点击Publish发送,再回到计算机网页端看看是否收到消息了

这样,手机和电脑端的网页就能互通消息了,当然也可以手机端也订阅(Subscribe)一个主题,电脑端发送这个主题的消息,手机端也可以收到,我这里就不再写了,自己多玩玩吧。

7.编程示例(仅供参考,未验证

7.1.消息订阅的客户端例子(java)

package com.zkhuashui.support.mqtt;

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.text.MessageFormat;

public class SubscribeSample {    //消息订阅的客户端例子

    public static void main(String[] args) {

        String broker = "tcp://localhost:1883";    //连接服务器

        String clientId = "JavaSample";       //客户端id

        //Use the memory persistence

        MemoryPersistence persistence = new MemoryPersistence();     //持久化

        try {

            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);

            MqttConnectOptions connOpts = new MqttConnectOptions();

            connOpts.setCleanSession(true);

            System.out.println("Connecting to broker:" + broker);

            sampleClient.connect(connOpts);

            System.out.println("Connected");

            String topic = "testTopic";

            System.out.println("Subscribe to topic:" + topic);

            sampleClient.subscribe(topic);

            sampleClient.setCallback(new MqttCallback() {    //设置回调实例

                public void messageArrived(String topic, MqttMessage message) throws Exception {

                    String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);

                    System.out.println(theMsg);

                }

                public void deliveryComplete(IMqttDeliveryToken token) {

                }

                public void connectionLost(Throwable throwable) {

                }

            });

        } catch (MqttException me) {

            System.out.println("reason" + me.getReasonCode());

            System.out.println("msg" + me.getMessage());

            System.out.println("loc" + me.getLocalizedMessage());

            System.out.println("cause" + me.getCause());

            System.out.println("excep" + me);

            me.printStackTrace();

        }

    }

}

7.2.消息发布的客户端例子(java)

package com.zkhuashui.support.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishSample {   //消息发布的客户端例子

    public static void main(String[] args) {

        String topic = "testTopic";   //topic

        String content = "I Love Emq";

        int qos = 1;    //mqtt协议的主要特性,QoS 1:至少一次传送 (确认数据交付)

        String broker = "tcp://127.0.0.1:1883";    //服务器

        String userName = "admin";

        String password = "public";

        String clientId = "pubClient";

        // 内存存储

        MemoryPersistence persistence = new MemoryPersistence();

        try {

            // 创建客户端

            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);

            // 创建链接参数

            MqttConnectOptions connOpts = new MqttConnectOptions();

            // 在重新启动和重新连接时记住状态

            connOpts.setCleanSession(false);

            // 设置连接的用户名

            connOpts.setUserName(userName);

            connOpts.setPassword(password.toCharArray());

            // 建立连接

            sampleClient.connect(connOpts);

            // 创建消息

            MqttMessage message = new MqttMessage(content.getBytes());  //传入的参数字节

            // 设置消息的服务质量

            message.setQos(qos);

            // 发布消息

            sampleClient.publish(topic, message);

            // 断开连接

            sampleClient.disconnect();

            // 关闭客户端

            sampleClient.close();

        } catch (MqttException me) {

            System.out.println("reason " + me.getReasonCode());

            System.out.println("msg " + me.getMessage());

            System.out.println("loc " + me.getLocalizedMessage());

            System.out.println("cause " + me.getCause());

            System.out.println("excep " + me);

            me.printStackTrace();

        }

    }

}

7.3.MQTT-java模拟单车扫码开锁

在文件夹里面有源代码mqtt-java-bic-phone

具体描述参见:MQTT-java使用说明_Fandos的博客-CSDN博客_java使用mqtt

本文资料下载链接:

MQTT介绍,服务器(EMQX)搭建,客户端(mqtt-spy,安卓)使用,java编程示例-Java文档类资源-CSDN下载

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/188519?site
推荐阅读
相关标签
  

闽ICP备14008679号