赞
踩
之前我们简单了解了MQTT中间件以及中间件RabbitMQ的搭建,接下来我们利用搭建好的中间件或者专门开放的用于测试的MQTT中间件服务来写一些MQTT客户端进行测试。
Eclipse Paho是MQTT(消息队列遥测传输)客户端实现。
Paho在各种平台和编程语言上可用:
因此其可使用的场景很广,基本上嵌入式设备、网关、路由器、手机、电脑平板、服务器等设备上都可以使用该库来进行MQTT协议实现,因此我们对MQTT的进一步学习和使用以该库的学习使用来进行(比如阿里云的物联网平台上就提供了Paho MQTT客户端接入的相关示例,当然如果你们项目中使用了其它方式也不必担心,MQTT的客户端不会特别复杂,主要就是建立连接、发布消息、订阅消息、断开连接)。
官网地址:https://www.eclipse.org/paho/
目前我们的5G智能网关中主要服务都是使用Go开发的,MQTT客户端接入物联网平台也是一部分,所以我们首先对Go利用Paho MQTT实现MQTT客户端做下总结,也方便后续开发和维护对应模块。
github地址:https://github.com/eclipse/paho.mqtt.golang
Paho MQTT示例:https://www.eclipse.org/paho/index.php?page=clients/golang/index.php
默认的测试MQTT服务地址无法连接,不知道是墙还是什么原因,然后找到了一个测试地址,如果你安装我们之前说的安装了RabbitMQ并开启了MQTT服务,那么也可以使用RabbitMQ做测试。
代码:
package main import ( "fmt" //导入对应MQTT包,没有的go get安装 MQTT "github.com/eclipse/paho.mqtt.golang" "os" "time" ) //定义函数回调 var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } func main() { //创建ClientOptions struct 设置 the broker address, clientid, //关闭trace output and 设置 the default message handler //有用户名密码的还需要设置用户名密码 //opts := MQTT.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883") opts := MQTT.NewClientOptions().AddBroker("tcp://40.40.40.193:1883") opts.SetClientID("go-simple") //opts.SetUsername("emqx") //opts.SetPassword("public") opts.SetUsername("admin") opts.SetPassword("admin*1993") opts.SetDefaultPublishHandler(f) //create and start a client using the above ClientOptions c := MQTT.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } //subscribe to the topic /go-mqtt/sample and request messages to be delivered //at a maximum qos of zero, wait for the receipt to confirm the subscription if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } //Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt //from the server after sending each message for i := 0; i < 5; i++ { text := fmt.Sprintf("this is msg #%d!", i) token := c.Publish("go-mqtt/sample", 0, false, text) token.Wait() } time.Sleep(3 * time.Second) //unsubscribe from /go-mqtt/sample if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } c.Disconnect(250) }
结果:
TOPIC: go-mqtt/sample
MSG: this is msg #0!
TOPIC: go-mqtt/sample
MSG: this is msg #1!
TOPIC: go-mqtt/sample
MSG: this is msg #2!
TOPIC: go-mqtt/sample
MSG: this is msg #3!
TOPIC: go-mqtt/sample
MSG: this is msg #4!
上面的broker.emqx.io当你没有安装中间件测试地址的时候可以做测试用。
GitHub地址:https://github.com/eclipse/paho.mqtt.python
Paho MQTT地址:https://www.eclipse.org/paho/index.php?page=clients/python/index.php
代码:
import random import time from paho.mqtt import client as mqtt_client # broker = 'broker.emqx.io' broker = '40.40.40.193' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.username_pw_set("admin", "admin*1993") client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_start() publish(client) if __name__ == '__main__': run()
结果:
onnected to MQTT Broker!
Send `messages: 0` to topic `/python/mqtt`
Received `messages: 0` from `/python/mqtt` topic
Send `messages: 1` to topic `/python/mqtt`
Received `messages: 1` from `/python/mqtt` topic
Send `messages: 2` to topic `/python/mqtt`
Received `messages: 2` from `/python/mqtt` topic
Send `messages: 3` to topic `/python/mqtt`
Received `messages: 3` from `/python/mqtt` topic
Send `messages: 4` to topic `/python/mqtt`
Received `messages: 4` from `/python/mqtt` topic
Send `messages: 5` to topic `/python/mqtt`
Received `messages: 5` from `/python/mqtt` topic
...
C这部分对嵌入式C的话Paho还做了优化,这里暂时不做处理,只在Ubuntu1604上做下测试。
GitHub地址:https://github.com/eclipse/paho.mqtt.c
Paho MQTT地址:https://www.eclipse.org/paho/index.php?page=clients/c/index.php
需要下载编译,编译时需要依赖openssl,否则编译报错(安装git等就不用多说了):
sudo apt-get install openssl
sudo apt-get install libssl-dev
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
make
sudo make install
之后可以通过编译安装的示例进行客户端测试:
paho_c_pub
paho_c_sub
了解一下用法:
简单发布订阅测试(我的Ubuntu上跑了RabbitMQ,所以可以在本地交互):
$paho_c_sub -t my_topic --connection 127.0.0.1:1883
$paho_c_pub -t my_topic --connection 127.0.0.1:1883 -m "test"
当然,我们也可以根据自己的需求写代码调用对应paho的mqtt库来进行操作(交叉编译的话则mqtt库也需要交叉编译,如果是嵌入式Linux的话也可以像cJSON等直接引用源文件)。
#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientPub" #define TOPIC "my_topic" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(-1); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }
编译:
gcc mqtt_test.c -l paho-mqtt3cs
结果:
Hello World!
对于其它示例目前由于开发环境等因素影响暂时不做总结,有机会再做补充(java和嵌入式C环境近期可能会找时间测试,用STM32开发板和Android app做MQTT交互测试,我们会通过虚拟机搭建中间件,然后单片机发布消息,Android app订阅单片机模拟发布的传感器数据),需要的话可以根据相关资料做学习总结即可:https://www.eclipse.org/paho/index.php?page=downloads.php
截至目前:
Client | Official Release | Unstable | GitHub |
---|---|---|---|
Java | 1.1.1 - Maven Central | 1.1.2-SNAPSHOT - Eclipse | https://github.com/eclipse/paho.mqtt.java |
Python | 1.5.0 - Pypi (Pip) | Build from develop branch | https://github.com/eclipse/paho.mqtt.python |
JavaScript | 1.0.3 - Eclipse | 1.0.4-SNAPSHOT - Build from develop branch | https://github.com/eclipse/paho.mqtt.javascript |
Golang | 1.1.0 - Github repo tag v1.1.0 | go get github.com/eclipse/paho.mqtt.golang | https://github.com/eclipse/paho.mqtt.golang |
C | 1.3.5 | Build from master branch | https://github.com/eclipse/paho.mqtt.c |
C++ | 1.0.0 - Build from source | Build from master branch | https://github.com/eclipse/paho.mqtt.cpp |
Rust | Coming soon | Build from develop branch | https://github.com/eclipse/paho.mqtt.rust |
.Net (C#) | 4.3.0 - NuGet | Build from master branch | https://github.com/eclipse/paho.mqtt.m2mqtt |
Android Service | 1.1.1 - Eclipse | 1.1.2-SNAPSHOT - Eclipse | https://github.com/eclipse/paho.mqtt.android |
Embedded C/C++ | 1.1.0 - Build from source / Arduino | Build from master branch | https://github.com/eclipse/paho.mqtt.embedded-c |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。