当前位置:   article > 正文

Android MQTT实现消息推送【Paho】_android mqtt 推送

android mqtt 推送

简介

MQTT:全拼是Message Queuing Telemetry Transport,即:消息队列遥测传输协议;

MQTT是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议族上,由IBM在1999年发布。主要用于针对物联网应用中低宽带和网络环境不是很稳定的场景。比如智能硬件,车联网,智能家居,智慧城市,电力,能源等市场。

特点

1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制

MQTT协议实现通信

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)或者(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
    a)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
    b)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

官方文档

MQTT官网:http://mqtt.org/
MQTT介绍:http://www.ibm.com
Paho Android客户端页面:https://www.eclipse.org/paho/clients/android/
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html

Eclipse Paho

Eclipse Paho:是Eclipse提供的一个访问MQTT服务器的一种开源客户端库。类似的框架还有Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等;目前主流是Paho;

还有很多语言的实现: http://mqtt.org/software

Paho和MQTT协议的区别

Paho我的理解就是基于MQTT协议,封装的一个框架;就像Okhttp和http的区别一样;

类属性方法的解释

1,Topic:中文翻译是"话题"或者说"主题"。可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);在MQTT中订阅了(subscribe)同一话题(topic)的客户端会同时收到消息推送。类似“群聊”功能。
2,KeepAlive:"临终遗嘱"信息,该协议提供了检测方式,利用KeepAlive机制在客户端异常断开时发现问题。因此当客户端电量耗尽、崩溃或者网络断开时,消息代理会采取相应措施。
客户端会向任意点的消息代理发送“临终遗嘱”(LWT)信息,当消息代理检测到客户端离线(连接并未关闭),就会发送保存在特定主题上的 LWT 信息,让其它客户端知道该节点已经意外离线。
3,retained:要保留最后的断开连接信息。
4,Qos:服务质量。
      qoS Level 0:至多一次
             这是最简单的级别,无需客户端确认,其可靠性与基础网络层 TCP/IP 一致。
      qoS Level 1:至少一次,有可能重复
             确保至少向客户端发送一次信息,不过也可发送多次;在接收数据包时,需要客户端返回确认消息(ACK 包)。这种方式常用于传递确保交付的信息,但开发人员必须确保其系统可以处理重复的数据包。
      qoS Level 2:只有一次,确保消息只到达一次
            这是最不常见的服务质量级别,确保消息发送且仅发送一次。这种方法需要交换4个数据包,同时也会降低消息代理的性能。由于相对比较复杂,在 MQTT 实现中通常会忽略这个级别,请确保在选择数据库或消息代理前检查这个问题。Connect:等待与服务器建立连接。
5,MqttAndroidClient#subscribe():订阅某个话题。
6,MqttAndroidClient#publish():向某个话题发送消息,之后服务器会推送给所有订阅了此话题的客户。也可以向自己发送消息;
7,clientId:客户身份唯一标识。
8,userName:连接到MQTT服务器的用户名。
9,passWord :连接到MQTT服务器的密码。
10,UnSubscribe:等待服务器取消客户端的一个或多个topics订阅。
11,Disconnect:等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。

Android客户端

以下Android客户端是基于Eclipse Paho客户端开源库实现;

1,添加权限

  1. <uses-permission android:name="android.permission.INTERNET" />
  2. <uses-permission android:name="android.permission.WAKE_LOCK" />
  3. <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

2,项目下的bulid.gradle配置

  1. buildscript {
  2. repositories {
  3. //....
  4. maven {
  5. url "https://repo.eclipse.org/content/repositories/paho-releases/"
  6. }
  7. //.....
  8. }
  9. }

3,添加Paho依赖

在module目录下的build.gradle中添加:

  1. implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
  2. implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

4,清单文件AndroidManifest.xml中注册Paho中的服务

<service android:name="org.eclipse.paho.android.service.MqttService" />

以上几步是使用Paho客户端前的基本配置;以下就是Paho 的Android客户端的具体应用了;既然是推送,一般是在后台运行,所以使用service创建MQTT客户端比较合适;

5,自定义Service,名字为MQTTService

  1. import android.app.Service;
  2. import android.content.Context;
  3. import android.content.Intent;
  4. import android.net.ConnectivityManager;
  5. import android.net.NetworkInfo;
  6. import android.os.Binder;
  7. import android.os.IBinder;
  8. import android.util.Log;
  9. import org.eclipse.paho.android.service.MqttAndroidClient;
  10. import org.eclipse.paho.client.mqttv3.IMqttActionListener;
  11. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  12. import org.eclipse.paho.client.mqttv3.IMqttToken;
  13. import org.eclipse.paho.client.mqttv3.MqttCallback;
  14. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  15. import org.eclipse.paho.client.mqttv3.MqttException;
  16. import org.eclipse.paho.client.mqttv3.MqttMessage;
  17. public class MQTTService extends Service {
  18. public static final String TAG = MQTTService.class.getSimpleName();
  19. public static final String SN = "device_sn";
  20. private static MqttAndroidClient client;
  21. private MqttConnectOptions conOpt;
  22. private String host = "自己服务器的地址";
  23. private String userName = "用户名";
  24. private String passWord = "密码";
  25. private static String mTopic = "1020304050"; //要订阅的主题
  26. private String clientId = mTopic + "b"; //客户端标识
  27. private IGetMessageCallBack iGetMessageCallBack;
  28. private static final Integer qos = 2;
  29. @Override
  30. public void onCreate() {
  31. super.onCreate();
  32. Log.e(getClass().getName(), "onCreate");
  33. init();
  34. }
  35. public static void publish(String msg) {
  36. String topic = mTopic;
  37. Boolean retained = false;
  38. try {
  39. if (client != null) {
  40. client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
  41. }
  42. } catch (MqttException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. private void init() {
  47. String sn = (String) SPUtils.get(SN, "");
  48. if (sn != null && !sn.isEmpty()) {
  49. startConnect(sn);
  50. } else {
  51. if (Device.getInstance().getSn() == null) {
  52. //TODO topic写死
  53. startConnect("kanisa_001");
  54. SPUtils.put(SN, "kanisa_001");
  55. }else {
  56. startConnect(Device.getInstance().getSn());
  57. SPUtils.put(SN, Device.getInstance().getSn());
  58. }
  59. }
  60. }
  61. private void startConnect(String sn) {
  62. Log.d(TAG, "设备号:" + sn);
  63. mTopic = sn;
  64. // 服务器地址(协议+地址+端口号)
  65. String uri = host;
  66. client = new MqttAndroidClient(KApp.getInstance().getApplicationContext(), uri, clientId);
  67. // 设置MQTT监听并且接受消息
  68. client.setCallback(mqttCallback);
  69. conOpt = new MqttConnectOptions();
  70. // 清除缓存
  71. conOpt.setCleanSession(true);
  72. // 设置超时时间,单位:秒
  73. conOpt.setConnectionTimeout(10);
  74. // 心跳包发送间隔,单位:秒
  75. conOpt.setKeepAliveInterval(20);
  76. // 用户名
  77. conOpt.setUserName(userName);
  78. // 密码
  79. conOpt.setPassword(passWord.toCharArray()); //将字符串转换为字符串数组
  80. //设置断开后重新连接
  81. conOpt.setAutomaticReconnect(true);
  82. // last will message
  83. boolean doConnect = true;
  84. String message = "{\"terminal_uid\":\"" + clientId + "\"}";
  85. Log.e(getClass().getName(), "message是:" + message + " myTopic " + mTopic);
  86. String topic = mTopic;
  87. Boolean retained = false;
  88. if ((!message.equals("")) || (!topic.equals(""))) {
  89. // 最后的遗嘱
  90. // MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。
  91. //当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
  92. //当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
  93. try {
  94. conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  95. } catch (Exception e) {
  96. Log.i(TAG, "Exception Occured", e);
  97. doConnect = false;
  98. iMqttActionListener.onFailure(null, e);
  99. }
  100. }
  101. if (doConnect) {
  102. doClientConnection();
  103. }
  104. }
  105. @Override
  106. public boolean onUnbind(Intent intent) {
  107. client.unregisterResources();
  108. return super.onUnbind(intent);
  109. }
  110. @Override
  111. public void onDestroy() {
  112. stopSelf();
  113. try {
  114. if (client != null)
  115. client.disconnect();
  116. } catch (MqttException e) {
  117. e.printStackTrace();
  118. }
  119. super.onDestroy();
  120. }
  121. /**
  122. * 连接MQTT服务器
  123. */
  124. private void doClientConnection() {
  125. if (!client.isConnected() && isConnectIsNormal()) {
  126. try {
  127. client.connect(conOpt, null, iMqttActionListener);
  128. } catch (MqttException e) {
  129. e.printStackTrace();
  130. }
  131. }
  132. }
  133. // MQTT是否连接成功
  134. private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
  135. @Override
  136. public void onSuccess(IMqttToken arg0) {
  137. Log.i(TAG, "连接成功 ");
  138. try {
  139. // 订阅myTopic话题
  140. client.subscribe(mTopic, 1);
  141. } catch (MqttException e) {
  142. e.printStackTrace();
  143. }
  144. }
  145. @Override
  146. public void onFailure(IMqttToken arg0, Throwable arg1) {
  147. arg1.printStackTrace();
  148. // 连接失败,重连
  149. Log.d(TAG, "连接失败");
  150. }
  151. };
  152. // MQTT监听并且接受消息
  153. private MqttCallback mqttCallback = new MqttCallback() {
  154. @Override
  155. public void messageArrived(String topic, MqttMessage message) {
  156. String str1 = new String(message.getPayload());
  157. if (iGetMessageCallBack != null) {
  158. iGetMessageCallBack.setMessage(str1);
  159. }
  160. String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
  161. Log.i(TAG, "messageArrived:" + str1);
  162. Log.i(TAG, str2);
  163. }
  164. @Override
  165. public void deliveryComplete(IMqttDeliveryToken arg0) {
  166. }
  167. @Override
  168. public void connectionLost(Throwable arg0) {
  169. // 失去连接,重连
  170. }
  171. };
  172. /**
  173. * 判断网络是否连接
  174. */
  175. private boolean isConnectIsNormal() {
  176. ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
  177. .getSystemService(Context.CONNECTIVITY_SERVICE);
  178. NetworkInfo info = connectivityManager.getActiveNetworkInfo();
  179. if (info != null && info.isAvailable()) {
  180. String name = info.getTypeName();
  181. Log.i(TAG, "MQTT当前网络名称:" + name);
  182. return true;
  183. } else {
  184. Log.i(TAG, "MQTT 没有可用网络");
  185. return false;
  186. }
  187. }
  188. @Override
  189. public IBinder onBind(Intent intent) {
  190. Log.e(getClass().getName(), "onBind");
  191. return new CustomBinder();
  192. }
  193. public class CustomBinder extends Binder {
  194. public MQTTService getService() {
  195. return MQTTService.this;
  196. }
  197. }
  198. public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
  199. this.iGetMessageCallBack = iGetMessageCallBack;
  200. }
  201. }

IGetMessageCallBack 接口 

  1. public interface IGetMessageCallBack {
  2. void setMessage(String message);
  3. }

MqttServiceConnection类 

  1. import android.content.ComponentName;
  2. import android.content.ServiceConnection;
  3. import android.os.IBinder;
  4. public class MqttServiceConnection implements ServiceConnection {
  5. private MQTTService mqttService;
  6. private IGetMessageCallBack iGetMessageCallBack;
  7. @Override
  8. public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
  9. mqttService = ((MQTTService.CustomBinder) iBinder).getService();
  10. mqttService.setIGetMessageCallBack(iGetMessageCallBack);
  11. }
  12. @Override
  13. public void onServiceDisconnected(ComponentName componentName) {
  14. }
  15. public MQTTService getMqttService() {
  16. return mqttService;
  17. }
  18. public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
  19. this.iGetMessageCallBack = iGetMessageCallBack;
  20. }
  21. }

6,清单文件AndroidManifest.xml中配置服务

  1. <service
  2. android:name=".MQTTService"
  3. android:enabled="true"
  4. android:exported="true"/>

7,最后绑定服务即可;MQTT收到数据,通过注册回调的方式返回给Activity;

  1. import androidx.appcompat.app.AppCompatActivity;
  2. import android.content.Context;
  3. import android.content.Intent;
  4. import android.os.Bundle;
  5. import android.util.Log;
  6. public class MainActivity extends AppCompatActivity implements IGetMessageCallBack {
  7. private static final String TAG = MainActivity.class.getSimpleName();
  8. private MqttServiceConnection serviceConnection;
  9. @Override
  10. protected void onCreate(Bundle savedInstanceState) {
  11. super.onCreate(savedInstanceState);
  12. setContentView(R.layout.activity_main1);
  13. initAndroidMQTT();
  14. }
  15. private void initAndroidMQTT() {
  16. serviceConnection = new MqttServiceConnection();
  17. serviceConnection.setIGetMessageCallBack(this);
  18. //用Intent方式创建并启用Service
  19. Intent intent = new Intent(this, MQTTService.class);
  20. bindService(intent, serviceConnection, Context.BIND_AUTO_CREATE);
  21. }
  22. @Override
  23. public void setMessage(String message) {
  24. Log.d(TAG, "收到的推送数据:" + message);
  25. }
  26. }

8,收到推送数据,可以不用通过回调的方式返回,可以通过广播的方式把数据转发到需要的地方;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/601501
推荐阅读
相关标签
  

闽ICP备14008679号