当前位置:   article > 正文

中间件(4): mqtt收发_mqttdeliverytoken.waitforcompletion

mqttdeliverytoken.waitforcompletion

参考 : http://www.mokezhan.com/67091.html

maven依赖

  1. <dependency>
  2. <groupId>org.eclipse.paho</groupId>
  3. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4. <version>1.2.0</version>
  5. </dependency>

发送

  1. import org.eclipse.paho.client.mqttv3.*;
  2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  3. /**
  4. * @Author: liyue
  5. * @Date: 2021/10/13/14:15
  6. * @Description:
  7. */
  8. public class Send {
  9. /**
  10. * 客户端唯一标识
  11. */
  12. public static final String MQTT_CLIENT_ID = "mqtt_broker_test_10010_send";
  13. private static MqttTopic topic;
  14. private static MqttClient client;
  15. public static void main(String[] args) {
  16. String serverURI, userName, password, mqttTopic;
  17. if (args.length > 0) {
  18. serverURI = args[0];
  19. userName = args[1];
  20. password = args[2];
  21. mqttTopic = args[3];
  22. } else {
  23. serverURI = "tcp://1.1.1.1:9087";
  24. userName = "admin";
  25. password = "password";
  26. mqttTopic = "mqtt_broker_test_10010";
  27. }
  28. System.out.println("============================================================================================================================================");
  29. System.out.println(StringUtil.join("消息发送程序开始启动,配置参数size:{}, serverURI:{}, userName:{}, password:{}, mqttTopic:{}",
  30. args.length,serverURI,userName,password,mqttTopic));
  31. System.out.println("============================================================================================================================================");
  32. MqttMessage message = new MqttMessage();
  33. try {
  34. client = new MqttClient(serverURI, MQTT_CLIENT_ID, new MemoryPersistence());
  35. MqttConnectOptions options = new MqttConnectOptions();
  36. options.setCleanSession(true);
  37. options.setUserName(userName);
  38. options.setPassword(password.toCharArray());
  39. options.setConnectionTimeout(10);
  40. options.setKeepAliveInterval(20);
  41. topic = client.getTopic(mqttTopic);
  42. message.setQos(1);
  43. message.setRetained(false);
  44. client.connect(options);
  45. int i = 1;
  46. while (true) {
  47. String msg = DateUtil.getNow() + " 测试消息" + i;
  48. message.setPayload(msg.getBytes());
  49. MqttDeliveryToken token = topic.publish(message);
  50. token.waitForCompletion();
  51. System.out.println("发送消息:["+msg+"]");
  52. Thread.sleep(4000);
  53. i++;
  54. }
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }

接收

  1. import org.eclipse.paho.client.mqttv3.*;
  2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  3. /**
  4. * @Author: liyue
  5. * @Date: 2021/10/13/14:15
  6. * @Description:
  7. */
  8. public class Receive {
  9. /**
  10. * 客户端唯一标识
  11. */
  12. public static final String MQTT_CLIENT_ID = "mqtt_broker_test_10010_receive";
  13. private volatile static MqttClient mqttClient;
  14. private static MqttConnectOptions options;
  15. public static void main(String[] args) {
  16. String serverURI, userName, password, mqttTopic;
  17. if (args.length > 0) {
  18. serverURI = args[0];
  19. userName = args[1];
  20. password = args[2];
  21. mqttTopic = args[3];
  22. } else {
  23. serverURI = "tcp://1.1.1.1:9087";
  24. userName = "admin";
  25. password = "password";
  26. mqttTopic = "mqtt_broker_test_10010";
  27. }
  28. System.out.println("============================================================================================================================================");
  29. System.out.println(StringUtil.join("消息接收程序开始启动,配置参数size:{}, serverURI:{}, userName:{}, password:{}, mqttTopic:{}",
  30. args.length,serverURI,userName,password,mqttTopic));
  31. System.out.println("============================================================================================================================================");
  32. try {
  33. // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
  34. // MemoryPersistence设置clientid的保存形式,默认为以内存保存
  35. mqttClient = new MqttClient(serverURI, MQTT_CLIENT_ID, new MemoryPersistence());
  36. // 配置参数信息
  37. options = new MqttConnectOptions();
  38. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
  39. // 这里设置为true表示每次连接到服务器都以新的身份连接
  40. options.setCleanSession(true);
  41. // 设置用户名
  42. options.setUserName(userName);
  43. // 设置密码
  44. options.setPassword(password.toCharArray());
  45. // 设置超时时间 单位为秒
  46. options.setConnectionTimeout(10);
  47. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  48. options.setKeepAliveInterval(20);
  49. // 连接
  50. mqttClient.connect(options);
  51. // 订阅
  52. mqttClient.subscribe(mqttTopic);
  53. // 设置回调
  54. mqttClient.setCallback(new MqttCallback() {
  55. @Override
  56. public void connectionLost(Throwable throwable) {
  57. System.err.println("无法连接mqtt服务器");
  58. throwable.printStackTrace();
  59. }
  60. @Override
  61. public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
  62. System.out.println(DateUtil.getNow()+" 收到消息,Topic:["+s+"],Message:["+mqttMessage.toString()+"]");
  63. }
  64. @Override
  65. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  66. try {
  67. System.out.println("消息传输完成,msg:"+iMqttDeliveryToken.getMessage());
  68. }catch (Exception e){
  69. System.err.println("获取消息失败");
  70. e.printStackTrace();
  71. }
  72. }
  73. });
  74. } catch (Exception e) {
  75. e.printStackTrace();
  76. }
  77. }
  78. }

依赖

StringUtil.java

  1. /**
  2. * @Author: liyue
  3. * @Date: 2021/10/13/14:48
  4. * @Description:
  5. */
  6. public class StringUtil {
  7. public static String join(String str, Object... param) {
  8. for (Object p : param) {
  9. str = str.replaceFirst("\\{\\}", p.toString());
  10. }
  11. return str;
  12. }
  13. }

DateUtil.java

  1. import java.text.ParseException;
  2. import java.text.SimpleDateFormat;
  3. import java.util.*;
  4. /**
  5. * 线程安全的日期工具类
  6. */
  7. public class DateUtil {
  8. public static String PATTERN_YYYYMM = "yyyyMM";
  9. public static String PATTERN_YYYY_MM_DD = "yyyy-MM-dd";
  10. public static String PATTERN_YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
  11. public static String PATTERN_YYYYMMDDHHMM = "yyyyMMddHHmm";
  12. public static String PATTERN_YYYY_MM_DD_HHMMSS = "yyyy-MM-dd HH:mm:ss";
  13. public static String PATTERN_YYYY_MM_DD_HH = "yyyy-MM-dd HH";
  14. public static String PATTERN_YYYYMMDDHH = "yyyyMMddHH";
  15. public static String PATTERN_YYYY_MM_DD_HH_MM = "yyyy-MM-dd HH:mm";
  16. public static String PATTERN_YYYYMMDD = "yyyyMMdd";
  17. public static String PATTERN_YYYYMMDDHHMMSSSSS = "yyyyMMddHHmmssSSS";
  18. public static String PATTERN_HTTP = "EEE, dd MMM yyyy HH:mm:ss zzz";
  19. public static String PATTERN_YYYYMMDD_CHINESE = "yyyy年MM月dd日";
  20. public static String PATTERN_YYYYMMDD_HHMMSS_CHINESE = "yyyy年MM月dd日 HH:mm:ss";
  21. public static String PATTERN_YYYYMMDD_SLASH = "yyyy/MM/dd";
  22. public static String PATTERN_YYYYMMDD_HHMMSS_SLASH = "yyyy/MM/dd HH:mm:ss";
  23. public static String PATTERN_YYYY_MM_DD_00 = "yyyy-MM-dd 00:00:00";
  24. public static String UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z = "yyyy-MM-dd'T'HH:mm:ss'Z'";
  25. /**
  26. * 锁对象
  27. */
  28. private static final Object lockObj = new Object();
  29. /**
  30. * 存放不同的日期模板格式的sdf的Map
  31. */
  32. private static Map<String, ThreadLocal<SimpleDateFormat>> sdfMap = new HashMap<String, ThreadLocal<SimpleDateFormat>>();
  33. /**
  34. * 返回一个ThreadLocal的sdf,每个线程只会new一次sdf
  35. *
  36. * @param pattern
  37. * @return
  38. */
  39. private static SimpleDateFormat getSdf(final String pattern) {
  40. ThreadLocal<SimpleDateFormat> tl = sdfMap.get(pattern);
  41. // 此处的双重判断和同步是为了防止sdfMap这个单例被多次put重复的sdf
  42. if (tl == null) {
  43. synchronized (lockObj) {
  44. tl = sdfMap.get(pattern);
  45. if (tl == null) {
  46. // 只有Map中还没有这个pattern的sdf才会生成新的sdf并放入map
  47. System.out.println("put new sdf of pattern " + pattern + " to map");
  48. // 这里是关键,使用ThreadLocal<SimpleDateFormat>替代原来直接new SimpleDateFormat
  49. tl = new ThreadLocal<SimpleDateFormat>() {
  50. @Override
  51. protected SimpleDateFormat initialValue() {
  52. //System.out.println("thread: " + Thread.currentThread() + " init pattern: " + pattern);
  53. return new SimpleDateFormat(pattern);
  54. }
  55. };
  56. sdfMap.put(pattern, tl);
  57. }
  58. }
  59. }
  60. return tl.get();
  61. }
  62. /**
  63. * 是用ThreadLocal<SimpleDateFormat>来获取SimpleDateFormat,这样每个线程只会有一个SimpleDateFormat
  64. * 时间格式化
  65. *
  66. * @param date
  67. * @param pattern
  68. * @return
  69. */
  70. public static String format(Date date, String pattern) {
  71. return getSdf(pattern).format(date);
  72. }
  73. public static String format(Date date) {
  74. return getSdf(PATTERN_YYYY_MM_DD_HHMMSS).format(date);
  75. }
  76. public static String format(long timestamp, String pattern) {
  77. return getSdf(pattern).format(new Date(timestamp));
  78. }
  79. public static String format(long timestamp) {
  80. return getSdf(PATTERN_YYYY_MM_DD_HHMMSS).format(new Date(timestamp));
  81. }
  82. /**
  83. * 时间反格式化
  84. *
  85. * @param dateStr
  86. * @param pattern
  87. * @return
  88. * @throws ParseException
  89. */
  90. public static Date parse(String dateStr, String pattern) {
  91. Date date = null;
  92. try {
  93. date = getSdf(pattern).parse(dateStr);
  94. } catch (Exception e) {
  95. }
  96. return date;
  97. }
  98. /**
  99. * 本地时间转 UTC 时间字符串
  100. *
  101. * @param date
  102. * @return
  103. */
  104. public static String localToUtcString(Date date, String pattern) {
  105. SimpleDateFormat sdf = getSdf(pattern);
  106. sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
  107. return sdf.format(date);
  108. }
  109. /**
  110. * UTC 时间反格式化
  111. *
  112. * @param date
  113. * @param pattern
  114. * @return
  115. */
  116. public static Date utcStringToUtcDate(String date, String pattern) {
  117. SimpleDateFormat sdf = getSdf(pattern);
  118. sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
  119. Date utcDate = null;
  120. try {
  121. utcDate = sdf.parse(date);
  122. } catch (Exception e) {
  123. }
  124. return utcDate;
  125. }
  126. /**
  127. * UTC 时间格式化
  128. *
  129. * @param date
  130. * @return
  131. */
  132. public static String utcDateToUtcString(Date date) {
  133. SimpleDateFormat sdf = getSdf("yyyy-MM-dd'T'HH:mm:ss'Z'");
  134. sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
  135. return sdf.format(date);
  136. }
  137. /**
  138. * UTC 时间字符串转本地时间
  139. *
  140. * @param date
  141. * @param pattern
  142. * @return
  143. */
  144. public static Date utcStringToLocalDate(String date, String pattern) {
  145. SimpleDateFormat sdf = getSdf(pattern);
  146. sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
  147. Date localDate = null;
  148. try {
  149. localDate = sdf.parse(date);
  150. } catch (Exception e) {
  151. }
  152. return localDate;
  153. }
  154. /**
  155. * 获取本周一的日期
  156. *
  157. * @return
  158. */
  159. public static String getMonday() {
  160. Calendar cal = Calendar.getInstance();
  161. // 设置一个星期的第一天,按中国的习惯一个星期的第一天是星期一
  162. cal.setFirstDayOfWeek(Calendar.MONDAY);
  163. // 获得当前日期是一个星期的第几天
  164. int dayWeek = cal.get(Calendar.DAY_OF_WEEK);
  165. if (dayWeek == 1) {
  166. dayWeek = 8;
  167. }
  168. // 根据日历的规则,给当前日期减去星期几与一个星期第一天的差值
  169. cal.add(Calendar.DATE, cal.getFirstDayOfWeek() - dayWeek);
  170. Date mondayDate = cal.getTime();
  171. return format(mondayDate, PATTERN_YYYY_MM_DD);
  172. }
  173. /**
  174. * 获取本周日的日期
  175. *
  176. * @return
  177. */
  178. public static String getSunday() {
  179. Calendar cal = Calendar.getInstance();
  180. // 设置一个星期的第一天,按中国的习惯一个星期的第一天是星期一
  181. cal.setFirstDayOfWeek(Calendar.MONDAY);
  182. // 获得当前日期是一个星期的第几天
  183. int dayWeek = cal.get(Calendar.DAY_OF_WEEK);
  184. if (dayWeek == 1) {
  185. dayWeek = 8;
  186. }
  187. // 根据日历的规则,给当前日期减去星期几与一个星期第一天的差值
  188. cal.add(Calendar.DATE, cal.getFirstDayOfWeek() - dayWeek);
  189. cal.add(Calendar.DATE, 4 + cal.getFirstDayOfWeek());
  190. Date sundayDate = cal.getTime();
  191. return format(sundayDate, PATTERN_YYYY_MM_DD);
  192. }
  193. /**
  194. * 获取某月的最后一天
  195. *
  196. * @param year 年份
  197. * @param month 月份
  198. * @return
  199. */
  200. public static String getLastDayOfMonth(int year, int month) {
  201. Calendar cal = Calendar.getInstance();
  202. //设置年份
  203. cal.set(Calendar.YEAR, year);
  204. //设置月份
  205. cal.set(Calendar.MONTH, month - 1);
  206. //获取某月最大天数
  207. int lastDay = cal.getActualMaximum(Calendar.DAY_OF_MONTH);
  208. //设置日历中月份的最大天数
  209. cal.set(Calendar.DAY_OF_MONTH, lastDay);
  210. //格式化日期
  211. String day = format(cal.getTime(), PATTERN_YYYY_MM_DD);
  212. return day;
  213. }
  214. /**
  215. * 获取指定区间内随机时间
  216. * @param beginDate
  217. * @param endDate
  218. * @param pattern
  219. * @return
  220. */
  221. public static Date randomDate(String beginDate, String endDate, String pattern) {
  222. try {
  223. Date start = parse(beginDate, pattern);
  224. Date end = parse(endDate, pattern);
  225. if (start.getTime() >= end.getTime()) {
  226. return null;
  227. }
  228. long date = random(start.getTime(), end.getTime());
  229. return new Date(date);
  230. } catch (Exception e) {
  231. e.printStackTrace();
  232. }
  233. return null;
  234. }
  235. private static long random(long begin, long end) {
  236. long rtn = begin + (long) (Math.random() * (end - begin));
  237. if (rtn == begin || rtn == end) {
  238. return random(begin, end);
  239. }
  240. return rtn;
  241. }
  242. public static void main(String[] args) throws Exception {
  243. String s = localToUtcString(new Date(), UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
  244. System.err.println(s);
  245. Date date = utcStringToLocalDate(s, UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
  246. System.err.println(format(date, PATTERN_YYYY_MM_DD_HHMMSS));
  247. System.err.println("--------------------------------");
  248. String da = localToUtcString(new Date(), UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
  249. System.err.println(da);
  250. Date utcDate = utcStringToUtcDate(da, UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
  251. System.err.println(utcDateToUtcString(utcDate));
  252. System.err.println(getSunday());
  253. }
  254. public static String getNow() {
  255. return getSdf(PATTERN_YYYY_MM_DD_HHMMSS).format(new Date());
  256. }
  257. }

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号