当前位置:   article > 正文

Android paho Mqtt物联网网络框架_adroid mqtt 单例

adroid mqtt 单例

先上地址 :  

module下的build.gradle文件内的dependencies里面添加下面远程库,Androidstudio3.0以及以上的要用

implementation代替compile
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'

项目结构下的build.gradle要添加maven仓库

  1. maven {
  2. url "https://repo.eclipse.org/content/repositories/paho-releases/"
  3. }

 

由于方便在项目中的使用,小编在这里把mqtt连接做成一个单例的管理类形式,回调信息和心跳处理也做了单独的处理

实现原理:1、先登录mqtt服务器;2、想要接收哪个主题下的消息,就要先订阅哪个主题;3、想发送给某个设备,就要把对方订阅的主题当做自己的发送主题,把数据发送给对方;整个连接数据收发大概就是这个样子,也可以自己加入断开重连,检测断开机制!

上一张项目目录图

好了废话不多说,贴代码

MqttManager管理类

  1. public class MqttManager {
  2. public static String TAG = MqttManager.class.getSimpleName();
  3. private static volatile MqttManager mInstance = null;
  4. private static MqttHeartBeat heartBeat = null;//心跳处理类,此心跳仅限于和另一端保持的一个心跳机制,并非与mqtt服务器之间的一个心跳,mqtt客户端与服务器之间自带心跳
  5. private MqttConnectOptions conOpt = null;
  6. private MqttClient client = null;
  7. public static MqttManager getInstance() {
  8. if (mInstance == null) {
  9. synchronized (MqttManager.class) {
  10. if (mInstance == null) {
  11. mInstance = new MqttManager();
  12. }
  13. }
  14. }
  15. return mInstance;
  16. }
  17. /**
  18. * 创建连接
  19. *
  20. * @param clientId 设备ID
  21. * @return 连接成功true 失败false
  22. */
  23. public boolean createConnect(String clientId) {
  24. boolean flag = false;
  25. String tmpDir = System.getProperty("java.io.tmpdir");
  26. MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
  27. try {
  28. conOpt = new MqttConnectOptions();
  29. conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
  30. conOpt.setCleanSession(true);
  31. conOpt.setPassword(MqttApi.getPassword().toCharArray());
  32. conOpt.setUserName(MqttApi.getName());
  33. conOpt.setConnectionTimeout(30); //设置连接超时
  34. conOpt.setKeepAliveInterval(30); //设置连接空闲最大时长 单位:秒
  35. client = new MqttClient(MqttApi.getMqtt_Url(), clientId, dataStore);
  36. client.setCallback(new MqttCallBack());
  37. if (client != null) {
  38. try {
  39. client.connect(conOpt);
  40. flag = true;
  41. } catch (Exception e) {
  42. flag = false;
  43. }
  44. }
  45. } catch (MqttException e) {
  46. LogUtils.e(TAG, "createConnect:MqttException: " + e.toString());
  47. }
  48. return flag;
  49. }
  50. /**
  51. * 连接服务端
  52. */
  53. public void connect() {
  54. EventBus.getDefault().post(new MqttMainEvent("onlinestatus","离线"));
  55. new Thread(new Runnable() {
  56. @Override
  57. public void run() {
  58. boolean isflag = createConnect(MqttApi.getClintId());
  59. if (isflag) {
  60. LogUtils.e(TAG, "连接成功...");
  61. isflag = subScribe();//开始订阅
  62. if (!isflag) {
  63. LogUtils.e(TAG, "订阅失败...");
  64. } else {
  65. LogUtils.e(TAG, "订阅成功...");
  66. while (true) {
  67. if (!MqttApi.isBoolean) {
  68. EventBus.getDefault().post(new LoginEvent("登录失败"));
  69. EventBus.getDefault().post(new MqttMainEvent("login", "登录失败"));
  70. LogUtils.e(TAG, "准备登录..............");
  71. boolean isPublish = publish(1, JsonPut.putLogin());//发布登录
  72. if (isPublish) {
  73. LogUtils.e(TAG, "登录成功...");
  74. } else {
  75. LogUtils.e(TAG, "登录异常...");
  76. connect();
  77. break;
  78. }
  79. } else {
  80. if (heartBeat == null) {
  81. heartBeat = new MqttHeartBeat();
  82. }
  83. LogUtils.e(TAG, "=======================登录成功,退出循环=====================");
  84. startReconnect();
  85. break;
  86. }
  87. SystemClock.sleep(10 * 1000);
  88. }
  89. }
  90. } else {
  91. LogUtils.e(TAG, "连接失败...");
  92. SystemClock.sleep(5000);
  93. connect();
  94. }
  95. }
  96. }).start();
  97. }
  98. /**
  99. * 发布数据到服务端
  100. *
  101. * @param code 1:登录主题,2:心跳主题,3:业务主题
  102. * @param data 发布内容
  103. * @return
  104. */
  105. public boolean publish(int code, String data) {
  106. String topic = "";
  107. if (code == 1) { //登录
  108. topic = "/lo";
  109. } else if (code == 2) { //心跳
  110. topic = "/he";
  111. } else if (code == 3) { //业务
  112. topic = "/bu";
  113. }
  114. boolean flag = false;
  115. if (client != null && client.isConnected()) {
  116. MqttMessage message = new MqttMessage();
  117. message.setPayload(data.getBytes());
  118. message.setQos(2);//@param qos的“服务质量”使用。设为0,1,2
  119. message.setRetained(false);
  120. try {
  121. client.publish(topic, message);
  122. flag = true;
  123. } catch (MqttException e) {
  124. LogUtils.e(TAG, "发送异常..." + e.toString());
  125. flag = false;
  126. }
  127. } else {
  128. LogUtils.e(TAG, "当前连接已断开...");
  129. }
  130. return flag;
  131. }
  132. /**
  133. * 订阅主题
  134. */
  135. public boolean subScribe() {
  136. boolean flag = false;
  137. if (client != null && client.isConnected()) {
  138. try {
  139. client.subscribe(MqttApi.getTopicSubscribe(), 2);
  140. flag = true;
  141. } catch (MqttException e) {
  142. flag = false;
  143. LogUtils.e(TAG, "订阅异常:" + e.toString());
  144. }
  145. }
  146. return flag;
  147. }
  148. /**
  149. * 断开连接
  150. */
  151. public void disConnect() throws MqttException {
  152. MqttApi.isBoolean = false;
  153. conOpt = null;
  154. if (client != null) {
  155. client.disconnect();
  156. client = null;
  157. }
  158. LogUtils.e(TAG, "断开连接");
  159. }
  160. /**
  161. * 释放单例, 及其所引用的资源
  162. */
  163. public static void release() throws Exception {
  164. if (mInstance != null) {
  165. mInstance.disConnect();
  166. mInstance = null;
  167. heartBeat.disConnect();
  168. heartBeat = null;
  169. }
  170. }
  171. }

接口回调类MqttCallBack

  1. public class MqttCallBack implements MqttCallback {
  2. public MqttCallBack() {}
  3. /**
  4. * 连接异常断开回调此方法,可在此方法内做重连机制
  5. */
  6. @Override
  7. public void connectionLost(Throwable cause) {
  8. /*SystemClock.sleep(10000);
  9. MqttManager.getInstance().connect();*/
  10. }
  11. @Override
  12. public void messageArrived(String topic, MqttMessage message) throws Exception {
  13. String data = new String(message.getPayload(), "UTF-8");
  14. Object object = JSON.parse(data);
  15. }
  16. @Override
  17. public void deliveryComplete(IMqttDeliveryToken token) {
  18. LogUtils.d(getClass().getSimpleName(), "token:" + token.toString());
  19. }
  20. }

常量类MqttApi

  1. public class MqttApi {
  2. private static boolean DEBUG = true;
  3. /**
  4. * 服务端IP
  5. */
  6. public static String getMqtt_Url() {
  7. if (DEBUG) {
  8. return "tcp://mqtt.inks:1883";
  9. }
  10. return "tcp://192.168.1.123:61613";
  11. }
  12. /**
  13. * 用户名
  14. */
  15. public static String getName() {
  16. if (DEBUG) {
  17. return "admin";//testuser
  18. }
  19. return "admin";
  20. }
  21. /**
  22. * 密码
  23. */
  24. public static String getPassword() {
  25. if (DEBUG) {
  26. return "shmqttfree";//sha256
  27. }
  28. return "password";
  29. }
  30. /**
  31. * 订阅主题
  32. * app接收这个主题的消息
  33. */
  34. public static String getTopicSubscribe() {
  35. if (DEBUG) {
  36. return "0000000000000";
  37. }
  38. return "111111111111111111";
  39. }
  40. /**
  41. * 发布主题
  42. * app发布消息的主题
  43. */
  44. public static String getTopicPublisher(String topic) {
  45. if (DEBUG) {
  46. return "/" + topic;
  47. }
  48. return "/" + topic;
  49. }
  50. /**
  51. * 设备ID
  52. *
  53. * @return
  54. */
  55. public static String getClintId() {
  56. return "00000000";
  57. }
  58. /**
  59. * 标记是否登录
  60. **/
  61. public static volatile boolean isBoolean = false;
  62. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/1007394
推荐阅读
相关标签
  

闽ICP备14008679号