赞
踩
《LoRa网关与服务器通信协议》这节给大家讲了LoRa网关与服务器之间的协议格式。那么它们是怎么通信的呢?主流的有原生的UDP和MQTT,今天就讲讲MQTT这种方式。
MQTT(消息队列遥测传输)是ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况而设计的发布/订阅型消息协议,为此它需要一个消息中间件(Broker)。(度娘版)
上图是IBM官方给的图解。我对它的理解是:Broker是消息集散中心,负责消息的接收、汇总和分发,所有终端或者服务器都是消息的生产者或者消费者。就整个LoRa通信过程而言,网关端会生产PUSH_DATA、PULL_DATA,同时又会消费PULL_ACK等回复消息;同理,服务器端会消费PUSH_DATA、PULL_DATA,并生产PULL_ACK等回复消息。
实现MQTT协议的中间件太多太多,老牌的RabbitMQ,阿里的RocketMQ,本人以前用过的Mosquitto,还有今天介绍的EMQ等等。
本人安装环境:Ubuntu 16.04的虚拟机。
IP:192.168.237.128
下载地址
# Refresh the package index and install the tools needed to add a
# third-party HTTPS apt repository (HTTPS transport, CA certificates,
# curl, GnuPG agent, and the add-apt-repository helper package).
sudo apt update && sudo apt install -y \
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common
# Download the EMQ X repository signing key and register it with apt.
curl -fsSL https://repos.emqx.io/gpg.pub | sudo apt-key add -
验证密钥
# Show the fingerprint of the imported key so it can be checked by eye.
sudo apt-key fingerprint 3E640D53
# Register the EMQ X community-edition "stable" repository for the
# release codename reported by lsb_release.
sudo add-apt-repository \
"deb [arch=amd64] https://repos.emqx.io/emqx-ce/deb/ubuntu/ \
$(lsb_release -cs) \
stable"
# Pick up the newly added repository.
sudo apt update
# Install the broker package.
sudo apt install emqx
# Start the broker service.
sudo service emqx start
EMQ有可视化的管理页面:http://192.168.237.128:18083
可根据自己的IP做相应修改;
下图为管理页面
还可以切换中文哦
在此只做简单演示
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttPublish { private static int QOS = 1; private static String HOST = "tcp://192.168.237.128:1883"; private static String userName = "admin"; private static String password = "public"; private static String msg = "{\"rxpk\":[\n" + "\t{\n" + "\t\t\"time\":\"2013-03-31T16:21:17.528002Z\",\n" + "\t\t\"tmst\":3512348611,\n" + "\t\t\"chan\":2,\n" + "\t\t\"rfch\":0,\n" + "\t\t\"freq\":866.349812,\n" + "\t\t\"stat\":1,\n" + "\t\t\"modu\":\"LORA\",\n" + "\t\t\"datr\":\"SF7BW125\",\n" + "\t\t\"codr\":\"4/6\",\n" + "\t\t\"rssi\":-35,\n" + "\t\t\"lsnr\":5.1,\n" + "\t\t\"size\":32,\n" + "\t\t\"data\":\"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84\"\n" + "\t},{\n" + "\t\t\"time\":\"2013-03-31T16:21:17.530974Z\",\n" + "\t\t\"tmst\":3512348514,\n" + "\t\t\"chan\":9,\n" + "\t\t\"rfch\":1,\n" + "\t\t\"freq\":869.1,\n" + "\t\t\"stat\":1,\n" + "\t\t\"modu\":\"FSK\",\n" + "\t\t\"datr\":50000,\n" + "\t\t\"rssi\":-75,\n" + "\t\t\"size\":16,\n" + "\t\t\"data\":\"VEVTVF9QQUNLRVRfMTIzNA==\"\n" + "\t},{\n" + "\t\t\"time\":\"2013-03-31T16:21:17.532038Z\",\n" + "\t\t\"tmst\":3316387610,\n" + "\t\t\"chan\":0,\n" + "\t\t\"rfch\":0,\n" + "\t\t\"freq\":863.00981,\n" + "\t\t\"stat\":1,\n" + "\t\t\"modu\":\"LORA\",\n" + "\t\t\"datr\":\"SF10BW125\",\n" + "\t\t\"codr\":\"4/7\",\n" + "\t\t\"rssi\":-38,\n" + "\t\t\"lsnr\":5.5,\n" + "\t\t\"size\":32,\n" + "\t\t\"data\":\"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass\"\n" + "\t}\n" + "]}\n"; public static void main(String[] args){ start(msg); } public static MqttClient connect(String clientId, String userName, String password){ MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); 
options.setUserName(userName); options.setPassword(password.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); MqttClient client = null; try { client = new MqttClient(HOST, clientId, persistence); // client.setCallback(new PushCallback()); client.connect(options); } catch (MqttException e) { e.printStackTrace(); } return client; } private static void publish(MqttClient client, String msg, String topic){ MqttMessage message = new MqttMessage(msg.getBytes()); message.setQos(QOS); message.setRetained(false); try { client.publish(topic, message); } catch (MqttException e) { e.printStackTrace(); } } public static void start(String msg){ /*String msg = "Hello !";*/ String clientId = "ZWT_PUBLISH"; String topic = "PUSH_DATA"; MqttClient client = (MqttClient) connect(clientId, userName, password); if (client != null) { publish(client, msg, topic); System.out.println("Start-Public Message:" + msg); } if (client != null) { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } }
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal MQTT subscriber demo: connects to the broker at {@link #HOST},
 * installs a {@code PushCallback} and subscribes to the {@code PUSH_DATA}
 * topic.
 */
public class MqttSub {

    /** Quality-of-service level requested for the subscription. */
    private static final int QOS = 1;

    /** Broker URI; adjust to the address of your own broker. */
    private static final String HOST = "tcp://192.168.237.128:1883";

    /** Credentials sent to the broker on connect. */
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "public";

    /** Starts the subscriber. */
    public static void main(String[] args) {
        start();
    }

    /**
     * Creates a client with a {@code PushCallback} attached and connects it.
     *
     * @param clientId MQTT client identifier
     * @return a connected client, or {@code null} when the client could not be
     *         created or the connection attempt failed (the failure is printed)
     */
    private static MqttClient connect(String clientId) {
        System.out.println("MqttSub connect");

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(USER_NAME);
        options.setPassword(PASSWORD.toCharArray());
        options.setKeepAliveInterval(20);
        options.setConnectionTimeout(10);

        MqttClient client = null;
        try {
            client = new MqttClient(HOST, clientId, new MemoryPersistence());
            client.setCallback(new PushCallback());
            client.connect(options);
            return client;
        } catch (MqttException e) {
            e.printStackTrace();
            // Never hand back a client that failed to connect: start() treats
            // a non-null result as "ready to subscribe".
            if (client != null) {
                try {
                    client.close();
                } catch (MqttException closeError) {
                    closeError.printStackTrace();
                }
            }
            return null;
        }
    }

    /**
     * Subscribes {@code client} to {@code topic} at the class-wide QoS level.
     * A subscription failure is printed, not propagated.
     *
     * @param client connected client
     * @param topic  topic filter to subscribe to
     */
    public static void receive(MqttClient client, String topic) {
        String[] topics = {topic};
        int[] qosLevels = {QOS};
        try {
            client.subscribe(topics, qosLevels);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** Connects as {@code ZWT_SUB} and subscribes to {@code PUSH_DATA}. */
    public static void start() {
        String clientId = "ZWT_SUB";
        String topic = "PUSH_DATA";

        MqttClient client = connect(clientId);
        if (client == null) {
            return;
        }
        receive(client, topic);
    }
}
回调函数
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Callback that prints connection-loss, message-arrival and
 * delivery-completion events to standard output.
 */
public class PushCallback implements MqttCallback {

    /**
     * Reports that the connection was lost, including the cause when one is
     * supplied so the failure can be diagnosed.
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("connect lost");
        if (throwable != null) {
            throwable.printStackTrace();
        }
    }

    /**
     * Prints the topic, QoS and payload of an incoming message. The payload
     * is decoded as UTF-8 so the output does not depend on the platform
     * default charset.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("Client topic : " + topic);
        System.out.println("Client Qos : " + message.getQos());
        System.out.println("Client msg : " + payload);
    }

    /** Prints whether the delivery represented by {@code token} has completed. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete****" + token.isComplete());
    }
}
控制台打印
publish
Start-----Public Message:{"rxpk":[ { "time":"2013-03-31T16:21:17.528002Z", "tmst":3512348611, "chan":2, "rfch":0, "freq":866.349812, "stat":1, "modu":"LORA", "datr":"SF7BW125", "codr":"4/6", "rssi":-35, "lsnr":5.1, "size":32, "data":"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84" },{ "time":"2013-03-31T16:21:17.530974Z", "tmst":3512348514, "chan":9, "rfch":1, "freq":869.1, "stat":1, "modu":"FSK", "datr":50000, "rssi":-75, "size":16, "data":"VEVTVF9QQUNLRVRfMTIzNA==" },{ "time":"2013-03-31T16:21:17.532038Z", "tmst":3316387610, "chan":0, "rfch":0, "freq":863.00981, "stat":1, "modu":"LORA", "datr":"SF10BW125", "codr":"4/7", "rssi":-38, "lsnr":5.5, "size":32, "data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass" } ]}
sub
Client topic : PUSH_DATA Client Qos : 1 Client msg : {"rxpk":[ { "time":"2013-03-31T16:21:17.528002Z", "tmst":3512348611, "chan":2, "rfch":0, "freq":866.349812, "stat":1, "modu":"LORA", "datr":"SF7BW125", "codr":"4/6", "rssi":-35, "lsnr":5.1, "size":32, "data":"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84" },{ "time":"2013-03-31T16:21:17.530974Z", "tmst":3512348514, "chan":9, "rfch":1, "freq":869.1, "stat":1, "modu":"FSK", "datr":50000, "rssi":-75, "size":16, "data":"VEVTVF9QQUNLRVRfMTIzNA==" },{ "time":"2013-03-31T16:21:17.532038Z", "tmst":3316387610, "chan":0, "rfch":0, "freq":863.00981, "stat":1, "modu":"LORA", "datr":"SF10BW125", "codr":"4/7", "rssi":-38, "lsnr":5.5, "size":32, "data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass" } ]}
我们再看看EMQ的控制台
因为没有改EMQ的配置文件,Node都是emq@127.0.0.1。
这一步走完,LoRaWAN Network server的第一步就完了。接下来就是数据分流解析了。
扔我
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。