赞
踩
参考 : http://www.mokezhan.com/67091.html
maven依赖
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.2.0</version>
- </dependency>
发送
-
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
- /**
- * @Author: liyue
- * @Date: 2021/10/13/14:15
- * @Description:
- */
- public class Send {
-
- /**
- * 客户端唯一标识
- */
- public static final String MQTT_CLIENT_ID = "mqtt_broker_test_10010_send";
- private static MqttTopic topic;
- private static MqttClient client;
-
- public static void main(String[] args) {
- String serverURI, userName, password, mqttTopic;
- if (args.length > 0) {
- serverURI = args[0];
- userName = args[1];
- password = args[2];
- mqttTopic = args[3];
- } else {
- serverURI = "tcp://1.1.1.1:9087";
- userName = "admin";
- password = "password";
- mqttTopic = "mqtt_broker_test_10010";
- }
- System.out.println("============================================================================================================================================");
- System.out.println(StringUtil.join("消息发送程序开始启动,配置参数size:{}, serverURI:{}, userName:{}, password:{}, mqttTopic:{}",
- args.length,serverURI,userName,password,mqttTopic));
- System.out.println("============================================================================================================================================");
- MqttMessage message = new MqttMessage();
- try {
- client = new MqttClient(serverURI, MQTT_CLIENT_ID, new MemoryPersistence());
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);
- options.setUserName(userName);
- options.setPassword(password.toCharArray());
- options.setConnectionTimeout(10);
- options.setKeepAliveInterval(20);
-
- topic = client.getTopic(mqttTopic);
-
- message.setQos(1);
- message.setRetained(false);
- client.connect(options);
- int i = 1;
- while (true) {
- String msg = DateUtil.getNow() + " 测试消息" + i;
- message.setPayload(msg.getBytes());
- MqttDeliveryToken token = topic.publish(message);
- token.waitForCompletion();
- System.out.println("发送消息:["+msg+"]");
- Thread.sleep(4000);
- i++;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
接收
-
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
- /**
- * @Author: liyue
- * @Date: 2021/10/13/14:15
- * @Description:
- */
- public class Receive {
-
- /**
- * 客户端唯一标识
- */
- public static final String MQTT_CLIENT_ID = "mqtt_broker_test_10010_receive";
-
-
- private volatile static MqttClient mqttClient;
- private static MqttConnectOptions options;
-
-
- public static void main(String[] args) {
- String serverURI, userName, password, mqttTopic;
- if (args.length > 0) {
- serverURI = args[0];
- userName = args[1];
- password = args[2];
- mqttTopic = args[3];
- } else {
- serverURI = "tcp://1.1.1.1:9087";
- userName = "admin";
- password = "password";
- mqttTopic = "mqtt_broker_test_10010";
- }
- System.out.println("============================================================================================================================================");
- System.out.println(StringUtil.join("消息接收程序开始启动,配置参数size:{}, serverURI:{}, userName:{}, password:{}, mqttTopic:{}",
- args.length,serverURI,userName,password,mqttTopic));
- System.out.println("============================================================================================================================================");
- try {
- // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
- // MemoryPersistence设置clientid的保存形式,默认为以内存保存
-
- mqttClient = new MqttClient(serverURI, MQTT_CLIENT_ID, new MemoryPersistence());
- // 配置参数信息
- options = new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
- // 这里设置为true表示每次连接到服务器都以新的身份连接
- options.setCleanSession(true);
- // 设置用户名
- options.setUserName(userName);
- // 设置密码
- options.setPassword(password.toCharArray());
- // 设置超时时间 单位为秒
- options.setConnectionTimeout(10);
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- options.setKeepAliveInterval(20);
- // 连接
- mqttClient.connect(options);
- // 订阅
- mqttClient.subscribe(mqttTopic);
- // 设置回调
- mqttClient.setCallback(new MqttCallback() {
- @Override
- public void connectionLost(Throwable throwable) {
- System.err.println("无法连接mqtt服务器");
- throwable.printStackTrace();
- }
-
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- System.out.println(DateUtil.getNow()+" 收到消息,Topic:["+s+"],Message:["+mqttMessage.toString()+"]");
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- try {
- System.out.println("消息传输完成,msg:"+iMqttDeliveryToken.getMessage());
- }catch (Exception e){
- System.err.println("获取消息失败");
- e.printStackTrace();
- }
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
依赖
StringUtil.java
-
- /**
- * @Author: liyue
- * @Date: 2021/10/13/14:48
- * @Description:
- */
- public class StringUtil {
-
- public static String join(String str, Object... param) {
- for (Object p : param) {
- str = str.replaceFirst("\\{\\}", p.toString());
- }
- return str;
- }
- }
DateUtil.java
-
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.*;
-
- /**
- * 线程安全的日期工具类
- */
- public class DateUtil {
-
- public static String PATTERN_YYYYMM = "yyyyMM";
- public static String PATTERN_YYYY_MM_DD = "yyyy-MM-dd";
- public static String PATTERN_YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
- public static String PATTERN_YYYYMMDDHHMM = "yyyyMMddHHmm";
- public static String PATTERN_YYYY_MM_DD_HHMMSS = "yyyy-MM-dd HH:mm:ss";
- public static String PATTERN_YYYY_MM_DD_HH = "yyyy-MM-dd HH";
- public static String PATTERN_YYYYMMDDHH = "yyyyMMddHH";
- public static String PATTERN_YYYY_MM_DD_HH_MM = "yyyy-MM-dd HH:mm";
- public static String PATTERN_YYYYMMDD = "yyyyMMdd";
- public static String PATTERN_YYYYMMDDHHMMSSSSS = "yyyyMMddHHmmssSSS";
- public static String PATTERN_HTTP = "EEE, dd MMM yyyy HH:mm:ss zzz";
- public static String PATTERN_YYYYMMDD_CHINESE = "yyyy年MM月dd日";
- public static String PATTERN_YYYYMMDD_HHMMSS_CHINESE = "yyyy年MM月dd日 HH:mm:ss";
- public static String PATTERN_YYYYMMDD_SLASH = "yyyy/MM/dd";
- public static String PATTERN_YYYYMMDD_HHMMSS_SLASH = "yyyy/MM/dd HH:mm:ss";
- public static String PATTERN_YYYY_MM_DD_00 = "yyyy-MM-dd 00:00:00";
-
- public static String UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z = "yyyy-MM-dd'T'HH:mm:ss'Z'";
-
-
- /**
- * 锁对象
- */
- private static final Object lockObj = new Object();
-
- /**
- * 存放不同的日期模板格式的sdf的Map
- */
- private static Map<String, ThreadLocal<SimpleDateFormat>> sdfMap = new HashMap<String, ThreadLocal<SimpleDateFormat>>();
-
- /**
- * 返回一个ThreadLocal的sdf,每个线程只会new一次sdf
- *
- * @param pattern
- * @return
- */
- private static SimpleDateFormat getSdf(final String pattern) {
- ThreadLocal<SimpleDateFormat> tl = sdfMap.get(pattern);
-
- // 此处的双重判断和同步是为了防止sdfMap这个单例被多次put重复的sdf
- if (tl == null) {
- synchronized (lockObj) {
- tl = sdfMap.get(pattern);
- if (tl == null) {
- // 只有Map中还没有这个pattern的sdf才会生成新的sdf并放入map
- System.out.println("put new sdf of pattern " + pattern + " to map");
-
- // 这里是关键,使用ThreadLocal<SimpleDateFormat>替代原来直接new SimpleDateFormat
- tl = new ThreadLocal<SimpleDateFormat>() {
-
- @Override
- protected SimpleDateFormat initialValue() {
- //System.out.println("thread: " + Thread.currentThread() + " init pattern: " + pattern);
- return new SimpleDateFormat(pattern);
- }
- };
- sdfMap.put(pattern, tl);
- }
- }
- }
-
- return tl.get();
- }
-
- /**
- * 是用ThreadLocal<SimpleDateFormat>来获取SimpleDateFormat,这样每个线程只会有一个SimpleDateFormat
- * 时间格式化
- *
- * @param date
- * @param pattern
- * @return
- */
- public static String format(Date date, String pattern) {
- return getSdf(pattern).format(date);
- }
-
-
- public static String format(Date date) {
- return getSdf(PATTERN_YYYY_MM_DD_HHMMSS).format(date);
- }
-
- public static String format(long timestamp, String pattern) {
- return getSdf(pattern).format(new Date(timestamp));
- }
-
- public static String format(long timestamp) {
- return getSdf(PATTERN_YYYY_MM_DD_HHMMSS).format(new Date(timestamp));
- }
-
- /**
- * 时间反格式化
- *
- * @param dateStr
- * @param pattern
- * @return
- * @throws ParseException
- */
- public static Date parse(String dateStr, String pattern) {
- Date date = null;
- try {
- date = getSdf(pattern).parse(dateStr);
- } catch (Exception e) {
-
- }
- return date;
- }
-
- /**
- * 本地时间转 UTC 时间字符串
- *
- * @param date
- * @return
- */
- public static String localToUtcString(Date date, String pattern) {
- SimpleDateFormat sdf = getSdf(pattern);
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
- return sdf.format(date);
- }
-
- /**
- * UTC 时间反格式化
- *
- * @param date
- * @param pattern
- * @return
- */
- public static Date utcStringToUtcDate(String date, String pattern) {
- SimpleDateFormat sdf = getSdf(pattern);
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
- Date utcDate = null;
- try {
- utcDate = sdf.parse(date);
- } catch (Exception e) {
-
- }
- return utcDate;
- }
-
- /**
- * UTC 时间格式化
- *
- * @param date
- * @return
- */
- public static String utcDateToUtcString(Date date) {
- SimpleDateFormat sdf = getSdf("yyyy-MM-dd'T'HH:mm:ss'Z'");
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
- return sdf.format(date);
- }
-
-
- /**
- * UTC 时间字符串转本地时间
- *
- * @param date
- * @param pattern
- * @return
- */
- public static Date utcStringToLocalDate(String date, String pattern) {
- SimpleDateFormat sdf = getSdf(pattern);
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
- Date localDate = null;
- try {
- localDate = sdf.parse(date);
- } catch (Exception e) {
-
- }
- return localDate;
- }
-
- /**
- * 获取本周一的日期
- *
- * @return
- */
- public static String getMonday() {
- Calendar cal = Calendar.getInstance();
- // 设置一个星期的第一天,按中国的习惯一个星期的第一天是星期一
- cal.setFirstDayOfWeek(Calendar.MONDAY);
- // 获得当前日期是一个星期的第几天
- int dayWeek = cal.get(Calendar.DAY_OF_WEEK);
- if (dayWeek == 1) {
- dayWeek = 8;
- }
- // 根据日历的规则,给当前日期减去星期几与一个星期第一天的差值
- cal.add(Calendar.DATE, cal.getFirstDayOfWeek() - dayWeek);
- Date mondayDate = cal.getTime();
- return format(mondayDate, PATTERN_YYYY_MM_DD);
-
- }
-
- /**
- * 获取本周日的日期
- *
- * @return
- */
- public static String getSunday() {
- Calendar cal = Calendar.getInstance();
- // 设置一个星期的第一天,按中国的习惯一个星期的第一天是星期一
- cal.setFirstDayOfWeek(Calendar.MONDAY);
- // 获得当前日期是一个星期的第几天
- int dayWeek = cal.get(Calendar.DAY_OF_WEEK);
- if (dayWeek == 1) {
- dayWeek = 8;
- }
- // 根据日历的规则,给当前日期减去星期几与一个星期第一天的差值
- cal.add(Calendar.DATE, cal.getFirstDayOfWeek() - dayWeek);
- cal.add(Calendar.DATE, 4 + cal.getFirstDayOfWeek());
- Date sundayDate = cal.getTime();
- return format(sundayDate, PATTERN_YYYY_MM_DD);
- }
-
- /**
- * 获取某月的最后一天
- *
- * @param year 年份
- * @param month 月份
- * @return
- */
- public static String getLastDayOfMonth(int year, int month) {
- Calendar cal = Calendar.getInstance();
- //设置年份
- cal.set(Calendar.YEAR, year);
- //设置月份
- cal.set(Calendar.MONTH, month - 1);
- //获取某月最大天数
- int lastDay = cal.getActualMaximum(Calendar.DAY_OF_MONTH);
- //设置日历中月份的最大天数
- cal.set(Calendar.DAY_OF_MONTH, lastDay);
- //格式化日期
- String day = format(cal.getTime(), PATTERN_YYYY_MM_DD);
- return day;
- }
-
- /**
- * 获取指定区间内随机时间
- * @param beginDate
- * @param endDate
- * @param pattern
- * @return
- */
- public static Date randomDate(String beginDate, String endDate, String pattern) {
- try {
- Date start = parse(beginDate, pattern);
- Date end = parse(endDate, pattern);
- if (start.getTime() >= end.getTime()) {
- return null;
- }
- long date = random(start.getTime(), end.getTime());
- return new Date(date);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- private static long random(long begin, long end) {
- long rtn = begin + (long) (Math.random() * (end - begin));
- if (rtn == begin || rtn == end) {
- return random(begin, end);
- }
- return rtn;
- }
-
- public static void main(String[] args) throws Exception {
- String s = localToUtcString(new Date(), UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
- System.err.println(s);
-
- Date date = utcStringToLocalDate(s, UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
-
- System.err.println(format(date, PATTERN_YYYY_MM_DD_HHMMSS));
-
- System.err.println("--------------------------------");
- String da = localToUtcString(new Date(), UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
- System.err.println(da);
- Date utcDate = utcStringToUtcDate(da, UTC_PATTERN_YYYY_MM_DD_T_HH_MM_SS_Z);
- System.err.println(utcDateToUtcString(utcDate));
- System.err.println(getSunday());
- }
-
-
- public static String getNow() {
- return getSdf(PATTERN_YYYY_MM_DD_HHMMSS).format(new Date());
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。