赞
踩
先上地址 :
在module下的build.gradle文件内的dependencies里面添加下面远程库,Androidstudio3.0以及以上的要用
implementation代替compile
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
项目结构下的build.gradle要添加maven仓库
- maven {
- url "https://repo.eclipse.org/content/repositories/paho-releases/"
- }
由于方便在项目中的使用,小编在这里把mqtt连接做成一个单例的管理类形式,回调信息和心跳处理也做了单独的处理
实现原理:1、先登录mqtt服务器;2、想要接收哪个主题下的消息,就要先订阅哪个主题;3、想发送给某个设备,就要把对方订阅的主题当做自己的发送主题,把数据发送给对方;整个连接数据收发大概就是这个样子,也可以自己加入断开重连,检测断开机制!
上一张项目目录图
好了废话不多说,贴代码
MqttManager管理类
- public class MqttManager {
-
- public static String TAG = MqttManager.class.getSimpleName();
- private static volatile MqttManager mInstance = null;
- private static MqttHeartBeat heartBeat = null;//心跳处理类,此心跳仅限于和另一端保持的一个心跳机制,并非与mqtt服务器之间的一个心跳,mqtt客户端与服务器之间自带心跳
-
- private MqttConnectOptions conOpt = null;
- private MqttClient client = null;
-
- public static MqttManager getInstance() {
- if (mInstance == null) {
- synchronized (MqttManager.class) {
- if (mInstance == null) {
- mInstance = new MqttManager();
- }
- }
- }
- return mInstance;
- }
-
- /**
- * 创建连接
- *
- * @param clientId 设备ID
- * @return 连接成功true 失败false
- */
- public boolean createConnect(String clientId) {
- boolean flag = false;
- String tmpDir = System.getProperty("java.io.tmpdir");
- MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
- try {
- conOpt = new MqttConnectOptions();
- conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
- conOpt.setCleanSession(true);
- conOpt.setPassword(MqttApi.getPassword().toCharArray());
- conOpt.setUserName(MqttApi.getName());
- conOpt.setConnectionTimeout(30); //设置连接超时
- conOpt.setKeepAliveInterval(30); //设置连接空闲最大时长 单位:秒
- client = new MqttClient(MqttApi.getMqtt_Url(), clientId, dataStore);
- client.setCallback(new MqttCallBack());
- if (client != null) {
- try {
- client.connect(conOpt);
- flag = true;
- } catch (Exception e) {
- flag = false;
- }
- }
- } catch (MqttException e) {
- LogUtils.e(TAG, "createConnect:MqttException: " + e.toString());
- }
- return flag;
- }
-
- /**
- * 连接服务端
- */
- public void connect() {
- EventBus.getDefault().post(new MqttMainEvent("onlinestatus","离线"));
- new Thread(new Runnable() {
- @Override
- public void run() {
- boolean isflag = createConnect(MqttApi.getClintId());
- if (isflag) {
- LogUtils.e(TAG, "连接成功...");
- isflag = subScribe();//开始订阅
- if (!isflag) {
- LogUtils.e(TAG, "订阅失败...");
- } else {
- LogUtils.e(TAG, "订阅成功...");
- while (true) {
- if (!MqttApi.isBoolean) {
- EventBus.getDefault().post(new LoginEvent("登录失败"));
- EventBus.getDefault().post(new MqttMainEvent("login", "登录失败"));
- LogUtils.e(TAG, "准备登录..............");
- boolean isPublish = publish(1, JsonPut.putLogin());//发布登录
- if (isPublish) {
- LogUtils.e(TAG, "登录成功...");
- } else {
- LogUtils.e(TAG, "登录异常...");
- connect();
- break;
- }
- } else {
- if (heartBeat == null) {
- heartBeat = new MqttHeartBeat();
- }
- LogUtils.e(TAG, "=======================登录成功,退出循环=====================");
- startReconnect();
- break;
- }
- SystemClock.sleep(10 * 1000);
- }
- }
- } else {
- LogUtils.e(TAG, "连接失败...");
- SystemClock.sleep(5000);
- connect();
- }
- }
- }).start();
- }
-
- /**
- * 发布数据到服务端
- *
- * @param code 1:登录主题,2:心跳主题,3:业务主题
- * @param data 发布内容
- * @return
- */
- public boolean publish(int code, String data) {
- String topic = "";
- if (code == 1) { //登录
- topic = "/lo";
- } else if (code == 2) { //心跳
- topic = "/he";
- } else if (code == 3) { //业务
- topic = "/bu";
- }
- boolean flag = false;
- if (client != null && client.isConnected()) {
- MqttMessage message = new MqttMessage();
- message.setPayload(data.getBytes());
- message.setQos(2);//@param qos的“服务质量”使用。设为0,1,2
- message.setRetained(false);
- try {
- client.publish(topic, message);
- flag = true;
- } catch (MqttException e) {
- LogUtils.e(TAG, "发送异常..." + e.toString());
- flag = false;
- }
- } else {
- LogUtils.e(TAG, "当前连接已断开...");
- }
- return flag;
- }
-
- /**
- * 订阅主题
- */
- public boolean subScribe() {
- boolean flag = false;
- if (client != null && client.isConnected()) {
- try {
- client.subscribe(MqttApi.getTopicSubscribe(), 2);
- flag = true;
- } catch (MqttException e) {
- flag = false;
- LogUtils.e(TAG, "订阅异常:" + e.toString());
- }
- }
- return flag;
- }
-
- /**
- * 断开连接
- */
- public void disConnect() throws MqttException {
- MqttApi.isBoolean = false;
- conOpt = null;
-
- if (client != null) {
- client.disconnect();
- client = null;
- }
- LogUtils.e(TAG, "断开连接");
- }
-
- /**
- * 释放单例, 及其所引用的资源
- */
- public static void release() throws Exception {
- if (mInstance != null) {
- mInstance.disConnect();
- mInstance = null;
- heartBeat.disConnect();
- heartBeat = null;
- }
- }
- }
接口回调类MqttCallBack
- public class MqttCallBack implements MqttCallback {
-
- public MqttCallBack() {}
-
- /**
- * 连接异常断开回调此方法,可在此方法内做重连机制
- */
- @Override
- public void connectionLost(Throwable cause) {
- /*SystemClock.sleep(10000);
- MqttManager.getInstance().connect();*/
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- String data = new String(message.getPayload(), "UTF-8");
- Object object = JSON.parse(data);
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- LogUtils.d(getClass().getSimpleName(), "token:" + token.toString());
- }
- }
常量类MqttApi
- public class MqttApi {
-
- private static boolean DEBUG = true;
-
- /**
- * 服务端IP
- */
- public static String getMqtt_Url() {
- if (DEBUG) {
- return "tcp://mqtt.inks:1883";
- }
- return "tcp://192.168.1.123:61613";
- }
-
- /**
- * 用户名
- */
- public static String getName() {
- if (DEBUG) {
- return "admin";//testuser
- }
- return "admin";
- }
-
- /**
- * 密码
- */
- public static String getPassword() {
- if (DEBUG) {
- return "shmqttfree";//sha256
- }
- return "password";
- }
-
- /**
- * 订阅主题
- * app接收这个主题的消息
- */
- public static String getTopicSubscribe() {
- if (DEBUG) {
- return "0000000000000";
- }
- return "111111111111111111";
- }
-
- /**
- * 发布主题
- * app发布消息的主题
- */
- public static String getTopicPublisher(String topic) {
- if (DEBUG) {
- return "/" + topic;
- }
- return "/" + topic;
- }
-
- /**
- * 设备ID
- *
- * @return
- */
- public static String getClintId() {
- return "00000000";
- }
-
- /**
- * 标记是否登录
- **/
- public static volatile boolean isBoolean = false;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。