当前位置:   article > 正文

mqtt协议实现 java服务端推送功能(二)java demo测试

java使用mqtt客户端id向客户端推送主题

上一篇写了安装mosQuitto和测试,但是用cmd命令很麻烦,有没有一个可视化软件呢? 有,需要在google浏览器下载一个叫MQTTLens的插件

打开MQTTLens后界面如下:

打开connections后面的加号,就可以连接

绿色,说明连接成功,在Subscribe下添加订阅的topic名称(这里就写topic了) 点击subscribe 完成订阅

网上找了个java的测试代码,进行连接测试

pom.xml 引入包:

  1. <dependency>
  2. <groupId>org.eclipse.paho</groupId>
  3. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4. <version>1.0.2</version>
  5. </dependency>
  1. package mqtt;
  2. import java.util.concurrent.ScheduledExecutorService;
  3. import org.eclipse.paho.client.mqttv3.MqttClient;
  4. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5. import org.eclipse.paho.client.mqttv3.MqttException;
  6. import org.eclipse.paho.client.mqttv3.MqttTopic;
  7. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  8. public class ClientMQTT {
  9. public static final String HOST = "tcp://localhost:1883";
  10. public static final String TOPIC = "topic";
  11. private static final String clientid = "client11";
  12. private MqttClient client;
  13. private MqttConnectOptions options;
  14. private String userName = "admin";
  15. private String passWord = "admin";
  16. private ScheduledExecutorService scheduler;
  17. private void start() {
  18. try {
  19. // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
  20. client = new MqttClient(HOST, clientid, new MemoryPersistence());
  21. // MQTT的连接设置
  22. options = new MqttConnectOptions();
  23. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  24. options.setCleanSession(true);
  25. // 设置连接的用户名
  26. options.setUserName(userName);
  27. // 设置连接的密码
  28. options.setPassword(passWord.toCharArray());
  29. // 设置超时时间 单位为秒
  30. options.setConnectionTimeout(10);
  31. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  32. options.setKeepAliveInterval(20);
  33. // 设置回调
  34. client.setCallback(new PushCallback());
  35. MqttTopic topic = client.getTopic(TOPIC);
  36. // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
  37. options.setWill(topic, "close".getBytes(), 2, true);
  38. client.connect(options);
  39. // 订阅消息
  40. int[] Qos = { 1 };
  41. String[] topic1 = { TOPIC };
  42. client.subscribe(topic1, Qos);
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. public static void main(String[] args) throws MqttException {
  48. ClientMQTT client = new ClientMQTT();
  49. client.start();
  50. }
  51. }
  1. package mqtt;
  2. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  3. import org.eclipse.paho.client.mqttv3.MqttCallback;
  4. import org.eclipse.paho.client.mqttv3.MqttMessage;
  5. /**
  6. * 发布消息的回调类
  7. *
  8. * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
  9. * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 在回调中,将它用来标识已经启动了该回调的哪个实例。
  10. * 必须在回调类中实现三个方法:
  11. *
  12. * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
  13. *
  14. * public void connectionLost(Throwable cause)在断开连接时调用。
  15. *
  16. * public void deliveryComplete(MqttDeliveryToken token)) 接收到已经发布的 QoS 1 或 QoS 2
  17. * 消息的传递令牌时调用。 由 MqttClient.connect 激活此回调。
  18. *
  19. */
  20. public class PushCallback implements MqttCallback {
  21. public void connectionLost(Throwable cause) {
  22. // 连接丢失后,一般在这里面进行重连
  23. System.out.println("连接断开,可以做重连");
  24. }
  25. public void deliveryComplete(IMqttDeliveryToken token) {
  26. System.out.println("deliveryComplete---------" + token.isComplete());
  27. }
  28. public void messageArrived(String topic, MqttMessage message) throws Exception {
  29. // subscribe后得到的消息会执行到这里面
  30. System.out.println("接收消息主题 : " + topic);
  31. System.out.println("接收消息Qos : " + message.getQos());
  32. System.out.println("接收消息内容 : " + new String(message.getPayload()));
  33. }
  34. }
  1. package mqtt;
  2. import java.io.UnsupportedEncodingException;
  3. import org.eclipse.paho.client.mqttv3.MqttClient;
  4. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  6. import org.eclipse.paho.client.mqttv3.MqttException;
  7. import org.eclipse.paho.client.mqttv3.MqttMessage;
  8. import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
  9. import org.eclipse.paho.client.mqttv3.MqttTopic;
  10. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  11. /**
  12. * Title:Server Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
  13. */
  14. public class ServerMQTT {
  15. // tcp://MQTT安装的服务器地址:MQTT定义的端口号
  16. public static final String HOST = "tcp://192.168.10.80:1883";
  17. // 定义一个主题
  18. public static final String TOPIC = "topic";
  19. // 定义MQTT的ID,可以在MQTT服务配置中指定
  20. private static final String clientid = "server11";
  21. private static MqttClient client;
  22. private static MqttTopic topic11;
  23. //用户名和密码
  24. private String userName = "admin";
  25. private String passWord = "admin";
  26. private MqttMessage message;
  27. /**
  28. * 构造函数
  29. *
  30. * @throws MqttException
  31. */
  32. public ServerMQTT() throws MqttException {
  33. // MemoryPersistence设置clientid的保存形式,默认为以内存保存
  34. client = new MqttClient(HOST, clientid, new MemoryPersistence());
  35. connect();
  36. }
  37. /**
  38. * 用来连接服务器
  39. */
  40. private void connect() {
  41. MqttConnectOptions options = new MqttConnectOptions();
  42. options.setCleanSession(false);
  43. options.setUserName(userName);
  44. options.setPassword(passWord.toCharArray());
  45. // 设置超时时间
  46. options.setConnectionTimeout(10);
  47. // 设置会话心跳时间
  48. options.setKeepAliveInterval(20);
  49. try {
  50. client.setCallback(new PushCallback());
  51. client.connect(options);
  52. topic11 = client.getTopic(TOPIC);
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. /**
  58. *
  59. * @param topic
  60. * @param message
  61. * @throws MqttPersistenceException
  62. * @throws MqttException
  63. */
  64. public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
  65. MqttDeliveryToken token = topic.publish(message);
  66. token.waitForCompletion();
  67. System.out.println("message is published completely! " + token.isComplete());
  68. }
  69. /**
  70. * 启动入口
  71. *
  72. * @param args
  73. * @throws MqttException
  74. * @throws UnsupportedEncodingException
  75. */
  76. public static void main(String[] args) throws MqttException, UnsupportedEncodingException {
  77. ServerMQTT server = new ServerMQTT();
  78. server.message = new MqttMessage();
  79. /* Qos服务质量等级:
  80. * 0,最多一次,不管是否接收到;
  81. * 1,最少一次,保证信息将会被至少发送一次给接受者
  82. * 2,只一次,确保每个消息都只被接收到的一次,他是最安全也是最慢的服务等级
  83. */
  84. server.message.setQos(2);
  85. server.message.setRetained(true);
  86. server.message.setPayload("test".getBytes("UTF-8"));
  87. server.publish(server.topic11, server.message);
  88. System.out.println(server.message.isRetained() + "------ratained状态");
  89. }
  90. }

运行ServerMQTT中的main方法   发送test给mqtt,在MQTTlens中看到了接收的消息,说明推送成功

 

 

 

转载于:https://www.cnblogs.com/zhi-ming/p/10453129.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/200759
推荐阅读
相关标签
  

闽ICP备14008679号