当前位置:   article > 正文

RabbitMQ快速入门_rabbitmq多个消费者绑定同一个队列

rabbitmq多个消费者绑定同一个队列

RabbitMQ

1. 快速入门

1.控制台创建队列simple.queue

2.利用SpringAMQP直接像simple.queue发送消息

3.利用SpringAMQP编写消费者,监听simple.queue队列

2. Work模型

多个消费者绑定同一个队列,可以加快消息处理速度

同一条消息只会被一个消费者处理、

通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

3. 交换机的作用是什么?

接收publisher发送的消息

将消息按照规则路由到与之绑定的队列

fanoutExchange会将消息路由到每个绑定的队列

发送消息到交换机的API

// 发送消息到交换机,参数分别是:交换机名称,RoutingKey,消息
	rabbitTemplate.convertAndSend(exchangeName,"",message)
	
//发送消息到队列,参数为:队列名称。消息
	rabbitTemplate.convertAndSend(queue,message)
  • 1
  • 2
  • 3
  • 4
  • 5

3.1 Direct交换机

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此被称为定向路由

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

3.2 Direct交换机与Fanout交换机的区别

Fanout交换机将消息路由给每一个与之绑定的队列

Direct交换机根据RoutingKey判断路由给哪个队列

如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.3 Topic交换机

TopicExchange也是基于RoutingKey做消息路由,但是routingKey通常是多个单词的组合,并且以“.”分割。

Queue与Exchange指定BindingKey时可以使用通配符

“#”:代指0个或多个单词

“*”:代指一个单词

3.4 声明队列和交换机

Queue:用于声明队列,可以用工厂类QueueBulider构建

Exchange:用于声明交换机,可以用工厂类ExchangeBulider构建

Binding:用于声明队列和交换机的关系,可以用工厂类BindingBulider构建

代码演示:

@Configuration
public class FanoutConfiguration {
    
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        //return new FanoutExchange("hmall.fanout");
        return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    }
    
    
    //声明第一个队列
    @Bean
    public Queue fanoutQueue1() {
        //return new FanoutExchange("fanout.queue1");
        return QueueBuilder.durable("fanout.queue1").build();
    }
    //绑定第一个队列到交换机
    @Bean
    public Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    
    
    //声明第二个队列
    @Bean
    public Queue fanoutQueue2() {
        //return new FanoutExchange("fanout.queue2");
        return QueueBuilder.durable("fanout.queue2").build();
    }
    //绑定第二个队列到交换机
    @Bean
    public Binding fanoutQueue2Binding(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

3.5 基于注解声明队列和交换机

  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2", durable = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) {
        log.info("消费者2监听到direct2.queue消息:【{}】", msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. 消息转换器

使用JSON序列化代替默认的JDK序列化

1.引入jackson依赖

 <!--Jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.配置消息转换器

    @Bean
    public MessageConverter jacksonMessageConvertor() {
        return new Jackson2JsonMessageConverter();
    }
  • 1
  • 2
  • 3
  • 4

5. 实战操作步骤

1.导入mq依赖

<!--        AMQP-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.配置RabbitMQ

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 123
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.配置消息转换器

@Configuration
public class MqConfig {
    
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4.在spring.factories配置扫描路径

  com.hmall.common.config.MqConfig,\
  • 1

5.编写消费者,声明队列、交换机以及key

@Component
@RequiredArgsConstructor
public class PayStatusListener {
    
    private final IOrderService orderService;
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(name = "pay.direct"),
            key = "pay.success"
    ))
    public void listenerPaySuccess(Long orderId) {
        orderService.markOrderPaySuccess(orderId);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

6.发送消息

​ 1.引入RabbltTemplate

    private final RabbitTemplate rabbitTemplate;		//构造方法注入
    
    
    @Autowired
    private RabbitTemplate rabbitTemplate;	//注解注入
  • 1
  • 2
  • 3
  • 4
  • 5

​ 2.发送消息

        try {
            rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getPayOrderNo());
            
        } catch (Exception e) {
            log.error("发送支付状态通知失败,订单id:【{}】",po.getPayOrderNo(),e);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

6.消息可靠性

6.1 消息丢失的原因

  1. 网络故障
  2. RabbitMQ服务故障
  3. 业务服务故障

6.2 解决方案

6.2.1 发送者重连
spring:
  rabbitmq:
    host: 127.0.0.1 #IP地址
    port: 5672	#端口
    virtual-host: /hmall 
    username: hmall 
    password: 123
    connection-timeout: 1s  # 连接超时时间
    template:
      retry:
        multiplier: 2  #失败后下次等待时长倍数
        max-attempts: 3  # 最大重试次数
        initial-interval: 1s # 第一次重试时间
        enabled: true # 是否开启重试
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

注意:

​ 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试(同步),也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能

​ 如果对业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息的代码

6.2.2 发送者确认

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有一下几种情况

  • 消息投递到了MQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息(不持久化消息)投递到MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其他情况都会返回NACK,告知投递失败

操作步骤

1.添加配置

spring:
	rabbitmq:
		publisher-confirm-type:correlated #开启publisher confirm机制,并设置confirm类型
		publisher-returns: true # 开启publisher-return机制
  • 1
  • 2
  • 3
  • 4

配置说明:

  • publisher-confitm-type有三种模式可选
    • none: 关闭confirm机制
    • simple:同步阻塞等待MQ的回执消息
    • correlated:MQ异步回调方式返回回执消息

2.添加ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置

@Configuration
@Slf4j
@AllArgsConstructor
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init(){
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            log.error("触发return Callback");
            log.debug("exchange:{}", returned.getExchange());
            log.debug("routingKey:{}", returned.getRoutingKey());
            log.debug("replyCode:{}", returned.getReplyCode());
            log.debug("replyText:{}", returned.getReplyText());
            log.debug("message:{}", returned.getMessage());
        }
    });
    }
 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

3.发送消息,指定消息ID、消息ConfirmCallback

 @Test
    void testPublisherConfirm() throws InterruptedException{
        //1.创建CorrelationData
        CorrelationData cd = new CorrelationData();
        //2.给Future添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                //2.1 Future发生异常时的处理逻辑,基本不会触发
                log.error("handle message ack fail",ex);
        
            }
    
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                //2.2 Future接收到回执的处理逻辑,参数中的result就是回执内容
                if(result.isAck()){
                log.debug("发送消息成功,收到ack");
                }else{
                log.error("发送消息失败,收到nack,reason:{}",result.getReason());
                }
        
            }
        });
        //3.发送消息
        rabbitTemplate.convertAndSend("hmall.direct","red1","hello",cd);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

7. MQ可靠性

在默认情况下,RabbitMQ将接收到的消息保存在内存中以降低收发消息的延迟。这样会导致两个问题

  • 一旦MQ宕机,内存中的消息将会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

7.1 解决方案

7.1.1 数据持久化

RabbitMQ实现数据持久化包括的三个方面

  • 交换机持久化
  • 队列持久化
  • 消息持久化
7.1.2 Lazy Queue(惰性队列)

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列

惰性队列有一下特征:

  • 接收到消息直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)

在3.12版本后,所有队列都是Lazy Queue模式,无法更改

7.1.3 添加Lazy Queue

​ 要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可

控制台添加

​ 点击 Lazy mode 后,会在Arguments添加一条数据,这时表示当前队列为Layz Queue队列
在控制台添加

2.Java代码添加

​ 声明Bean

    @Bean
    public Queue layzQueue() {
        return QueueBuilder
                .durable("lazy.queue")
                .lazy() //开启Lazy模式
                .build();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

​ 注解

    //注解声明LazyQueue
    @RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            //队列类型
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void ListenLazyQueue(String msg){
        log.info("接收到lazy.queue的消息:{}",msg);
    } 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

8.总结- RabbitMQ如何保证消息的可靠性

  • 首先通过配置让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
  • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会成为队列的默认模式并且不可修改。LazyQueue会将所有的消息都持久化。
  • 开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/1002614
推荐阅读
相关标签
  

闽ICP备14008679号