赞
踩
MQTT:全拼是Message Queuing Telemetry Transport,即:消息队列遥测传输协议;
MQTT是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议族上,由IBM在1999年发布。主要用于针对物联网应用中低宽带和网络环境不是很稳定的场景。比如智能硬件,车联网,智能家居,智慧城市,电力,能源等市场。
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)或者(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
a)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
b)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
MQTT官网:http://mqtt.org/
MQTT介绍:http://www.ibm.com
Paho Android客户端页面:https://www.eclipse.org/paho/clients/android/
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html
Eclipse Paho:是Eclipse提供的一个访问MQTT服务器的一种开源客户端库。类似的框架还有Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等;目前主流是Paho;
还有很多语言的实现: http://mqtt.org/software
Paho我的理解就是基于MQTT协议,封装的一个框架;就像Okhttp和http的区别一样;
1,Topic:中文翻译是"话题"或者说"主题"。可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);在MQTT中订阅了(subscribe)同一话题(topic)的客户端会同时收到消息推送。类似“群聊”功能。
2,KeepAlive:"临终遗嘱"信息,该协议提供了检测方式,利用KeepAlive机制在客户端异常断开时发现问题。因此当客户端电量耗尽、崩溃或者网络断开时,消息代理会采取相应措施。
客户端会向任意点的消息代理发送“临终遗嘱”(LWT)信息,当消息代理检测到客户端离线(连接并未关闭),就会发送保存在特定主题上的 LWT 信息,让其它客户端知道该节点已经意外离线。
3,retained:要保留最后的断开连接信息。
4,Qos:服务质量。
qoS Level 0:至多一次
这是最简单的级别,无需客户端确认,其可靠性与基础网络层 TCP/IP 一致。
qoS Level 1:至少一次,有可能重复
确保至少向客户端发送一次信息,不过也可发送多次;在接收数据包时,需要客户端返回确认消息(ACK 包)。这种方式常用于传递确保交付的信息,但开发人员必须确保其系统可以处理重复的数据包。
qoS Level 2:只有一次,确保消息只到达一次
这是最不常见的服务质量级别,确保消息发送且仅发送一次。这种方法需要交换4个数据包,同时也会降低消息代理的性能。由于相对比较复杂,在 MQTT 实现中通常会忽略这个级别,请确保在选择数据库或消息代理前检查这个问题。Connect:等待与服务器建立连接。
5,MqttAndroidClient#subscribe():订阅某个话题。
6,MqttAndroidClient#publish():向某个话题发送消息,之后服务器会推送给所有订阅了此话题的客户。也可以向自己发送消息;
7,clientId:客户身份唯一标识。
8,userName:连接到MQTT服务器的用户名。
9,passWord :连接到MQTT服务器的密码。
10,UnSubscribe:等待服务器取消客户端的一个或多个topics订阅。
11,Disconnect:等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
以下Android客户端是基于Eclipse Paho客户端开源库实现;
1,添加权限
- <uses-permission android:name="android.permission.INTERNET" />
- <uses-permission android:name="android.permission.WAKE_LOCK" />
- <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
2,项目下的bulid.gradle配置
- buildscript {
- repositories {
- //....
- maven {
- url "https://repo.eclipse.org/content/repositories/paho-releases/"
- }
- //.....
- }
- }
3,添加Paho依赖
在module目录下的build.gradle中添加:
- implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
- implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
4,清单文件AndroidManifest.xml中注册Paho中的服务
<service android:name="org.eclipse.paho.android.service.MqttService" />
以上几步是使用Paho客户端前的基本配置;以下就是Paho 的Android客户端的具体应用了;既然是推送,一般是在后台运行,所以使用service创建MQTT客户端比较合适;
5,自定义Service,名字为MQTTService
-
- import android.app.Service;
- import android.content.Context;
- import android.content.Intent;
- import android.net.ConnectivityManager;
- import android.net.NetworkInfo;
- import android.os.Binder;
- import android.os.IBinder;
- import android.util.Log;
-
-
- import org.eclipse.paho.android.service.MqttAndroidClient;
- import org.eclipse.paho.client.mqttv3.IMqttActionListener;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.IMqttToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
-
- public class MQTTService extends Service {
- public static final String TAG = MQTTService.class.getSimpleName();
- public static final String SN = "device_sn";
-
- private static MqttAndroidClient client;
- private MqttConnectOptions conOpt;
-
- private String host = "自己服务器的地址";
- private String userName = "用户名";
- private String passWord = "密码";
- private static String mTopic = "1020304050"; //要订阅的主题
- private String clientId = mTopic + "b"; //客户端标识
- private IGetMessageCallBack iGetMessageCallBack;
-
- private static final Integer qos = 2;
-
- @Override
- public void onCreate() {
- super.onCreate();
- Log.e(getClass().getName(), "onCreate");
- init();
- }
-
- public static void publish(String msg) {
- String topic = mTopic;
- Boolean retained = false;
- try {
- if (client != null) {
- client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
- }
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
-
- private void init() {
- String sn = (String) SPUtils.get(SN, "");
- if (sn != null && !sn.isEmpty()) {
- startConnect(sn);
- } else {
- if (Device.getInstance().getSn() == null) {
- //TODO topic写死
- startConnect("kanisa_001");
- SPUtils.put(SN, "kanisa_001");
- }else {
- startConnect(Device.getInstance().getSn());
- SPUtils.put(SN, Device.getInstance().getSn());
- }
- }
- }
-
-
- private void startConnect(String sn) {
- Log.d(TAG, "设备号:" + sn);
- mTopic = sn;
- // 服务器地址(协议+地址+端口号)
- String uri = host;
- client = new MqttAndroidClient(KApp.getInstance().getApplicationContext(), uri, clientId);
-
- // 设置MQTT监听并且接受消息
- client.setCallback(mqttCallback);
-
- conOpt = new MqttConnectOptions();
- // 清除缓存
- conOpt.setCleanSession(true);
- // 设置超时时间,单位:秒
- conOpt.setConnectionTimeout(10);
- // 心跳包发送间隔,单位:秒
- conOpt.setKeepAliveInterval(20);
- // 用户名
- conOpt.setUserName(userName);
- // 密码
- conOpt.setPassword(passWord.toCharArray()); //将字符串转换为字符串数组
- //设置断开后重新连接
- conOpt.setAutomaticReconnect(true);
- // last will message
- boolean doConnect = true;
- String message = "{\"terminal_uid\":\"" + clientId + "\"}";
- Log.e(getClass().getName(), "message是:" + message + " myTopic " + mTopic);
- String topic = mTopic;
- Boolean retained = false;
- if ((!message.equals("")) || (!topic.equals(""))) {
- // 最后的遗嘱
- // MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。
- //当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
- //当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
- try {
- conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
- } catch (Exception e) {
- Log.i(TAG, "Exception Occured", e);
- doConnect = false;
- iMqttActionListener.onFailure(null, e);
- }
- }
-
- if (doConnect) {
- doClientConnection();
- }
- }
-
- @Override
- public boolean onUnbind(Intent intent) {
- client.unregisterResources();
- return super.onUnbind(intent);
- }
-
- @Override
- public void onDestroy() {
- stopSelf();
- try {
- if (client != null)
- client.disconnect();
-
- } catch (MqttException e) {
- e.printStackTrace();
- }
- super.onDestroy();
- }
-
-
- /**
- * 连接MQTT服务器
- */
- private void doClientConnection() {
- if (!client.isConnected() && isConnectIsNormal()) {
- try {
- client.connect(conOpt, null, iMqttActionListener);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
-
- }
-
- // MQTT是否连接成功
- private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
-
- @Override
- public void onSuccess(IMqttToken arg0) {
- Log.i(TAG, "连接成功 ");
- try {
- // 订阅myTopic话题
- client.subscribe(mTopic, 1);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onFailure(IMqttToken arg0, Throwable arg1) {
- arg1.printStackTrace();
- // 连接失败,重连
- Log.d(TAG, "连接失败");
- }
- };
-
- // MQTT监听并且接受消息
- private MqttCallback mqttCallback = new MqttCallback() {
-
- @Override
- public void messageArrived(String topic, MqttMessage message) {
- String str1 = new String(message.getPayload());
- if (iGetMessageCallBack != null) {
- iGetMessageCallBack.setMessage(str1);
- }
- String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
- Log.i(TAG, "messageArrived:" + str1);
- Log.i(TAG, str2);
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken arg0) {
-
- }
-
- @Override
- public void connectionLost(Throwable arg0) {
- // 失去连接,重连
- }
- };
-
- /**
- * 判断网络是否连接
- */
- private boolean isConnectIsNormal() {
- ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
- .getSystemService(Context.CONNECTIVITY_SERVICE);
- NetworkInfo info = connectivityManager.getActiveNetworkInfo();
- if (info != null && info.isAvailable()) {
- String name = info.getTypeName();
- Log.i(TAG, "MQTT当前网络名称:" + name);
- return true;
- } else {
- Log.i(TAG, "MQTT 没有可用网络");
- return false;
- }
- }
-
-
- @Override
- public IBinder onBind(Intent intent) {
- Log.e(getClass().getName(), "onBind");
- return new CustomBinder();
- }
-
- public class CustomBinder extends Binder {
- public MQTTService getService() {
- return MQTTService.this;
- }
- }
-
- public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
- this.iGetMessageCallBack = iGetMessageCallBack;
- }
- }
IGetMessageCallBack 接口
- public interface IGetMessageCallBack {
- void setMessage(String message);
- }
MqttServiceConnection类
- import android.content.ComponentName;
- import android.content.ServiceConnection;
- import android.os.IBinder;
-
- public class MqttServiceConnection implements ServiceConnection {
-
- private MQTTService mqttService;
- private IGetMessageCallBack iGetMessageCallBack;
-
- @Override
- public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
- mqttService = ((MQTTService.CustomBinder) iBinder).getService();
- mqttService.setIGetMessageCallBack(iGetMessageCallBack);
- }
-
- @Override
- public void onServiceDisconnected(ComponentName componentName) {
-
- }
-
- public MQTTService getMqttService() {
- return mqttService;
- }
-
- public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
- this.iGetMessageCallBack = iGetMessageCallBack;
- }
- }
6,清单文件AndroidManifest.xml中配置服务
- <service
- android:name=".MQTTService"
- android:enabled="true"
- android:exported="true"/>
7,最后绑定服务即可;MQTT收到数据,通过注册回调的方式返回给Activity;
-
- import androidx.appcompat.app.AppCompatActivity;
-
- import android.content.Context;
- import android.content.Intent;
- import android.os.Bundle;
- import android.util.Log;
-
-
- public class MainActivity extends AppCompatActivity implements IGetMessageCallBack {
- private static final String TAG = MainActivity.class.getSimpleName();
-
- private MqttServiceConnection serviceConnection;
-
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_main1);
- initAndroidMQTT();
- }
-
- private void initAndroidMQTT() {
- serviceConnection = new MqttServiceConnection();
- serviceConnection.setIGetMessageCallBack(this);
- //用Intent方式创建并启用Service
- Intent intent = new Intent(this, MQTTService.class);
- bindService(intent, serviceConnection, Context.BIND_AUTO_CREATE);
- }
-
- @Override
- public void setMessage(String message) {
- Log.d(TAG, "收到的推送数据:" + message);
- }
- }
8,收到推送数据,可以不用通过回调的方式返回,可以通过广播的方式把数据转发到需要的地方;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。