赞
踩
大家好,我是一个心中有小宇宙,有很多很多事想去实现的程序员,之前想过去写博客,冥冥之中不知道如何下手,这几天公司开发的项目用到了MQTT这个东西,一想赶紧网上了解,查阅了大量的资料,文档,还是碰到了很多坑,第一,网上的文章单篇写的不全面,还是需要整合很多文章在一起才能解决。第二,时间不是很新的,有些2016、2017年,甚至2018的文章还是用的2017年的技术。于是我便迫不及待的写了本文。
MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案。
MQTT官网:http://mqtt.org/
MQTT介绍:http://www.ibm.com
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,协议具有许多不同的功能:
注意:一定要添加最新的依赖包,不知道的可以去官网查。
repositories {
maven {
url “https://repo.eclipse.org/content/repositories/paho-releases/”
}
}
dependencies {
implementation ‘org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0’
implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’
}
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<service android:name="org.eclipse.paho.android.service.MqttService" />
<service android:name="com.smarthome.com.app.MQTTService"/>
<receiver
android:name="com.smarthome.com.app.MyReceiver"
android:enabled="true"
android:exported="false"
>
<intent-filter>
<category android:name="com.smarthome.com" />
<action android:name="android.intent.action.BOOT_COMPLETED" />
<action android:name="android.intent.action.USER_PRESENT" />
<action android:name="com.example.androidtest.receiver" />
</intent-filter>
</receiver>
/** * Created by 朱大大 * QQ:941556675 */ public class MQTTService extends Service { public static final String TAG = "MQTTService"; private static MqttAndroidClient client; private MqttConnectOptions conOpt; private String host = "xxxxxxxxxxx"; private String userName = "你的用户名"; private String passWord ="你的密码"; private String clientId ="唯一clientId"; private MyReceiver myReceiver; private static boolean isCloseService=false; @Override public void onCreate() { super.onCreate(); } @Override public int onStartCommand(Intent intent, int flags, int startId) { init(); return super.onStartCommand(intent, flags, startId); } public static void publish(String msg,String topic){ // String topic = myTopic; Integer qos = 0; Boolean retained = false; try { client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue()); } catch (MqttException e) { e.printStackTrace(); } } public static boolean subscribe(String[] topicName, int qos[]){ boolean flag = false; if (client != null && client.isConnected()) { try { client.subscribe(topicName, qos, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { Log.e("Subscribed","Subscribed"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { Log.e("Failed to subscribe","Failed to subscribe"); } }); flag = true; } catch (MqttException e) { e.printStackTrace(); } } else { } return flag; } private void init() { // 服务器地址(协议+地址+端口号) String uri = host; client = new MqttAndroidClient(getApplicationContext(), uri, clientId); // 设置MQTT监听并且接受消息 client.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { //断开连接必须重新订阅才能收到消息 if(reconnect){ //这里是发送消息去重新订阅 MQTTConMessage msg = new MQTTConMessage(); msg.setMessage("connect"); EventBus.getDefault().postSticky(msg); } } @Override public void connectionLost(Throwable cause) { Log.e(TAG,"连接失败,重连"); //重连处理 doClientConnection(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained(); Log.e(TAG, str2); String str1 = new String(message.getPayload()); Log.e(TAG, "收到消息:" + str1); MQTTMessage msg = new MQTTMessage(); msg.setMessage(str1); msg.setTopic(topic); Log.e("主题", topic); EventBus.getDefault().postSticky(msg); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); conOpt = new MqttConnectOptions(); conOpt.setAutomaticReconnect(true); // 清除缓存 conOpt.setCleanSession(true); // 设置超时时间,单位:秒 conOpt.setConnectionTimeout(60); // 心跳包发送间隔,单位:秒 conOpt.setKeepAliveInterval(5); // 用户名 conOpt.setUserName(userName); // 密码 conOpt.setPassword(passWord.toCharArray()); myReceiver = new MyReceiver(); IntentFilter filter = new IntentFilter(); filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION); registerReceiver(myReceiver, filter); } @Override public void onDestroy() { super.onDestroy(); unregisterReceiver(myReceiver); try { if(client!=null){ client.disconnect(); client.unregisterResources(); } } catch (MqttException e) { e.printStackTrace(); } if(isCloseService){ isCloseService=false; } else{ //服务停止,重新开启服务。 stopForeground(true); Intent intent = new Intent("com.example.androidtest.receiver"); sendBroadcast(intent); } } public static void closeConnect(){ isCloseService=true; } /** 连接MQTT服务器 */ private void doClientConnection() { if (!client.isConnected() && isConnectIsNomarl()) { try { client.connect(conOpt, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { MQTTConMessage msg = new MQTTConMessage(); msg.setMessage("connect"); Log.e(TAG, "连接成功 "); EventBus.getDefault().postSticky(msg); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable arg1) { Log.e("arg1",arg1+""); arg1.printStackTrace(); MQTTConMessage msg = new MQTTConMessage(); msg.setMessage("disconnect"); EventBus.getDefault().postSticky(msg); Log.e(TAG, "连接失败,重连"); } }); } catch (MqttException e) { e.printStackTrace(); } } } /** 判断网络是否连接 */ private boolean isConnectIsNomarl() { ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); NetworkInfo info = connectivityManager.getActiveNetworkInfo(); if (info != null && info.isConnected()) { String name = info.getTypeName(); Log.e(TAG, "MQTT当前网络名称:" + name); return true; } else { Log.e(TAG, "MQTT 没有可用网络"); return false; } } @Nullable @Override public IBinder onBind(Intent intent) { return null; } private class MyReceiver extends BroadcastReceiver{ @Override public void onReceive(Context context, Intent intent) { if (!isConnectIsNomarl()) { Log.e("网络错误","网络错误"); } else { doClientConnection(); } } } }
/** * Created by 朱大大 * QQ:941556675 */ public class MyReceiver extends BroadcastReceiver { @Override public void onReceive(Context context, Intent intent) { if(intent.getAction().equals("com.example.androidtest.receiver")){ Log.e("MyReceiver","start"); Intent sevice = new Intent(context, MQTTService.class); context.startService(sevice); } } }
/** * Created by 朱大大 * QQ:941556675 */ public class MainActivity extends Activity { private String arr []={"#test"}; private int qos []={0}; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); EventBus.getDefault().register(this); startService(new Intent(this, MQTTService.class)); //一般订阅的内容都是从服务器获取 //这里写获取服务器订阅主题的代码 } @Subscribe(threadMode = ThreadMode.MAIN) public void getMqttMessage(MQTTMessage mqttMessage){ Log.i(MQTTService.TAG,"get message:"+mqttMessage.getMessage()); Toast.makeText(this,mqttMessage.getMessage(),Toast.LENGTH_SHORT).show(); } @Subscribe(threadMode = ThreadMode.MAIN) public void getMqttConMessage(MQTTConMessage isConnection){ //连接成功发送过来的通知,连接成功才能去订阅消息。 if(isConnection.getMessage().equals("connect")){ //订阅消息 //注意:订阅单个主题我不多说,但是如果需要一次订阅多个主题,需要用for循环去订阅多个,这可能是MQTT的bug。 MQTTService.subscribe(arr,qos); } else{ } } @Override protected void onDestroy() { EventBus.getDefault().unregister(this); super.onDestroy(); } }
public class MQTTConMessage { private String message; private String topic; public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
注意:
如果需要完全退出APP且不收到消息用
MQTTService.closeConnect()
Intent intent = new Intent(getApplicationContext(),MQTTService.class);
stopService(intent);
1.代码中用到了Service,所以服务就有可能会自动停止,所以在 onDestroy()销毁时又开启服务。保证了服务不再被停止运行从而保证MQTT能正常接收到消息。
2.考虑到用户使用过程中可能会突然断开网络,这时候要对监听网络的断开和重连,重连过程中发现,会不断的重连,导致崩溃,实际上已经连上服务器了,出现这样的原因是,重连了要重新订阅才能收到消息,在connectComplete方法中重新订阅。
3.注意new MqttAndroidClient(getApplicationContext(), uri, clientId); 这里上下文对象要是getApplicationContext(),如果写成this的话也会经常出现断连,这个就更生命周期有关了。
第一次写博客好人做到底,赠上我的demo地址另外看完之后如有不懂的地方请在下方评论,我看到会及时回复。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。