当前位置:   article > 正文

MQTT协议的简单介绍和服务器的安装_mqttdeliveriytoken waitforcomp

mqttdeliveriytoken waitforcomp


最近公司做的项目中有用到消息推送,经过多方面的筛选之后确定了使用MQTT协议,相对于XMPP,MQTT更加轻量级,并且占用用户很少的带宽。

MQTT是IBM推出的一种针对移动终端设备的基于TCP/IP的发布/预订协议,可以连接大量的远程传感器和控制设备。

MQTT的官网见:http://mqtt.org/。其中http://mqtt.org/software里面提供了官方推荐的各种服务器和客户端使用的各种语言版本的API。

下面以服务器Apollo 1.6为例,之前尝试过使用ActiveMQ,效果很不理想,只能实现服务器和客户端一对一的通信,从官网上了解到Apollo属于activemq的一个子工程。先不管这些了,言归正传,以下在windows环境下。

1、在这里下载Apollo服务器,下载后解压,然后运行apache-apollo-1.6\bin\apollo.cmd,输入create mybroker(名字任意取,这里是根据官网介绍的来取的)创建服务器实例,服务器实例包含了所有的配置,运行时数据等,并且和一个服务器进程关联。

2、create mybroker之后会在bin目录下生成mybroker文件夹,里面包含有很多信息,其中etc\apollo.xml文件下是配置服务器信息的文件,etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,后面会介绍,可以修改原始的admin=password,可以接着换行添加新的用户名密码。

3、打开cmd,运行…apache-apollo-1.6\bin\mybroker\bin\apollo-broker.cmd run 开启服务器,可以在浏览器中输入http://127.0.0.1:61680/查看是否安装成功,该界面展示了topic,连接数等很多信息。

经过上面的简单步骤,服务器基本上就已经完成,下一篇将介绍Android客户端的编写和注意事项。

客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的Eclipse Paho,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:

1、新建android工程MQTTClient

2、MainActivity代码如下:

  1. package ldw.mqttclient;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.ScheduledExecutorService;
  4. import java.util.concurrent.TimeUnit;
  5. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  6. import org.eclipse.paho.client.mqttv3.MqttCallback;
  7. import org.eclipse.paho.client.mqttv3.MqttClient;
  8. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  9. import org.eclipse.paho.client.mqttv3.MqttException;
  10. import org.eclipse.paho.client.mqttv3.MqttMessage;
  11. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  12. import android.app.Activity;
  13. import android.os.Bundle;
  14. import android.os.Handler;
  15. import android.os.Message;
  16. import android.view.KeyEvent;
  17. import android.widget.TextView;
  18. import android.widget.Toast;
  19. public class MainActivity extends Activity {
  20. private TextView resultTv;
  21. private String host = "tcp://127.0.0.1:1883";
  22. private String userName = "admin";
  23. private String passWord = "password";
  24. private Handler handler;
  25. private MqttClient client;
  26. private String myTopic = "test/topic";
  27. private MqttConnectOptions options;
  28. private ScheduledExecutorService scheduler;
  29. @Override
  30. protected void onCreate(Bundle savedInstanceState) {
  31. super.onCreate(savedInstanceState);
  32. setContentView(R.layout.main);
  33. resultTv = (TextView) findViewById(R.id.result);
  34. init();
  35. handler = new Handler() {
  36. @Override
  37. public void handleMessage(Message msg) {
  38. super.handleMessage(msg);
  39. if(msg.what == 1) {
  40. Toast.makeText(MainActivity.this, (String) msg.obj,
  41. Toast.LENGTH_SHORT).show();
  42. System.out.println("-----------------------------");
  43. } else if(msg.what == 2) {
  44. Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();
  45. try {
  46. client.subscribe(myTopic, 1);
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. } else if(msg.what == 3) {
  51. Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();
  52. }
  53. }
  54. };
  55. startReconnect();
  56. }
  57. private void startReconnect() {
  58. scheduler = Executors.newSingleThreadScheduledExecutor();
  59. scheduler.scheduleAtFixedRate(new Runnable() {
  60. @Override
  61. public void run() {
  62. if(!client.isConnected()) {
  63. connect();
  64. }
  65. }
  66. }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
  67. }
  68. private void init() {
  69. try {
  70. //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
  71. client = new MqttClient(host, "test",
  72. new MemoryPersistence());
  73. //MQTT的连接设置
  74. options = new MqttConnectOptions();
  75. //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  76. options.setCleanSession(true);
  77. //设置连接的用户名
  78. options.setUserName(userName);
  79. //设置连接的密码
  80. options.setPassword(passWord.toCharArray());
  81. // 设置超时时间 单位为秒
  82. options.setConnectionTimeout(10);
  83. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  84. options.setKeepAliveInterval(20);
  85. //设置回调
  86. client.setCallback(new MqttCallback() {
  87. @Override
  88. public void connectionLost(Throwable cause) {
  89. //连接丢失后,一般在这里面进行重连
  90. System.out.println("connectionLost----------");
  91. }
  92. @Override
  93. public void deliveryComplete(IMqttDeliveryToken token) {
  94. //publish后会执行到这里
  95. System.out.println("deliveryComplete---------"
  96. + token.isComplete());
  97. }
  98. @Override
  99. public void messageArrived(String topicName, MqttMessage message)
  100. throws Exception {
  101. //subscribe后得到的消息会执行到这里面
  102. System.out.println("messageArrived----------");
  103. Message msg = new Message();
  104. msg.what = 1;
  105. msg.obj = topicName+"---"+message.toString();
  106. handler.sendMessage(msg);
  107. }
  108. });
  109. // connect();
  110. } catch (Exception e) {
  111. e.printStackTrace();
  112. }
  113. }
  114. private void connect() {
  115. new Thread(new Runnable() {
  116. @Override
  117. public void run() {
  118. try {
  119. client.connect(options);
  120. Message msg = new Message();
  121. msg.what = 2;
  122. handler.sendMessage(msg);
  123. } catch (Exception e) {
  124. e.printStackTrace();
  125. Message msg = new Message();
  126. msg.what = 3;
  127. handler.sendMessage(msg);
  128. }
  129. }
  130. }).start();
  131. }
  132. @Override
  133. public boolean onKeyDown(int keyCode, KeyEvent event) {
  134. if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {
  135. try {
  136. client.disconnect();
  137. } catch (Exception e) {
  138. e.printStackTrace();
  139. }
  140. }
  141. return super.onKeyDown(keyCode, event);
  142. }
  143. @Override
  144. protected void onDestroy() {
  145. super.onDestroy();
  146. try {
  147. scheduler.shutdown();
  148. client.disconnect();
  149. } catch (MqttException e) {
  150. e.printStackTrace();
  151. }
  152. }
  153. }

由于项目需要,我用到了心跳重连。根据这里的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。

3、新建j2se工程MQTTServer

4、Server代码如下:

  1. import java.awt.Container;
  2. import java.awt.event.ActionEvent;
  3. import java.awt.event.ActionListener;
  4. import javax.swing.JButton;
  5. import javax.swing.JFrame;
  6. import javax.swing.JPanel;
  7. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  8. import org.eclipse.paho.client.mqttv3.MqttCallback;
  9. import org.eclipse.paho.client.mqttv3.MqttClient;
  10. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  11. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  12. import org.eclipse.paho.client.mqttv3.MqttMessage;
  13. import org.eclipse.paho.client.mqttv3.MqttTopic;
  14. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  15. public class Server extends JFrame {
  16. private static final long serialVersionUID = 1L;
  17. private JPanel panel;
  18. private JButton button;
  19. private MqttClient client;
  20. private String host = "tcp://127.0.0.1:1883";
  21. // private String host = "tcp://localhost:1883";
  22. private String userName = "test";
  23. private String passWord = "test";
  24. private MqttTopic topic;
  25. private MqttMessage message;
  26. private String myTopic = "test/topic";
  27. public Server() {
  28. try {
  29. client = new MqttClient(host, "Server",
  30. new MemoryPersistence());
  31. connect();
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. }
  35. Container container = this.getContentPane();
  36. panel = new JPanel();
  37. button = new JButton("发布话题");
  38. button.addActionListener(new ActionListener() {
  39. @Override
  40. public void actionPerformed(ActionEvent ae) {
  41. try {
  42. MqttDeliveryToken token = topic.publish(message);
  43. token.waitForCompletion();
  44. System.out.println(token.isComplete()+"========");
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. });
  50. panel.add(button);
  51. container.add(panel, "North");
  52. }
  53. private void connect() {
  54. MqttConnectOptions options = new MqttConnectOptions();
  55. options.setCleanSession(false);
  56. options.setUserName(userName);
  57. options.setPassword(passWord.toCharArray());
  58. // 设置超时时间
  59. options.setConnectionTimeout(10);
  60. // 设置会话心跳时间
  61. options.setKeepAliveInterval(20);
  62. try {
  63. client.setCallback(new MqttCallback() {
  64. @Override
  65. public void connectionLost(Throwable cause) {
  66. System.out.println("connectionLost-----------");
  67. }
  68. @Override
  69. public void deliveryComplete(IMqttDeliveryToken token) {
  70. System.out.println("deliveryComplete---------"+token.isComplete());
  71. }
  72. @Override
  73. public void messageArrived(String topic, MqttMessage arg1)
  74. throws Exception {
  75. System.out.println("messageArrived----------");
  76. }
  77. });
  78. topic = client.getTopic(myTopic);
  79. message = new MqttMessage();
  80. message.setQos(1);
  81. message.setRetained(true);
  82. System.out.println(message.isRetained()+"------ratained状态");
  83. message.setPayload("eeeeeaaaaaawwwwww---".getBytes());
  84. client.connect(options);
  85. } catch (Exception e) {
  86. e.printStackTrace();
  87. }
  88. }
  89. public static void main(String[] args) {
  90. Server s = new Server();
  91. s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
  92. s.setSize(600, 370);
  93. s.setLocationRelativeTo(null);
  94. s.setVisible(true);
  95. }
  96. }

上面代码跟客户端的代码差不多,这里就不做解释了。

没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。

jar包下载地址:
https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/

转自:http://www.longdw.com/mqtt-server-client-android/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/188604
推荐阅读
相关标签
  

闽ICP备14008679号