当前位置:   article > 正文

RabbitMQ 常见使用场景_rabbitmq应用场景

rabbitmq应用场景

        概念

RabbitMQ 消息队列。消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦

1.0 消息队列

        1.1 消息

        消息包含消息体和标签,消息题表示带有业务逻辑的数据,如json数据,标签表示这这个消息的key,比如交换机名称和路由键。

        消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道

        1.2 队列

        RabbitMQ中的消息都只能存储在队列中,RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

        多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 CRound-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

2.0 交换机

        生产者将消息发送给交换器时, 需要一个RoutingKey。交换器和队列绑定时需要一个BindingKey。当BindingKey和RoutingKey相匹时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。

        BindingKey并不是在所有的情况下都能生效,它依赖于交换器类型。如fanout 类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

3.0 工作模式

        3.1 简单的工作模式一个生产者一个消费者

        生产者java代码实现

  1. //1.0 建立连接
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //设置mq服务器地址信息
  4. factory.setHost("172.1.1.183");
  5. factory.setPort(5672);
  6. factory.setVirtualHost("/fei");
  7. factory.setUsername("root");
  8. factory.setPassword("1234");
  9. // 2.0 建立连接
  10. Connection connection = factory.newConnection();
  11. // 3.0 建立通道
  12. Channel channel = connection.createChannel();
  13. //4.0 建立队列
  14. String queueName = "simple.queue";
  15. // 参数含义 队列名称,是否持久化,是否独占,没有消费是否丢弃,参数列表
  16. channel.queueDeclare(queueName, false, false, false, null);
  17. // 5.0 发送消息
  18. String message = "hello, word!";
  19. channel.basicPublish("", queueName, null, message.getBytes());
  20. System.out.println("发送消息成功:【" + message + "】");
  21. //5.0 关闭通道
  22. channel.close();
  23. connection.close();

消费者java代码的实现

  1. // 1.建立连接
  2. ConnectionFactory factory = new ConnectionFactory();
  3. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  4. factory.setHost("172.1.1.183");
  5. factory.setPort(5672);
  6. factory.setVirtualHost("/fei");
  7. factory.setUsername("root");
  8. factory.setPassword("1234");
  9. // 1.2.建立连接
  10. Connection connection = factory.newConnection();
  11. // 2.创建通道Channel
  12. Channel channel = connection.createChannel();
  13. // 3.创建队列
  14. String queueName = "simple.queue";
  15. channel.queueDeclare(queueName, false, false, false, null);
  16. // 4.订阅消息
  17. channel.basicConsume(queueName, true, new DefaultConsumer(channel){
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope,
  20. AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. // 5.处理消息
  22. String message = new String(body);
  23. System.out.println("接收到消息:【" + message + "】");
  24. }
  25. });
  26. System.out.println("等待接收消息。。。。");
  27. }

springboot集成rabbitmq代码实现

  1. @RestController
  2. @RequestMapping("/rabbitmq")
  3. public class RabbitMQController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @RequestMapping("/simpleMode")
  7. public ResponseResult simple(){
  8. // 参数1 队列名称 参数2 消息内容
  9. rabbitTemplate.convertAndSend("simpleMode","hello word");
  10. return new ResponseResult(200,"生产消息成功");
  11. }
  12. }

消费者

  1. @Component
  2. @RabbitListener(queuesToDeclare = @Queue("simpleMode"))
  3. public class SimpleConsumer {
  4. Logger logger =LoggerFactory.getLogger(SimpleConsumer.class);
  5. @RabbitHandler
  6. public void reception(String message){
  7. logger.info("简单模式接收到队列 simpleMode 消息为-->{}",message);
  8. }
  9. }

        3.2 工作队列 

        work queues 模型,创建一个工作队列,用于在多个工作者之间分配耗时的任务。也就是说,多个消费者可以消费同一个消息队列里面的消息,并且是通过轮询的形式,比如说有1,2,3,4这四个消息,按这个模型平均分配的原则,C1和C2分别可以消费两条消息,且都是按顺序的

        生产者消息

  1. @RequestMapping("/workQueue")
  2. public ResponseResult workQueue(){
  3. for(int i =0;i<20;i++){
  4. rabbitTemplate.convertAndSend("workQueue","work->"+i);
  5. logger.info("工作队列模式 向队列workQueue发送消息为-->work->{}",i);
  6. }
  7. return new ResponseResult(200,"生产消息成功");
  8. }

消费者

  1. @Component
  2. public class workQueueConsumer {
  3. Logger logger = LoggerFactory.getLogger(workQueueConsumer.class);
  4. @RabbitListener(queuesToDeclare = @Queue("workQueue"))
  5. public void workQueue1(String message){
  6. logger.info("工作队列模式,消费者1号收到workQueue队列消息为 --> {}",message);
  7. }
  8. @RabbitListener(queuesToDeclare = @Queue("workQueue"))
  9. public void workQueue2(String message){
  10. logger.info("工作队列模式,消费者2号收到workQueue队列消息为 --> {}",message);
  11. }
  12. }

        3.3 发布-订阅模型

        多个消费者可以同时接收到消息并进行消费

        生产者代码

  1. @RequestMapping("/fanoutQueue")
  2. public ResponseResult fanoutQueue(){
  3. for(int i =0;i<5;i++){
  4. rabbitTemplate.convertAndSend("fanout_exchange","","fanout+"+i);
  5. logger.info("发布订阅模式 向队列workQueue发送消息为-->fanout->{}",i);
  6. }
  7. return new ResponseResult(200,"生产消息成功");
  8. }

        交换机以及绑定代码

  1. @Configuration
  2. public class RabbitConfig {
  3. private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";
  4. private static final String FANOUT_QUEUE_A = "fanout_queue_a";
  5. private static final String FANOUT_QUEUE_B = "fanout_queue_b";
  6. private static final String FANOUT_QUEUE_C = "fanout_queue_c";
  7. // private final String FANOUT_QUEUE_NAME3 = "fanout_queue3";
  8. //创建交换机
  9. @Bean
  10. public FanoutExchange fanoutExchange()
  11. {
  12. return ExchangeBuilder.
  13. fanoutExchange(FANOUT_EXCHANGE_NAME) //交换机类型 ;参数为名字
  14. // .topicExchange(FANOUT_EXCHANGE_NAME)
  15. .durable(true)//是否持久化,true即存到磁盘,false只在内存上
  16. .build();
  17. }
  18. //创建队列
  19. @Bean
  20. public Queue fanoutA()
  21. {
  22. return new Queue(FANOUT_QUEUE_A);
  23. }
  24. @Bean
  25. public Queue fanoutB()
  26. {
  27. return new Queue(FANOUT_QUEUE_B);
  28. }
  29. @Bean
  30. public Queue queueC(){
  31. return new Queue(FANOUT_QUEUE_C);
  32. }
  33. // 将三个队列绑定到交换机上【队列订阅交换机】 ------------------------------------------------------------------
  34. @Bean
  35. Binding bindingExchangeA() {
  36. return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
  37. }
  38. @Bean
  39. Binding bindingExchangeB() {
  40. return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
  41. }
  42. @Bean
  43. Binding bindingExchangeC() {
  44. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  45. }

        消费者代码

  1. @Component
  2. public class FanoutConsumer {
  3. Logger logger = LoggerFactory.getLogger(FanoutConsumer.class);
  4. @RabbitListener(queues = "fanout_queue_a")
  5. public void fanoutMessage1(String message){
  6. logger.info("发布订阅模式,消费者1号收到fanout_queue队列消息为 --> {}",message);
  7. }
  8. @RabbitListener(queues = "fanout_queue_b")
  9. public void fanoutMessage2(String message){
  10. logger.info("发布订阅模式,消费者2号收到fanout_queue队列消息为 --> {}",message);
  11. }
  12. @RabbitListener(queues = "fanout_queue_c")
  13. public void fanoutMessage3(String message){
  14. logger.info("发布订阅模式,消费者3号收到fanout_queue队列消息为 --> {}",message);
  15. }
  16. }

        3.4 路由模式

        Routing 模型,也称直连模型,可以对消息指明路由键名称,消息队列绑定路由键名称,这样交换机就能根据路由键名称将消息匹配到相应的消息队列上。

        生产者代码

  1. /**
  2. * Routing 模型 路由模式
  3. * @return
  4. */
  5. @RequestMapping("/routing")
  6. public ResponseResult routing() {
  7. rabbitTemplate.convertAndSend("direct_exchange", "info","路由消息hello info!");
  8. rabbitTemplate.convertAndSend("direct_exchange", "dev","路由消息hello dev!");
  9. rabbitTemplate.convertAndSend("direct_exchange", "test","路由消息hello test!");
  10. logger.info("路由模式 向队列workQueue发送消息为-->route->{}","路由消息hello word!");
  11. return new ResponseResult(200,"路由生产者成功");
  12. }

        交换机绑定代码

  1. @Configuration
  2. public class RabbitConfig {
  3. private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";
  4. private static final String DIRECT_EXCHANGE_NAME = "direct_exchange";
  5. private static final String FANOUT_QUEUE_A = "fanout_queue_a";
  6. private static final String FANOUT_QUEUE_B = "fanout_queue_b";
  7. private static final String FANOUT_QUEUE_C = "fanout_queue_c";
  8. // private final String FANOUT_QUEUE_NAME3 = "fanout_queue3";
  9. //创建交换机
  10. @Bean
  11. public FanoutExchange fanoutExchange()
  12. {
  13. return ExchangeBuilder.
  14. fanoutExchange(FANOUT_EXCHANGE_NAME) //交换机类型 ;参数为名字
  15. // .topicExchange(FANOUT_EXCHANGE_NAME)
  16. .durable(true)//是否持久化,true即存到磁盘,false只在内存上
  17. .build();
  18. }
  19. @Bean
  20. public DirectExchange directExchange() {
  21. /**
  22. * directExchange的参数说明:
  23. * 1. 交换机名称
  24. * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
  25. * 3. 是否自动删除 false:不自动删除 true:自动删除
  26. */
  27. return ExchangeBuilder
  28. .directExchange(DIRECT_EXCHANGE_NAME)
  29. .durable(true)
  30. .build();
  31. }
  32. //创建队列
  33. @Bean
  34. public Queue fanoutA()
  35. {
  36. return new Queue(FANOUT_QUEUE_A);
  37. }
  38. @Bean
  39. public Queue fanoutB()
  40. {
  41. // 参数1队列名和 参数2否持久化
  42. return new Queue(FANOUT_QUEUE_B,true);
  43. }
  44. @Bean
  45. public Queue queueC(){
  46. return new Queue(FANOUT_QUEUE_C,true);
  47. }
  48. // 将三个队列绑定到交换机上【队列订阅交换机】 ------------------------------------------------------------------
  49. @Bean
  50. Binding bindingExchangeA() {
  51. return BindingBuilder.bind(fanoutA()).to(directExchange()).with("info");
  52. }
  53. @Bean
  54. Binding bindingExchangeB() {
  55. return BindingBuilder.bind(fanoutB()).to(directExchange()).with("test");
  56. }
  57. @Bean
  58. Binding bindingExchangeC() {
  59. return BindingBuilder.bind(queueC()).to(directExchange()).with("dev");
  60. }
  61. }

消费端

  1. @Component
  2. public class RoutingConsumer {
  3. Logger logger = LoggerFactory.getLogger(RoutingConsumer.class);
  4. @RabbitListener(queues = "fanout_queue_a")
  5. public void fanoutMessage1(String message){
  6. logger.info("路由模式,消费者a号收到fanout_queue队列消息为 --> {}",message);
  7. }
  8. @RabbitListener(queues = "fanout_queue_b")
  9. public void fanoutMessage2(String message){
  10. logger.info("路由模式,消费者b号收到fanout_queue队列消息为 --> {}",message);
  11. }
  12. @RabbitListener(queues = "fanout_queue_c")
  13. public void fanoutMessage3(String message){
  14. logger.info("路由模式,消费者c号收到fanout_queue队列消息为 --> {}",message);
  15. }
  16. }

        3.5 主题模式

        在Routing原有模型的基础上进行了增强,可以使用通配符的形式来匹配路由键。

        和路由模式差不多,只不过改边了一下路由键的通配符。这里不进行阐述。

4.0 消息的可靠性

        rabbitmq由 生产者→交换机→队列→消费者这些组成,消息丢失可以再这些部分传递或者使用中丢失,所以保证消息的可靠性可以从以下三点保证。

确认模式(confirm):可以监听消息是否从生产者成功传递到交换机。
退回模式(return):可以监听消息是否从交换机传递到队列。
消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

        三种模式刚好监听完RabbitMQ的一整套流程。即我们能够由这三种模式得到消息的传递及处理的结果。

        4.1 confirm 模式实现

        在配置文件中配置

publisher-confirm-type: correlated
  1. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  2. // 参数1 相关配置信息 参数2 交换机是否收到rabbitmq的消息 参数3 失败的原因
  3. @Override
  4. public void confirm(CorrelationData correlationData, boolean b, String s) {
  5. if(b){
  6. logger.info("生产者消息发送成功!!!");
  7. }else {
  8. logger.info("生产者消息发送到rabbitmq失败:原因为 -->{}",s);
  9. //失败后处理的业务逻辑
  10. }
  11. }
  12. });
  13. //消息进行发送
  14. rabbitTemplate.convertAndSend("fanout_exchange","","fanout+"+i);

        4.2 退回模式(return)

        配置文件配置

publisher-returns: true

        代码实现

  1. //定义退回模式的回调方法
  2. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  3. /**
  4. * @param returnedMessage 失败后将失败信息封装到参数中
  5. */
  6. @Override
  7. public void returnedMessage(ReturnedMessage returnedMessage) {
  8. logger.info("消息对象:{}",returnedMessage.getMessage());
  9. logger.info("错误码:{}",returnedMessage.getReplyCode());
  10. logger.info("错误信息:{}",returnedMessage.getReplyText());
  11. logger.info("交换机:{}",returnedMessage.getExchange());
  12. logger.info("路由键:{}",returnedMessage.getRoutingKey());
  13. //处理消息
  14. }
  15. });
  16. rabbitTemplate.convertAndSend("fanout_exchange","","fanout+"+i);
  17. logger.info("发布订阅模式 向队列workQueue发送消息为-->fanout->{}",i);

        4.3 消费者消息确认

        默认情况下是开启自动确认,当消费者出现异常,消息会在消息队列中清空,可以选择手动确认。

        配置文件中开启配置

listener:
  simple:
    acknowledge-mode: manual

        代码中实现手动确认

  1. Logger logger = LoggerFactory.getLogger(FanoutConsumer.class);
  2. @RabbitListener(queues = "fanout_queue_a")
  3. public void fanoutMessage1(Message message, Channel channel) throws InterruptedException, IOException {
  4. //消息投递序号,消息每次投递该值都会+1
  5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6. try
  7. {
  8. //int i = 1 / 0;//模拟处理消息出现bug
  9. logger.info("发布订阅模式,消费者1号收到fanout_queue队列消息为 --> {}",message);
  10. //签收消息
  11. /**
  12. * 参数1:消息投递序号
  13. * 参数2:是否一次可以签收多条消息
  14. */
  15. channel.basicAck(deliveryTag,true);
  16. }catch (Exception e)
  17. {
  18. System.out.println("消息消费失败");
  19. Thread.sleep(2000);
  20. //拒签消息
  21. /**
  22. * 参数1:消息投递序号
  23. * 参数2:是否可以一次拒签多条消息
  24. * 参数3:拒签后消息是否重回队列
  25. */
  26. channel.basicNack(deliveryTag,true,true);
  27. }
  28. }

5.0 高级特性

        5.1 消费端限流

        消费限流其实就是规定消费端获取消息的快慢限制。首先需要配置开启手动确认和消费端最多拉取消息的数量。

  1. listener:
  2. simple:
  3. #手动开启ack
  4. acknowledge-mode: manual
  5. #消费端最多拉取消息的数量
  6. prefetch: 10

        5.2 公平分发和不公平分发

由于消费者的处理消息速度不同,可能导致有些性能高的消费者处于等待状态,多个消息平均分发给消费者所以称为公平分发。

不公平分发则是消费者消费完可以继续消费消息。将prefetch设置为1即可。

        5.3 消息存活时间

当消息到达存活时间后还没有被消费,就会被移除队列。

消息存活时间又分为队列存活时间和单个消息存活时间。

队列存活时间,当QueueBuilder.ttl(时间)就可设置。

        5.4 优先队列

就是说在队列中指定可以设置的最大优先级,然后再对单条设置一个优先级数;可以在创建队列中设置,从new Queue(队列名)更改为QueueBuilder.maxpriority(10)设置返回队列。

        5.5 死信队列

死信队列和普通队列没有太大区别,只不过交换机更换为死信交换机deadLetterExchange。

死信队列用于:

1.队列消息长度到达限制。

2.消费者拒签消息,并且不把消息重新放回原队列。

3.消息到达存活时间未被消费。

        5.6 延迟队列

消息进入延迟队列不会立马进行消费,只有达到规定时间才能被消费。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/736868
推荐阅读
相关标签
  

闽ICP备14008679号