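Preface: once the MQTT client receives a message on a subscribed topic, it publishes that message to a RabbitMQ queue, where the business logic processes it. This helps with concurrency, because the MQTT receiver only forwards messages while the queue buffers them for the consumers.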
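The idea in the preface can be sketched without any broker at all. The following is only an in-process illustration, assuming a `LinkedBlockingQueue` as a stand-in for the RabbitMQ queue; the class and method names (`QueueDecouplingSketch`, `onMqttMessage`, `drain`) are hypothetical and not part of the project below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-process model: the receiver enqueues, a worker thread processes.
final class QueueDecouplingSketch {

    // Stand-in for the RabbitMQ queue (assumption: unbounded, in memory).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Receiving side: called for every incoming MQTT payload; it only enqueues.
    void onMqttMessage(String payload) {
        queue.add(payload);
    }

    // Consuming side: a separate thread takes `count` messages and "handles" them.
    List<String> drain(int count) {
        List<String> processed = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    processed.add("handled:" + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join(); // wait so the caller can read the result safely
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        QueueDecouplingSketch sketch = new QueueDecouplingSketch();
        sketch.onMqttMessage("a");
        sketch.onMqttMessage("b");
        System.out.println(sketch.drain(2));
    }
}
```

The receiver returns as soon as the payload is enqueued, so a burst of MQTT messages does not have to wait for the business logic; in the real setup RabbitMQ plays the role of the queue and the consumer in step 7 plays the role of the worker.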
1. pom file
- <dependencies>
- <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.68</version>
- </dependency>
- <!-- MQTT dependencies -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-integration</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-stream</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-mqtt</artifactId>
- </dependency>
-
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <!--lombok-->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
2. yml file
- server:
-   port: 8021
- spring:
-   # Application name
-   application:
-     name: rabbitmq-provider
-   # RabbitMQ server settings
-   rabbitmq:
-     host: 127.0.0.1
-     port: 5672
-     username: guest
-     password: guest
-     # Virtual host; optional, the server's default virtual host is used if omitted
-     virtual-host: JCcccHost
-   mqtt:
-     # MQTT username
-     username: admin
-     # MQTT password
-     password: public
-     # MQTT broker URL; separate multiple URLs with commas, e.g. tcp://127.0.0.1:1883,tcp://192.168.2.133:1883
-     url: tcp://127.0.0.1:1883
-     # Default client ID used when connecting to the MQTT broker
-     client:
-       id: mqttId
-     # Default MQTT topic for publishing; the actual topic can be specified when calling the API
-     default:
-       # Subscribed topics; separate multiple topics with commas, e.g. topic,topic1
-       topic: topic,topic1
-     # Connection (completion) timeout, in milliseconds
-     completionTimeout: 3000
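The `url` and `topic` values above are plain comma-separated strings, so the Java side has to split them before use. A minimal sketch of such a split, assuming a hypothetical helper `splitCsv` (not part of the project) that also tolerates spaces around the commas:

```java
import java.util.Arrays;

// Hypothetical helper for comma-separated property values such as "topic,topic1".
final class MqttPropertySplitSketch {

    // Splits on commas, trims each entry, and drops empty entries.
    static String[] splitCsv(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(splitCsv("topic, topic1")));
        System.out.println(Arrays.toString(
                splitCsv("tcp://127.0.0.1:1883,tcp://192.168.2.133:1883")));
    }
}
```

Trimming each entry avoids subscribing to a topic with a leading space if someone writes `topic, topic1` in the yml file.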
3. RabbitConfig:
- package com.zxk.mqprovider.config;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.beans.factory.config.ConfigurableBeanFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
-
- @Configuration
- public class RabbitConfig {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- @Value("${spring.rabbitmq.host}")
- private String host;
-
- @Value("${spring.rabbitmq.port}")
- private int port;
-
- @Value("${spring.rabbitmq.username}")
- private String username;
-
- @Value("${spring.rabbitmq.password}")
- private String password;
-
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
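- // Note: the virtual host is hard-coded to "/" here; spring.rabbitmq.virtual-host from the yml file is not read by this class
- connectionFactory.setVirtualHost("/");
- // Enable publisher confirms (newer Spring AMQP versions deprecate this setter in favor of setPublisherConfirmType)
- connectionFactory.setPublisherConfirms(true);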
- return connectionFactory;
- }
-
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- // Must be prototype scope: each injection point gets its own RabbitTemplate instance
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
- }
4. MqttReceiveClient (MQTT client)
- package com.zxk.mqprovider.mqtt;
-
- import com.alibaba.fastjson.JSONObject;
- import com.zxk.mqprovider.mqtt.handle.MqttReceiveHandle;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.integration.annotation.IntegrationComponentScan;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.integration.channel.DirectChannel;
- import org.springframework.integration.core.MessageProducer;
- import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
- import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.MessageHandler;
- import org.springframework.messaging.MessagingException;
-
- /**
- * MQTT inbound message handling
- * **/
- @Slf4j
- @Configuration
- @IntegrationComponentScan
- public class MqttReceiveClient {
-
- @Autowired
- private MqttReceiveHandle mqttReceiveHandle;
-
- @Value("${spring.mqtt.username}")
- private String username;
-
- @Value("${spring.mqtt.password}")
- private String password;
-
- @Value("${spring.mqtt.url}")
- private String hostUrl;
-
- @Value("${spring.mqtt.client.id}")
- private String clientId;
-
- // Subscribed topics; separate multiple topics with commas, e.g. topic,YIHON_WQ_M
- @Value("${spring.mqtt.default.topic}")
- private String defaultTopic;
-
- @Value("${spring.mqtt.completionTimeout}")
- private int completionTimeout; // Connection (completion) timeout, in milliseconds
-
-
- @Bean(value = "getMqttConnectOptions")
- public MqttConnectOptions getMqttConnectOptions() {
- MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
- mqttConnectOptions.setUserName(username);
- mqttConnectOptions.setPassword(password.toCharArray());
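- // The url property may hold several comma-separated broker URLs, so split it instead of passing it as one entry
- mqttConnectOptions.setServerURIs(hostUrl.trim().split(","));
- // cleanSession = true: when the client goes offline the broker discards its session, and a new session is created on reconnect.
- // cleanSession = false: the broker keeps the session, so after reconnecting the client receives the messages published to its subscribed topics while it was offline.
- mqttConnectOptions.setCleanSession(false);
- // Keep-alive interval, in seconds
- mqttConnectOptions.setKeepAliveInterval(2);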
- return mqttConnectOptions;
- }
-
- @Bean
- public MqttPahoClientFactory mqttClientFactory() {
- DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
- factory.setConnectionOptions(getMqttConnectOptions());
- return factory;
- }
-
- // Inbound channel
- @Bean
- public MessageChannel mqttInputChannel() {
- return new DirectChannel();
- }
-
- // Configure the client and the topics it listens to
- @Bean
- public MessageProducer inbound() {
- MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound2", mqttClientFactory(), defaultTopic.trim().split(","));
- adapter.setCompletionTimeout(completionTimeout);
- adapter.setConverter(new DefaultPahoMessageConverter());
- adapter.setQos(2);
- adapter.setOutputChannel(mqttInputChannel());
- return adapter;
- }
-
- // Receive data from the channel
- @Bean
- @ServiceActivator(inputChannel = "mqttInputChannel")
- public MessageHandler handler() {
- return new MessageHandler() {
- @Override
- public void handleMessage(Message<?> message) throws MessagingException {
- // Handle the message
- mqttReceiveHandle.handle(message);
- }
- };
- }
- }
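To make the flow in this class easier to follow: the inbound adapter sends each MQTT message into `mqttInputChannel`, and the handler subscribed to that channel is invoked with it. The sketch below is a simplified plain-Java model of that hand-off, not Spring Integration's implementation; it assumes a single subscriber and synchronous delivery on the sender's thread, and the name `DirectChannelSketch` is hypothetical.

```java
import java.util.function.Consumer;

// Simplified model of a point-to-point channel with one subscriber.
final class DirectChannelSketch {

    private Consumer<String> subscriber;

    // Registers the handler (the role played by handler() in the class above).
    void subscribe(Consumer<String> handler) {
        this.subscriber = handler;
    }

    // Delivers the payload synchronously on the caller's thread.
    void send(String payload) {
        if (subscriber == null) {
            throw new IllegalStateException("channel has no subscriber");
        }
        subscriber.accept(payload);
    }

    public static void main(String[] args) {
        DirectChannelSketch channel = new DirectChannelSketch();
        channel.subscribe(p -> System.out.println("received: " + p));
        channel.send("hello");
    }
}
```

Because delivery in this model happens on the sender's thread, a slow handler would hold up the receiving side, which is one reason the handler in this article only forwards the payload to RabbitMQ.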
5. MqttReceiveHandle: handler for messages received by the MQTT client
- package com.zxk.mqprovider.mqtt.handle;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.integration.mqtt.support.MqttHeaders;
- import org.springframework.messaging.Message;
- import org.springframework.stereotype.Component;
-
- /**
- * MQTT client message handler
- * **/
- @Slf4j
- @Component
- public class MqttReceiveHandle {
-
- // RabbitTemplate provides the send/receive methods
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void handle(Message<?> message){
- log.info("Topic: {}, QoS: {}, received payload: {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getHeaders().get(MqttHeaders.RECEIVED_QOS), message.getPayload());
- // Forward the payload to the RabbitMQ queue
- rabbitTemplate.convertAndSend("directExchange", "yihonWqm", (String) message.getPayload());
-
- }
- }
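The `(String)` cast above works as long as the converter delivers the payload as a `String`. If the converter were configured to deliver raw bytes instead, the cast would fail with a `ClassCastException`. A defensive conversion could look like the sketch below; `payloadToString` is a hypothetical helper, and UTF-8 is an assumed encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: accepts either a String or a byte[] payload.
final class PayloadSketch {

    static String payloadToString(Object payload) {
        if (payload instanceof String) {
            return (String) payload;
        }
        if (payload instanceof byte[]) {
            // Assumption: the device publishes UTF-8 encoded text.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }

    public static void main(String[] args) {
        System.out.println(payloadToString("{\"temp\":21}"));
        System.out.println(payloadToString("abc".getBytes(StandardCharsets.UTF_8)));
    }
}
```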
6. MqttDataRabbitConfig: create the queue
- package com.zxk.mqprovider.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MqttDataRabbitConfig {
-
- // Queue name
- final static String YIHON_WQ_M = "topic.mqtt.data.yihon.wq.m";
-
-
- // Create the queue
- @Bean
- public Queue yihonWqmQueue() {
- return new Queue(MqttDataRabbitConfig.YIHON_WQ_M);
- }
-
- // Direct exchange, named directExchange
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange("directExchange");
- }
-
- // Bind the queue to the exchange, using the routing key yihonWqm
- @Bean
- Binding bindingYihonWqmMessage(Queue yihonWqmQueue, DirectExchange directExchange) {
- return BindingBuilder.bind(yihonWqmQueue).to(directExchange).with("yihonWqm");
- }
-
- }
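How the binding decides where a message ends up can be modelled in a few lines. This is only an in-memory sketch of direct-exchange routing (exact match on the routing key), not RabbitMQ itself; the class `DirectExchangeSketch` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a direct exchange: a message goes to every queue
// whose binding key equals the message's routing key.
final class DirectExchangeSketch {

    private final Map<String, List<String>> bindings = new HashMap<>(); // routing key -> queue names
    private final Map<String, Deque<String>> queues = new HashMap<>();  // queue name -> messages

    void bind(String queue, String routingKey) {
        queues.putIfAbsent(queue, new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the number of queues the message was routed to.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).addLast(message);
        }
        return targets.size();
    }

    // Returns the oldest message in the queue, or null if there is none.
    String poll(String queue) {
        Deque<String> messages = queues.get(queue);
        return messages == null ? null : messages.pollFirst();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("topic.mqtt.data.yihon.wq.m", "yihonWqm");
        exchange.publish("yihonWqm", "payload");
        System.out.println(exchange.poll("topic.mqtt.data.yihon.wq.m"));
    }
}
```

In this model a message published with a routing key that has no binding is routed to zero queues, which is why the routing key passed to `convertAndSend` in step 5 has to match the one used in the binding here.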
7. Queue consumer
- package com.zxk.mqprovider.consumer;
-
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
-
- @Slf4j
- @Component
- @RabbitListener(queues = "topic.mqtt.data.yihon.wq.m")
- public class MqttDataRabbitHandle {
-
- @RabbitHandler
- public void process(String message) {
- log.info("MqttDataRabbitHandle consumer received message: {}", message);
- }
-
- }
8. Testing (I used an MQTT communication debugging assistant tool here; it is described in the previous article)