赞
踩
MQTT (Message Queue Telemetry Transport) 是一个轻量级传输协议,它被设计用于轻量级的发布/订阅式消息传输,MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化。是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的信息采集,工业控制,智能家居等方面具有广泛的适用性。
官网:mqtt.org
MQTT的客户端工具,可以使用MQTTX
MQTT 客户端库 & SDK 大全 | EMQ (emqx.com)
特点:
MQTT支持三种消息发布服务质量(QoS):
MQTT 三种身份:
应用场景:
遥感数据、汽车、智能家居、智慧城市、医疗医护
即时通讯:MQ 可以通过订阅主题,轻松实现 1对1、1对多的通讯
EMQX大规模分布式 MQTT 消息服务器,大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器,高效可靠连接海量物联网设备,高性能实时处理消息与事件流数据,助力构建关键业务的物联网平台与应用。
基于Docker安装:
1.执行名称 创建容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26
2.连接测试
访问可视化页面:
http://服务器ip:18083
默认的账号 和密码 :admin/public
基于MQTTX连接测试MQTT服务器
RabbitMQ的MQTT插件
docker run -d --hostname qf-rabbit --name qf-rabbit -p 15672:15672 -p 5672:5672 -p 1883:1883 -p 15675:15675 daocloud.io/library/rabbitmq:3.8.4
2.执行命令,开启插件
docker exec qf-rabbit rabbitmq-plugins enable rabbitmq_management
docker exec qf-rabbit rabbitmq-plugins enable rabbitmq_mqtt
docker exec qf-rabbit rabbitmq-plugins enable rabbitmq_web_mqtt
3.访问RabbitMQ 查看是否开启MQTT
4.使用MQTTX测试MQTT
在Java程序中实现MQTT的发布和订阅
实现步骤:
1.依赖jar
<!-- mqtt通信的jar-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.编写MQTT客户端类
@Slf4j public class MyMqttClient { //MQTTurl private static String url="tcp://IP地址:1883"; //MQTTid头 private static String clientId="mqtt_lx_"; //创建MQTT对象 private static MqttCallback cb=new MqttCallback() { @Override public void connectionLost(Throwable throwable) { } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { log.info("消息主题{},消息内容:{}",s,new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }; private static MqttClient client; public static void sendMsg(String msg) throws MqttException { client.publish("MQTT",new MqttMessage(msg.getBytes())); } public static void receiveMsg() throws MqttException { client.subscribe("MQTT",1); } public static void main(String[] args) throws MqttException { //注意id唯一 client=new MqttClient(url,clientId+new Random().nextInt(100000)); MqttConnectOptions options=new MqttConnectOptions(); options.setUserName("admin"); options.setPassword("public".toCharArray()); client.setCallback(cb); client.connect(options); System.err.println("已开启订阅,等待消息"); receiveMsg(); } }
3.运行测试
实际中实现MQTT需要考虑解耦和消息订阅之后存储到MongoDB中
具体的代码如下所示:
1.依赖jar
<!-- mqtt通信的jar--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.10</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency>
2.实现配置
MQTT自定义配置
mqtt:
url: tcp://IP地址:1883
clientid: mqtt_lx_
user: admin
pass: public
qos: 0
topic: MQTT
spring:
data:
mongodb:
uri: mongodb://IP地址/admin
3.编写代码-MongoDB代码
@Data
@Document
@NoArgsConstructor
@AllArgsConstructor
public class MqttMsg {
@Id
private Long id;//雪花算法
private String msg;//消息内容
private long ctime;//消息时间 毫秒为单位
}
public interface MqttmsgDao extends MongoRepository<MqttMsg,Long> {
}
4.编写代码-MQTT代码
@Component @Slf4j public class MyMqttCallBack implements MqttCallback { @Resource private MqttmsgDao dao; @Override public void connectionLost(Throwable throwable) { } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { switch (s){ case "MQTT": String json=new String(mqttMessage.getPayload()); log.info("消息内容:{}",json); try{ MqttMsg mqttMsg= JSON.parseObject(json,MqttMsg.class); dao.save(mqttMsg); }catch (Exception ex){ log.error("亲,不是我们规定的消息格式!"); } break; } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
@Slf4j @Component public class MqttConfig implements InitializingBean { @Value("${mqtt.url}") private String url; @Value("${mqtt.clientid}") private String clientid; @Value("${mqtt.user}") private String user; @Value("${mqtt.pass}") private String pass; @Value("${mqtt.qos}") private Integer qos; @Value("${mqtt.topic}") private String topic; @Resource public MyMqttCallBack callBack; private MqttClient client; /** * 发送消息*/ public boolean publishMsg(String msg){ try { client.publish(topic,new MqttMessage(msg.getBytes())); return true; } catch (MqttException e) { e.printStackTrace(); } return false; } @Override public void afterPropertiesSet() throws Exception { //实例化 mqtt客户端对象 参数说明:1.mqtt服务器地址 2.客户端名称 唯一 client=new MqttClient(url,clientid+new Random().nextInt(100000)); //实例化 mqtt 参数对象 设置参数信息 MqttConnectOptions options=new MqttConnectOptions(); //账号 EMQX 的账号 options.setUserName(user); //密码 EMQX 的密码 options.setPassword(pass.toCharArray()); //设置 消息接收对象 回调函数 client.setCallback(callBack); //发起连接 连接到MQTT服务器 client.connect(options); //定义 对应的主题的消息 client.subscribe(topic,qos); } }
编写代码实现控制层
@RestController @RequestMapping("/api/mqtt/") public class MqttController { @Resource private MqttConfig config; @Resource private MqttmsgDao dao; @GetMapping("sendmsg") public String sendMsg(String msg){ if(StringUtils.hasLength(msg)){ MqttMsg mqttMsg=new MqttMsg(IdUtil.getSnowflakeNextId(),msg,System.currentTimeMillis()); if(config.publishMsg(JSON.toJSONString(mqttMsg))){ return "OK"; } } return "FAIL"; } @GetMapping("all") public List<MqttMsg> all(){ return dao.findAll(Sort.by(Sort.Order.desc("ctime"))); } }
5.运行测试
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。