当前位置:   article > 正文

SpringBoot 整合MQTT之加入RabbitMQ消息队列处理_mqtt连接rabbitmq如何设置是什么虚拟路径

mqtt连接rabbitmq如何设置是什么虚拟路径

前言:MQTT客户端接收到订阅消息后,将消息发布到RabbitMQ消息队列,由队列消费者处理业务,可以缓解高并发带来的压力

一、整体架构

二、实现代码

1,pom文件

  <dependencies>
      <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
      <!-- 1.2.83 is the 1.x release that fixes the autoType deserialization bypass
           affecting earlier 1.2.x versions (including 1.2.68). -->
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.83</version>
      </dependency>
      <!-- MQTT dependencies (Spring Integration) -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-integration</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.integration</groupId>
          <artifactId>spring-integration-stream</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.integration</groupId>
          <artifactId>spring-integration-mqtt</artifactId>
      </dependency>
      <!-- RabbitMQ -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <!-- Lombok -->
      <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <optional>true</optional>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
      </dependency>
  </dependencies>

 

2,yml文件

  server:
    port: 8021
  spring:
    # Application name
    application:
      name: rabbitmq-provider
    # RabbitMQ server settings
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      # Virtual host; optional - when omitted the server's default virtual host is used
      virtual-host: JCcccHost
    # MQTT settings (read by the application as spring.mqtt.*)
    mqtt:
      # MQTT user name
      username: admin
      # MQTT password
      password: public
      # MQTT broker address; separate multiple addresses with commas,
      # e.g. tcp://127.0.0.1:1883,tcp://192.168.2.133:1883
      url: tcp://127.0.0.1:1883
      client:
        # Default client ID used when connecting to the broker
        id: mqttId
      default:
        # Topics to subscribe to; separate multiple topics with ',' e.g. topic,topic1
        topic: topic,topic1
      # Completion timeout handed to the inbound MQTT adapter
      # (the original note calls it the connection timeout; presumably milliseconds - confirm)
      completionTimeout: 3000

3,RabbitConfig:

  1. package com.zxk.mqprovider.config;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Scope;
  12. @Configuration
  13. public class RabbitConfig {
  14. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  15. @Value("${spring.rabbitmq.host}")
  16. private String host;
  17. @Value("${spring.rabbitmq.port}")
  18. private int port;
  19. @Value("${spring.rabbitmq.username}")
  20. private String username;
  21. @Value("${spring.rabbitmq.password}")
  22. private String password;
  23. @Bean
  24. public ConnectionFactory connectionFactory() {
  25. CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
  26. connectionFactory.setUsername(username);
  27. connectionFactory.setPassword(password);
  28. connectionFactory.setVirtualHost("/");
  29. connectionFactory.setPublisherConfirms(true);
  30. return connectionFactory;
  31. }
  32. @Bean
  33. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  34. //必须是prototype类型
  35. public RabbitTemplate rabbitTemplate() {
  36. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  37. return template;
  38. }
  39. }

4,MqttReceiveClient客户端(mqtt)

  1. package com.zxk.mqprovider.mqtt;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.zxk.mqprovider.mqtt.handle.MqttReceiveHandle;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.integration.annotation.IntegrationComponentScan;
  11. import org.springframework.integration.annotation.ServiceActivator;
  12. import org.springframework.integration.channel.DirectChannel;
  13. import org.springframework.integration.core.MessageProducer;
  14. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  15. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  16. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  17. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  18. import org.springframework.messaging.Message;
  19. import org.springframework.messaging.MessageChannel;
  20. import org.springframework.messaging.MessageHandler;
  21. import org.springframework.messaging.MessagingException;
  22. /**
  23. *〈MQTT接收消息处理〉
  24. * **/
  25. @Slf4j
  26. @Configuration
  27. @IntegrationComponentScan
  28. public class MqttReceiveClient {
  29. @Autowired
  30. private MqttReceiveHandle mqttReceiveHandle;
  31. @Value("${spring.mqtt.username}")
  32. private String username;
  33. @Value("${spring.mqtt.password}")
  34. private String password;
  35. @Value("${spring.mqtt.url}")
  36. private String hostUrl;
  37. @Value("${spring.mqtt.client.id}")
  38. private String clientId;
  39. //订阅主题(#订阅主题,多个用‘,’隔开,如:topic,YIHON_WQ_M)
  40. @Value("${spring.mqtt.default.topic}")
  41. private String defaultTopic;
  42. @Value("${spring.mqtt.completionTimeout}")
  43. private int completionTimeout; //连接超时
  44. @Bean(value = "getMqttConnectOptions")
  45. public MqttConnectOptions getMqttConnectOptions() {
  46. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  47. mqttConnectOptions.setUserName(username);
  48. mqttConnectOptions.setPassword(password.toCharArray());
  49. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  50. //cleanSession 设为 true;当客户端掉线时;服务器端会清除 客户端session;重连后 客户端会有一个新的session,cleanSession
  51. // 设为false,客户端掉线后 服务器端不会清除session,当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
  52. mqttConnectOptions.setCleanSession(false);
  53. // 设置会话心跳时间 单位为秒
  54. mqttConnectOptions.setKeepAliveInterval(2);
  55. return mqttConnectOptions;
  56. }
  57. @Bean
  58. public MqttPahoClientFactory mqttClientFactory() {
  59. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  60. factory.setConnectionOptions(getMqttConnectOptions());
  61. return factory;
  62. }
  63. //接收通道
  64. @Bean
  65. public MessageChannel mqttInputChannel() {
  66. return new DirectChannel();
  67. }
  68. //配置client,监听的topic
  69. @Bean
  70. public MessageProducer inbound() {
  71. MqttPahoMessageDrivenChannelAdapter adapter =
  72. new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound2", mqttClientFactory(), defaultTopic.trim().split(","));
  73. adapter.setCompletionTimeout(completionTimeout);
  74. adapter.setConverter(new DefaultPahoMessageConverter());
  75. adapter.setQos(2);
  76. adapter.setOutputChannel(mqttInputChannel());
  77. return adapter;
  78. }
  79. //通过通道获取数据
  80. @Bean
  81. @ServiceActivator(inputChannel = "mqttInputChannel")
  82. public MessageHandler handler() {
  83. return new MessageHandler() {
  84. @Override
  85. public void handleMessage(Message<?> message) throws MessagingException {
  86. //处理消息
  87. mqttReceiveHandle.handle(message);
  88. }
  89. };
  90. }
  91. }

5, MqttReceiveHandle 客户端(mqtt)消息处理Handle

  1. package com.zxk.mqprovider.mqtt.handle;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.integration.mqtt.support.MqttHeaders;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * mqtt客户端消息处理类
  10. * **/
  11. @Slf4j
  12. @Component
  13. public class MqttReceiveHandle {
  14. //使用RabbitTemplate,这提供了接收/发送等等方法
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. public void handle(Message<?> message){
  18. log.info("主题:{},QOS:{},消息接收到的数据:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getHeaders().get(MqttHeaders.RECEIVED_QOS), message.getPayload());
  19. //发送到rabbit消息队列
  20. rabbitTemplate.convertAndSend("directExchange", "yihonWqm", (String) message.getPayload());
  21. }
  22. }

6,MqttDataRabbitConfig 创建队列

  1. package com.zxk.mqprovider.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class MqttDataRabbitConfig {
  10. //队列取名
  11. final static String YIHON_WQ_M = "topic.mqtt.data.yihon.wq.m";
  12. // 创建队列
  13. @Bean
  14. public Queue yihonWqmQueue() {
  15. return new Queue(MqttDataRabbitConfig.YIHON_WQ_M);
  16. }
  17. //Direct交换机 起名:directExchange
  18. @Bean
  19. DirectExchange directExchange() {
  20. return new DirectExchange("directExchange");
  21. }
  22. //绑定 将队列和交换机绑定, 并设置用于匹配键:directExchange
  23. @Bean
  24. Binding bindingYihonWqmMessage(Queue yihonWqmQueue, DirectExchange directExchange) {
  25. return BindingBuilder.bind(yihonWqmQueue).to(directExchange).with("yihonWqm");
  26. }
  27. }

7,队列消费者

  1. package com.zxk.mqprovider.consumer;
  2. import com.alibaba.fastjson.JSONObject;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Slf4j
  8. @Component
  9. @RabbitListener(queues = "topic.mqtt.data.yihon.wq.m")
  10. public class MqttDataRabbitHandle {
  11. @RabbitHandler
  12. public void process(String message) {
  13. log.info("MqttDataRabbitHandle消费者收到消息 : " + message);
  14. }
  15. }

8,测试(这里我用的调试毛通信助手,上一篇文章有说明:跳转)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/845640
推荐阅读
相关标签
  

闽ICP备14008679号