当前位置:   article > 正文

SpringBoot整合Rabbitmq(消息队列和rabbitmq的使用)_springboot集成mq监听队列消息

springboot集成mq监听队列消息

MQ(message  queue)的概念:

什么是mq:

在互联网架构中,mq是一种非常常见的上下游”逻辑解耦+物理解耦“的消息通信服务,使用了mq之后,消息发送上游只需要依赖mq,不需要依赖其他服务。

为什么要使用mq:

因为它的三大功能:流量消峰、应用解耦、异步处理

简单举例:

流量消峰:订单系统在平缓期最多处理10000单,但是高峰期会超过这个值,就会使订单系统宕机,所以我们在人点单和订单系统出现订单的中间添加一个mq作为使用消息队列做缓存,这样虽然人下单后出现订单成功会慢一些,但是总比订单系统宕机好使!

应用解耦:订单系统包含在电商应用中,除此之外还有物流系统,支付系统、库存系统,当用户创建后,如果耦合调用物流系统,支付系统、库存系统,任何一个子系统出现故障,都会使下单操作异常,这是我们在中间添加一个mq消息队列,系统调用的问题就会减少,子系统一旦出现故障,需时间修复,在这特殊时间里,子系统的消息会缓存在消息队列中,订单不会出现异常正常执行。

异步处理:在这里使用mq,会节省时间,提高系统的应用性,当a调用b服务后,只需要监听b处理完成的消息,b处理完成后,发送一个消息到mq,mq将消息转发给a服务,这样使a及时得到异步处理成功的消息

mq的分类和选择:

ActiveMQ 、kfaka(为大数据而生的消息中间件,适合大量数据互联网服务的收集业务,适合大公司)、RocketMQ (适合金融互联网领域)、RabbitMQ(当前最主流的消息中间件之一,性能较好,支持多种语言,mq功能完备,稳定、跨平台、开源提供的管理界面非常好,社区活跃度高,更新频率高,适合数据量不那么大,中小型公司)

RabbitMQ的概念

什么是RabbitMQ

RabbitMQ是一个消息中间件:它接受、存储和转发数据
四大核心概念:

生产者、交换机、队列、消费者

RabbitMQ的安装

RabbitMQ的初步使用 hello world

1.配置pom.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.7.15</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.example</groupId>
  12. <artifactId>springboot_rabbitmq_demo</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>springboot_rabbitmq_demo</name>
  15. <description>springboot_rabbitmq_demo</description>
  16. <properties>
  17. <java.version>8</java.version>
  18. </properties>
  19. <dependencies>
  20. <!-- rabbitmq依赖客户端-->
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-amqp</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-web</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-test</artifactId>
  32. <scope>test</scope>
  33. </dependency>
  34. </dependencies>
  35. <build>
  36. <plugins>
  37. <plugin>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-maven-plugin</artifactId>
  40. <version>2.7.15</version>
  41. </plugin>
  42. </plugins>
  43. </build>
  44. </project>
2.修改配置文件 
  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=25672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=123456
3.生产者代码
  1. /**
  2. * 生产者代码
  3. */
  4. public class Producer {
  5. //队列
  6. public static final String QUEUE_NAME="HelloWorld";
  7. //发送消息
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //创建一个连接工厂
  10. ConnectionFactory factory=new ConnectionFactory();
  11. //工厂ip连接rabbitmq的队列
  12. factory.setHost("localhost");
  13. factory.setPort(25672);
  14. //用户名
  15. factory.setUsername("admin");
  16. //密码
  17. factory.setPassword("123456");
  18. //创建连接
  19. Connection connection = factory.newConnection();
  20. //获取信道
  21. Channel channel = connection.createChannel();
  22. /**
  23. * 生产一个队列
  24. * com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare
  25. * (String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
  26. * 1.队列名称
  27. * 2.队列里面的消息是否持久化,默认消息存放在内存中
  28. * 3.该队列是否只供一个消费者进行消费,是否进行消费共享,true:表示可以多个消费者消费 false::表示只能一个消费者消费
  29. * 4.是否自动删除 true自动删除,false不自动删除
  30. * 5.其它参数
  31. */
  32. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  33. //发消息
  34. String message="hello world!";
  35. /**
  36. * void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
  37. * 发送一个消息
  38. * 1.发送到哪个交换机
  39. * 2.路由的key值
  40. * 3.其他的参数值
  41. * 4.发送消息的消息体
  42. */
  43. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  44. System.out.println( "消息发送完毕");
  45. }
  46. }

 4.消费者代码

  1. /**
  2. * 消费者代码
  3. */
  4. public class Consumer {
  5. //队列
  6. public static final String QUEUE_NAME = "HelloWorld";
  7. //接受消息
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //创建一个连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. //工厂ip连接rabbitmq的队列
  12. factory.setHost("localhost");
  13. factory.setPort(25672);
  14. //用户名
  15. factory.setUsername("admin");
  16. //密码
  17. factory.setPassword("123456");
  18. //创建连接
  19. Connection connection = factory.newConnection();
  20. //获取信道
  21. Channel channel = connection.createChannel();
  22. /**
  23. * 消费者消费队列
  24. * 1.消费哪个队列
  25. * 2.消费成功后是否要自动应答,true:自动应答,false:手动应答
  26. * 3.消费者未成功消费的回调
  27. * 4.消费者取消消费的回调
  28. */
  29. //监听队列消息, 如果有消息则会回调客户端
  30. channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel){
  31. //处理消息函数
  32. public void handleDelivery(String consumerTag,
  33. Envelope envelope,
  34. AMQP.BasicProperties properties,
  35. byte[] body)
  36. throws IOException
  37. {
  38. channel.basicAck(envelope.getDeliveryTag(),false);
  39. //我们收到消息后打印消息
  40. System.out.println( new String( body ) );
  41. }
  42. });
  43. }
  44. }
4.结果展示:

 

 RabbitMQ中的TTL以及延迟队列的优化

TTL队列的配置类

  1. @Configuration
  2. public class TtlQueueConfig {
  3. //普通交换机
  4. @Bean("commonExchange")
  5. public DirectExchange commonExchange(){
  6. return new DirectExchange("commonExchange");
  7. }
  8. //普通队列
  9. @Bean("commonQueue")
  10. public Queue commonQueueA(){
  11. HashMap<String, Object> args = new HashMap<>(3);
  12. args.put("x-dead-letter-exchange","deadExchange");
  13. args.put("x-dead-letter-routing-key","dead");
  14. args.put("x-message-ttl",15000);
  15. return QueueBuilder.durable("commonQueue").withArguments(args).build();
  16. }
  17. //死信交换机
  18. @Bean
  19. public DirectExchange deadExchange(){
  20. return new DirectExchange("deadExchange");
  21. }
  22. //死信队列
  23. @Bean("deadQueue")
  24. public Queue deadQueue(){
  25. return QueueBuilder.durable("deadQueue").build();
  26. }
  27. /**
  28. *关于ttl队列的优化
  29. */
  30. @Bean("optimizeQueue")
  31. public Queue commonQueueB(){
  32. HashMap<String, Object> args = new HashMap<>(3);
  33. args.put("x-dead-letter-exchange","deadExchange");
  34. args.put("x-dead-letter-routing-key","dead");
  35. return QueueBuilder.durable("optimizeQueue").withArguments(args).build();
  36. }
  37. @Bean
  38. public Binding optimizeQueueBinding(@Qualifier("optimizeQueue") Queue optimizeQueue, @Qualifier("commonExchange") DirectExchange commonExchange){
  39. return BindingBuilder.bind(optimizeQueue).to(commonExchange).with("optimize");
  40. }
  41. //绑定普通队列和交换机
  42. @Bean
  43. public Binding commonQueueBinding(@Qualifier("commonQueue") Queue commonQueue, DirectExchange commonExchange){
  44. return BindingBuilder.bind(commonQueue).to(commonExchange).with("common");
  45. }
  46. //绑定死信队列和交换机
  47. @Bean
  48. public Binding deadQueueBinding(@Qualifier("deadQueue") Queue deadQueue, DirectExchange deadExchange){
  49. return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
  50. }
  51. }

TTL队列的生产者

  1. /**
  2. * 生产者代码
  3. */
  4. @Slf4j
  5. @RestController
  6. @RequestMapping("/ttl")
  7. public class SendMessageController {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. //开始发消息
  11. @GetMapping("/sendMsg/{message}")
  12. public void sendMsg(@PathVariable String message){
  13. log.info("当前时间:{},发送一个消息给一个ttl队列:{}",new Date().toString(),message);
  14. rabbitTemplate.convertAndSend("commonExchange","common","消息来自ttl为15s的队列:"+message);
  15. }
  16. /**
  17. * 优化
  18. * @param message
  19. * @param ttlTime
  20. */
  21. @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
  22. public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
  23. log.info("当前时间:{},发送一条时长:{}毫秒,ttl信息给optimize队列:{}",
  24. new Date().toString(),ttlTime,message);
  25. rabbitTemplate.convertAndSend("commonExchange","optimize",message,msg->{
  26. msg.getMessageProperties().setExpiration(ttlTime);
  27. return msg;
  28. });
  29. }
  30. }

TTL队列的消费者

  1. /**
  2. * 消费者代码
  3. */
  4. @Slf4j
  5. @Component
  6. public class DeadLetterQueueConsumer {
  7. @RabbitListener(queues = "deadQueue")
  8. public void receive(Message message, Channel channel)throws Exception{
  9. String msg=new String(message.getBody());
  10. log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
  11. }
  12. }

结果展示

 

问题 

springboot集成RabbitMq异常 Channel shutdown: channel error; protocol method

问题1

[CachingConnectionFactory.java:1567]- Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

解决:

  1. #消费端配置 去掉自动签收功能 #自动签收auto 手动 manual
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual

 问题2

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid expiration ' ': no_integer, class-id=60, method-id=40)

1、如果队列设置的是客户端是自动创建的,直接删除队列。

2、如果客户端没有配置自动创建队列的话,手动去MQ客户端创建队列,并且设置对应的TTL值。

3、 你所设置的TTL值要真实存在,不能为空

RabbitMQ的发布确认高级

配置类代码

  1. @Configuration
  2. public class ConfirmConfig {
  3. @Bean
  4. public DirectExchange directExchange(){
  5. return new DirectExchange("com.lc.direct.demo");
  6. }
  7. @Bean("directQueueA")
  8. public Queue directQueueA(){
  9. return new Queue("directQueueA");
  10. }
  11. @Bean("directQueueB")
  12. public Queue directQueueB(){
  13. return new Queue("directQueueB");
  14. }
  15. @Bean
  16. public Binding getBindingDirectQueueA(@Qualifier("directQueueA") Queue directQueueA, DirectExchange directExchange){
  17. return BindingBuilder.bind(directQueueA).to(directExchange).with("北京");
  18. }
  19. @Bean
  20. public Binding getBindingDirectQueueB(@Qualifier("directQueueB") Queue directQueueB, DirectExchange directExchange){
  21. return BindingBuilder.bind(directQueueB).to(directExchange).with("哈尔滨");
  22. }
  23. }

生产者代码

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/confirm")
  4. public class ProducerController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @GetMapping("/sendMessage/{messsage}")
  8. public void sendMessageA(@PathVariable String messsage){
  9. rabbitTemplate.convertAndSend("com.lc.direct.demo","北京",messsage);
  10. log.info("发送消息内容:{}",messsage);
  11. }
  12. @GetMapping("/sendMessage/{messsage}")
  13. public void sendMessageB(@PathVariable String messsage){
  14. rabbitTemplate.convertAndSend("com.lc.direct.demo","哈尔滨",messsage);
  15. log.info("发送消息内容:{}",messsage);
  16. }
  17. }

消费者代码

  1. @Slf4j
  2. @Component
  3. public class DirectConsumer {
  4. @RabbitListener(queues = "directQueueA")
  5. public void receiveConfirmMessageA(Message message){
  6. String msg = new String(message.getBody());
  7. log.info("接收到的队列消息:{}",msg);
  8. }
  9. @RabbitListener(queues = "directQueueB")
  10. public void receiveConfirmMessageB(Message message){
  11. String msg = new String(message.getBody());
  12. log.info("接收到的队列消息:{}",msg);
  13. }
  14. }

结果展示:

遇到的问题:

1.springboot项目使用spring-boot-starter-amqp连接rabbitmq时出现报错
: Failed to check/redeclare auto-delete queue(s).

 这里我是没有开启RabbitMQ,直接连接,当然报错了

2.创建队列进行消息发送的时候报错
 Queue declaration failed; retries left=3

只监听了队列,但是没有在config中配置,又或者是链接错误写错符号或者有重复相同连接

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/618629
推荐阅读
相关标签
  

闽ICP备14008679号