当前位置:   article > 正文

SpringBoot-RabbitMQ

SpringBoot-RabbitMQ

码云

一、安装并下载 RabbitMQ

RabbitMq

二、Spring boot 整合 RabbitMQ

主题模式Topic是使用最多的模式,也是4种模式的最终版

消息发送到 交换机 --交换机下有不同的路由–不同的路由下有不同的队列
将消息发送到 队列中,消费者监听队列。获取相应的消息

1、pom文件

        <!-- RabbitMQ-AMQP依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2、yaml配置


spring:
  #RabbitMQ
 rabbitmq:
    #服务器地址
   host: 127.0.0.1
    #用户名
   username: guest
    #密码
   password: guest
    #虚拟主机
   virtual-host: /
    #端口
   port: 5672
   listener:
     simple:
        #消费者最小数量
       concurrency: 10
        #消费者最大数量
       max-concurrency: 10
        #限制消费者每次只处理一条消息,处理完再继续下一条消息
       prefetch: 1
        #启动时是否默认启动容器,默认true
       auto-startup: true
        #被拒绝时重新进入队列
       default-requeue-rejected: true
   template:
     retry:
        #发布重试,默认false
       enabled: true
        #重试时间 默认1000ms
       initial-interval: 1000
        #重试最大次数,默认3次
       max-attempts: 3
        #重试最大间隔时间,默认10000ms
       max-interval: 10000
        #重试间隔的乘数。比如配2.0 第一次等10s,第二次等20s,第三次等40s
       multiplier: 1.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

3、配置类
配置一个 MQ队列用来接收消息
配置 队列:名称 queue 并且持久化(队列和消息都要持久化)

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMq配置类
 *
 * 准备 队列
 *
 */
@Configuration
public class RabbitMqConfig {

    // 配置 队列:名称 queue  并且持久化(队列和消息都要持久化)
    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

1、 简单模式

向队列发送消息,监听此队列获取消息

消息发送者【生产者】

/**
 * 消息发送者
 *
 */
@Service
@Slf4j
public class MqSender {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    public void send(Object msg){
        log.info("发送消息:" + msg);
        // 往队列中发送 msg消息
        rabbitTemplate.convertAndSend("queue",msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消息接收者【消费者】

/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "queue")
    public void receive(Object msg){
      log.info("接收消息:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

测试请求

    @Autowired
    MqSender mqSender;

    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){
        mqSender.send("小白兔");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

运行项目

在RabbitMq的可视化页面,可以看到 有一个长连接(接收者要时刻监听消息)

在这里插入图片描述

有多个通道

在这里插入图片描述

自定义的队列

在这里插入图片描述

调用接口:
在这里插入图片描述
点击页面的队列,可以看到
在这里插入图片描述
表示 消息 生成 和消息被消费了

2、广播模式(fanout)

创建交换机并在它下绑定两个队列,向交换机发送消息,监听两个队列的消费者都可以获取消息

1、MQ配置类
创建两个队列 和一个交换机,并将队列绑定到交换机上

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMq配置类
 *
 * 准备 队列
 *
 */
@Configuration
public class RabbitMqConfig {

    // 广播模式 :准备两个队列 一个交换机
    private static final String QUEUE01 = "queue-fanout01";
    private static final String QUEUE02 = "queue-fanout02";
    private static final String EXCHANGE = "fanoutExchange";

    // 创建 队列一
    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01,true);
    }

    // 创建 队列2
    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02,true);
    }

    // 创建 交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE,true,false);
    }

    // 将队列绑定到交换机上
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(fanoutExchange());
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(fanoutExchange());
    }




//    // 配置 队列:名称 queue  并且持久化(队列和消息都要持久化)
//    @Bean
//    public Queue queue(){
//        return new Queue("queue",true);
//    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

2、消息发送者【生产者】

将消息发送到 交换机上,路由为空

/**
 * 消息发送者
 *
 */
@Service
@Slf4j
public class MqSender {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    public void send(Object msg){
        log.info("发送消息:" + msg);
        // 将消息发送到交换机
     rabbitTemplate.convertAndSend("fanoutExchange","",msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3、消息接收者【消费者】

监听两条队列的消息

/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "queue-fanout01")
    public void receive01(Object msg){
      log.info("01接收消息:" + msg);
    }

    @RabbitListener(queues = "queue-fanout02")
    public void receive02(Object msg){
      log.info("02接收消息:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

4、测试类

    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){
        mqSender.send("小白兔");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

运行项目
在MQ可视化页面可以看到创建的交换机
在这里插入图片描述
点击交换机。可以看到 绑定的两个 队列
在这里插入图片描述
并且 队列目录下也有这两个队列
在这里插入图片描述
运行结果:交换机下的两个队列都接收到了消息
在这里插入图片描述

3、direct模式[路由模式]

消息发送到交换机并指定 路由键,就会发送对应的队列上

如:发送到X交换机 ,指定black路由键,消息会到 Q2队列中,由监听该队列的消费者获取消息
在这里插入图片描述

1、配置类

创建两个队列,一个交换机,两个路由,将队列绑定到交换机的路由key上

package com.example.seckill.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMq配置类
 *
 * 准备 队列
 *
 */
@Configuration
public class RabbitMqConfig {

    // 广播模式 :准备两个队列 一个交换机
    private static final String QUEUE01 = "queue-direct01";
    private static final String QUEUE02 = "queue-direct02";
    private static final String EXCHANGE = "directExchange";
    private static final String ROUTINGKEY01 = "queue.red";
    private static final String ROUTINGKEY02 = "queue.green";


    // 创建 队列一
    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01,true);
    }

    // 创建 队列2
    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02,true);
    }

    // 创建 交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(EXCHANGE,true,false);
    }

    // 将队列绑定到交换机上 并指定路由键
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

2、消息发送者【生产者】
消息发送到 指定交换机的指定 路由key上

package com.example.seckill.rabbit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 消息发送者
 *
 */
@Service
@Slf4j
public class MqSender {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    public void send01(Object msg){
        log.info("发送消息:" + msg);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend("directExchange","queue.red",msg);
    }

    public void send02(Object msg){
        log.info("发送消息:" + msg);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend("directExchange","queue.green",msg);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

3、消息接收者【消费者】
监听队列,获取队列的消息


/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "queue-direct01")
    public void receive01(Object msg){
      log.info("01接收消息:" + msg);
    }

    @RabbitListener(queues = "queue-direct02")
    public void receive02(Object msg){
      log.info("02接收消息:" + msg);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4、测试:

    @Autowired
    MqSender mqSender;

    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){
        mqSender.send01("小白兔");
        mqSender.send02("白又白");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

运行项目

在可视化页面可以看到创建的 交换机,点进去,可以看到绑定的两个队列、以及队列对应的路由
在这里插入图片描述
在这里插入图片描述
运行结果
在这里插入图片描述
01发送小白兔,01接收小白兔

4、Topic模式[主题模式]

主题模式是在路由模式的基础上,对路由key做了通配符匹配,以满足复杂的消息分发场景
在这里插入图片描述
注意:
‘#’ :匹配0个或者多个
‘*’:匹配一个
如:#.red: 可以匹配:a.red、a.b.red等
*.red:可以匹配a.red,不能匹配a.b.red

1、配置类
使用通配符定义 路由key
key1:#.queue.# —匹配路由中带有.queue 的所有路由
key2:*.queue.# — 匹配 类似:a.queue的所有路由

package com.example.seckill.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMq配置类
 *
 * 准备 队列
 *
 */
@Configuration
public class RabbitMqConfig {

    // 广播模式 :准备两个队列 一个交换机
    private static final String QUEUE01 = "queue-topic01";
    private static final String QUEUE02 = "queue-topic02";
    private static final String EXCHANGE = "topicExchange";

    private static final String ROUTINGKEY01 = "#.queue.#";
    private static final String ROUTINGKEY02 = "*.queue.#";


    // 创建 队列一
    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01,true);
    }

    // 创建 队列2
    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02,true);
    }

    // 创建 交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(EXCHANGE,true,false);
    }

    // 将队列绑定到交换机上 并指定路由键
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

2、消息发送者【生产者】

01发送的路由为 queue.red.message,匹配的是 路由key1
02发送的路由是 a.queue.green 匹配的是 路由key1和key2
所以 02 发送的消息到了 queue01 和 queue 02两个队列

/**
 * 消息发送者
 *
 */
@Service
@Slf4j
public class MqSender {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    public void send01(Object msg){
        log.info("发送消息:" + msg);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
    }

    public void send02(Object msg){
        log.info("发送消息:" + msg);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend("topicExchange","a.queue.green",msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3、消息接收者【消费者】

/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "queue-topic01")
    public void receive01(Object msg){
      log.info("01接收消息:" + msg);
    }

    @RabbitListener(queues = "queue-topic02")
    public void receive02(Object msg){
      log.info("02接收消息:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

4、测试:

    @Autowired
    MqSender mqSender;

    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){

        mqSender.send01("小白兔");
        mqSender.send02("白又白");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

结果:
在这里插入图片描述

三、RabbitMq 的消息丢失原因及解决方法

1、原因

在这里插入图片描述
分为三部分:

  1. 生产者将消息发送给 MQ服务器的过程中,因为网络原因,造成消息丢失 【回调函数confirm】
  2. MQ接收到生产者的消息后,还没有持久化到磁盘,MQ服务器宕机了,造成消息丢失 【持久化】
  3. 消费者从MQ中获取消息的过程中,系统挂了,MQ认为消息已经被消费了,造成消息丢失 【确认机制】

2、解决方法

消息确认、回调、重试机制

1、生产者发送消息,得到MQ的回复 消费者收到消息,反馈MQ确认

1、消息体类:

/**
 * MQ消息数据模型
 */
@Data
public class MQParam implements Serializable {
    private Long id;
    private String name;
    private String messageId;//储存消息发送的唯一标识


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2、消息配置类

/**
 * RabbitMq配置类
 *
 * 准备 队列
 *
 */
@Configuration
public class RabbitMqConfig {

    // 广播模式 :准备两个队列 一个交换机
    private static final String QUEUE01 = "queue-topic01";
    private static final String QUEUE02 = "queue-topic02";
    private static final String EXCHANGE = "topicExchange";

    private static final String ROUTINGKEY01 = "#.queue.#";
    private static final String ROUTINGKEY02 = "*.queue.#";


    // 创建 队列一
    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01);
    }

    // 创建 队列2
    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02);
    }

    // 创建 交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(EXCHANGE);
    }

    // 将队列绑定到交换机上 并指定路由键
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

3、发送消息【生产者】
首先定义 回调函数,让生产者知道 消息成功发送到了MQ服务器

注意: :RabbitTemplate是spring生成的Bean,是单例的,这设置一次回调后,其他使用默认RabbitTemplate的发送者发送消息都会触发这个回调

package com.example.seckill.rabbit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 发送消息
 */
@Service
@Slf4j
public class MqSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
     * @param correlationData  相关配置信息
     * @param b exchange 交换机,判断交换机是否成功收到消息    true 表示交换机收到
     * @param s  失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("data:" + correlationData);
        log.info("---- confirm ----ack="+b+"  cause="+String.valueOf(s));
        if (b){
            System.out.println("消息被 MQ 接收");
        }else {
            System.out.println("消息没有被MQ接收");
        }
    }
    /**
     * 交换机并未将数据丢入指定的队列中时,触发
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  参数三:true  表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
     * @param returnedMessage   消息对象

     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("----交换机并未将数据丢入指定的队列中----replyCode="+returnedMessage.getReplyCode()+" replyText="+returnedMessage.getReplyText()+" ");
    }

    // 发送消息
    public void send01(MQParam param) {

        // 设置交换机处理失败消息的模式     true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
        // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
        rabbitTemplate.setMandatory(true);
        // MQ收到消息后,手动ack回执
        rabbitTemplate.setConfirmCallback(this);
        // return 配置
        rabbitTemplate.setReturnsCallback(this);

        // 设置消息的唯一ID
        CorrelationData data = new CorrelationData();
        data.setId(param.getMessageId());

        log.info("发送消息:" + param);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend(
                "topicExchange", // 交换机
                "queue", // 路由
                param, // 消息体内容
                data // 消息唯一Id
        );
    }



}



}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

ConfirmCallback实现判断消息是否到交换机
ReturnCallback,实现判断 消息到达交换机后是否到队列中

4、消费者

消费者可以获取 消息体和消息头部信息
使用:channel.basicAck(tag,false); 方法,告知MQ,消息已经被消费
tag:是这个消息的tagID,false:只确认当前的消息收到,true:确认所有的消息收到

package com.example.seckill.rabbit;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;


/**
 * 消息接收者
 */
/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "queue-topic01")
    @RabbitHandler
    public void receive01(@Payload MQParam param,
                          @Headers Map<String,Object> headers,
                          Channel channel) throws IOException {
        System.out.println("-----收到消息了-----");
        System.out.println("---发过来的用户是:" + param);
        System.out.println("---发过来的heard是:" + headers);
        /**
         * basicAck:表示确认已经消费消息,通知MQ,需要先得到  deliveryTag
         * deliveryTag 从消息头里get到
         */
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(tag,false);
    }

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

5、yaml配置回调

publisher-confirm-type: correlated
  • 1

2、消费者重试

1、如果消费者发送了ack,MQ将把这个消息从待确认中删除,
2、如果消费者发送了nack,并指定不重入队列,此消息会被删除,
3、 重试:如果消费者发送了nack,指定重入队列,那么这条消息会进入队列,重新发送给消费者
注意: 重试的消息的消息头是amqp_redelivered属性会被设置成 true,客户端由此判断该消息是否被确认,
如果不做判断,每次失败重入队列再次发送,会导致不停的发送与拒绝。

消费者的手动确认消息

basicNack方法:
第一个参数是 该消息的tagId,
第二个参数:为true表示包含当前消息在内的所有比该消息的deliveryTag值小的消息都被拒绝, 除了已经被应答的消息。为false则表示只拒绝本条消息,
第三个参数:是否重入队列。首先判断此消息是否为重试消息,是的话就不重入队列,不是就重入队列,再次发送

package com.example.seckill.rabbit;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;


/**
 * 消息接收者
 */
/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "queue-topic01")
    @RabbitHandler
    public void receive01(@Payload MQParam param,
                          @Headers Map<String,Object> headers,
                          Channel channel) throws IOException {
        System.out.println("-----收到消息了-----");
        System.out.println("---发过来的用户是:" + param);
        System.out.println("---发过来的heard是:" + headers);
        /**
         * basicAck:表示确认已经消费消息,通知MQ,需要先得到  deliveryTag
         * deliveryTag 从消息头里get到
         */
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            channel.basicAck(tag,false);
        }catch (Exception e){

            // 判断是否是重试消息
            boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
          
            channel.basicNack(tag,false,!redelivered);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

yaml 配置自动确认以及重试

spring:
  rabbitmq:
    listener:
       retry:
    #    重试次数
          max-attempts: 3
        #   开启重试机制
          enabled: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

四、延迟消息、死信

死信 + 延迟消息
场景:红包,过时退回。订单:过期取消

1、什么是死信?

比如生产者发送的消息到MQ,这条消息因为各种原因没有被消费,消息最终死了。
死信队列死信交换机,和普通的没有区别,将其配置为死信处理者,死信转发到死信交换机和死信队列上,对死信进行处理。
这种设置、处理 在 RabbitMQ 中是点对点的,即一个普通队列 可以绑定一个死信交换机。

2、什么时候会产生死信

1、队列长度满了
2、消费者拒绝消费消息(丢弃):basicNack() .basicNack方法
3、消息TTL(存活时间) 过期 :TTL可以设置在队列、单条消息上,如果在队列上,则等同于设置该队列下所有消息

3、死信消息的处理

产生死信后,消息,会到死信交换机,再由死信交换机路由到 死信队列上,死信队列再推送给这个队列的消费者

4、通过死信机制实现延迟任务

1、创建没有消费者的队列,设置TTL,并绑定死信交换机
2、所有需要延迟的消息全部向这条队列发送
3、死信交换机绑定对应的死信队列,其消费者即为处理延迟消息的服务

配置类、再创建普通队列中设置了死信队列及死信路由,并且设置了延迟时间

package com.example.seckill.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * RabbitMq配置类
 *
 * 准备 队列
 *
 */
@Configuration
public class RabbitMqConfig {
    // 普通交换机
    public final static String SKYPYB_ORDINARY_EXCHANGE = "ordinary-exchange";
    // 死信交换机
    public final static String SKYPYB_DEAD_EXCHANGE = "dead-exchange";
    // 普通队列
    public final static String SKYPYB_ORDINARY_QUEUE_1 = "ordinary-queue";
    // 死信队列
    public final static String SKYPYB_DEAD_QUEUE = "dead-queue";
    // 普通路由
    public final static String SKYPYB_ORDINARY_KEY = "key.ordinary.one";
    // 死信路由
    public final static String SKYPYB_DEAD_KEY = "key.dead";

    // 创建普通交换机
    @Bean
    public DirectExchange ordinaryExchange() {
        return new DirectExchange(SKYPYB_ORDINARY_EXCHANGE, false, true);
    }

    // 创建死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(SKYPYB_DEAD_EXCHANGE, false, true);
    }

    // 创建 普通队列,并绑定死信路由和死信交换机  该队列的死信消息,转发到绑定的死信交换机上
    @Bean
    public Queue ordinaryQueue() {
        Map<String, Object> arguments = new HashMap<>();
        //TTL 5s
        arguments.put("x-message-ttl", 1000 * 5);
        // 设定当前队列中,允许存放的最大消息数目
         arguments.put("x-max-length",10);
        //绑定死信队列和死信交换机
        arguments.put("x-dead-letter-exchange", SKYPYB_DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", SKYPYB_DEAD_KEY);
        return new Queue(SKYPYB_ORDINARY_QUEUE_1, false, false, true, arguments);
    }

    // 创建死信队列
    @Bean
    public Queue deadQueue() {
        return new Queue(SKYPYB_DEAD_QUEUE, false, false, true);
    }

    // 普通的绑定
    @Bean
    public Binding bindingOrdinaryExchangeAndQueue() {
        return BindingBuilder.bind(ordinaryQueue()).to(ordinaryExchange()).with(SKYPYB_ORDINARY_KEY);
    }

    // 死信的绑定
    @Bean
    public Binding bindingDeadExchangeAndQueue() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(SKYPYB_DEAD_KEY);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

消息发送者【生产者】发送消息到普通队列

package com.example.seckill.rabbit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 发送消息
 */
@Service
@Slf4j
public class MqSender {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * 回调函数,confirm 确认
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

        /***
         *
         * @param correlationData 消息的唯一ID
         * @param b 确认消息是否被MQ 服务器接收,true:接收 ,false:未接收
         * @param s
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            System.out.println("data:" + correlationData);
            if (b) {
                System.out.println("消息被 MQ 接收");
            } else {
                System.out.println("消息没有被MQ接收");
            }
        }
    };


    // 发送消息
    public void send01(MQParam param) {

        // 设置回调
        rabbitTemplate.setConfirmCallback(confirmCallback);

        CorrelationData data = new CorrelationData();
        data.setId(param.getMessageId());

        log.info("发送消息:" + param);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend("ordinary-exchange",
                "key.ordinary.one", param);
        rabbitTemplate.convertAndSend("ordinary-exchange",
                "key.ordinary.one", param);

        log.info("-----消息发送完毕-----");
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

消费者 监听死信队列

package com.example.seckill.rabbit;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;


/**
 * 消息接收者
 */
/**
 * 消息接收者
 */
@Service
@Slf4j
public class MqReceiver {


    // 监听 queue 队列的消息
    @RabbitListener(queues = "dead-queue")
    public void receive01(@Payload MQParam param,
                          @Headers Map<String,Object> headers,
                          Channel channel) throws IOException {

        System.out.println("-----死信队列收到消息了-----");
        System.out.println("---发过来的用户是:" + param);
        System.out.println("---发过来的heard是:" + headers);
        /**
         * basicAck:表示确认已经消费消息,通知MQ,需要先得到  deliveryTag
         * deliveryTag 从消息头里get到
         */
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            channel.basicAck(tag,false);
        }catch (Exception e){
            System.err.println(e.getMessage());
            // 判断是否是重试消息
            boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);

            channel.basicNack(tag,false,!redelivered);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

测试:


    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){
        MQParam param = new MQParam();
        param.setId(System.currentTimeMillis());
        param.setName("用户1");
        param.setMessageId("param$" + System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
        mqSender.send01(param);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

可以发现,在发送消息 5秒 之后,死信队列 监听到了消息


以上是设置的队列的存活时间,还可以设置 消息的存活时间
在 生产者代码中 设置消息的TTL 为3秒

 // 发送消息
    public void send01(MQParam param) {

        // 设置回调
        rabbitTemplate.setConfirmCallback(confirmCallback);

        CorrelationData data = new CorrelationData();
        data.setId(param.getMessageId());

        log.info("发送消息:" + param);
        // 将消息发送到交换机
        rabbitTemplate.convertAndSend(
                "ordinary-exchange",
                "key.ordinary.one",
                param,
                (message -> {
                    message.getMessageProperties().setExpiration("3000");
                    return message;
                }));
        rabbitTemplate.convertAndSend("ordinary-exchange",
                "key.ordinary.one", param);

        log.info("-----消息发送完毕-----");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

可以看到 3秒后 监听到一条消息,5秒后又监听到一条消息

5、RabbitMQ死信实现消息延迟的缺陷

改变上述的 生产者代码为,先发送TTL队列的消息,再发送设置了消息TTL 3 秒的消息,死信队列等待了5秒才监听到消息,并不是先监听到三秒的消息。
原因:
队列是先进后出的有序队列,MQ只对队尾的消息进行超时判断。上述队尾消息是5秒超时,所以不会先判断3秒的。

6、RabbitMQ死信实现消息延迟的适用

有固定时间的延迟任务,RabbitMQ还是很好的。

7、解决死信延迟的缺陷:延迟消息交换机

延迟消息交换机

工具::rabbitmq_delayed_message_exchange插件,称为延迟消息交换机
作用是:在 direct、topic、fanout 等这些 exchange 基础上新增一个交换机类型 x-delayed-message
使用:只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数x-delay[:毫秒值] 就能够实现每条消息的异步延时

原理: 创建 延迟消息交换机,需要延迟的消息都发送到这个队列上。和死信无关了
配置类
创建自定义交换机,指定类型x-delayed-message

@Configuration
public class RabbitBindConfig {
    public final static String SKYPYB_DELAY_EXCHANGE = "delay-exchange";
    public final static String SKYPYB_DELAY_QUEUE = "delay-queue";
    public final static String SKYPYB_DELAY_KEY = "key.delay";
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //自定义交换机
        return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args);
    }
    @Bean
    public Queue delayQueue() {
        return new Queue(SKYPYB_DELAY_QUEUE, false, false, true);
    }
    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

消费者
监听该 延迟队列

@RabbitListener(queues = {RabbitBindConfig.SKYPYB_DELAY_QUEUE})
@Component
@Slf4j
public class DelayReceiver {
   
    @RabbitHandler
    public void onDelayMessage(@Payload String message,
                              @Headers Map<String, Object> headers,
                              Channel channel) throws IOException {
        log.info("监听延时交换机, 收到消息: {}", message);
        //delivery tag可以从headers中get出来
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
            channel.basicNack(deliveryTag, false, !redelivered);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

生产者

rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
                RabbitBindConfig.SKYPYB_DELAY_KEY, "消息体-5s",
                (msg) -> {
                    msg.getMessageProperties().setDelay(5000);
                    return msg;
                });
        rabbitTemplate.convertAndSend(
                RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
                RabbitBindConfig.SKYPYB_DELAY_KEY,
                "消息体-3s",
                (msg) -> {
                    msg.getMessageProperties().setDelay(3000);
                    return msg;
                });
        logger.info("-----消息发送完毕-----");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

面试题:

1、如何避免消息堆积?

  1. 多个消费者监听一个队列
  2. 监听到消息,通过线程池异步处理

2、如何避免消息丢失

  1. 生产者发送消息,使用回调方法,获取MQ的确认信息,判断是否发送成功
  2. 消费者接收消息,使用ACK进行消息确认
  3. 消息,队列、交换机设置持久化
    消息持久化:rabbitTemplate 默认持久化
    队列持久化,
 @Bean
    public Queue queue01(){
        return new Queue(QUEUE01,true);
    }
  • 1
  • 2
  • 3
  • 4

交换机持久化

    // 创建 交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(EXCHANGE,true,false);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3、Rabbit的优劣势(为什么使用消息队列)

参考
优点:
1、应用解耦:订单系统、库存系统,之间通过rabbitMQ 连接,模块处理数据连接rabbitmq即可,降低模块间的耦合度。
2、异步提速:支付成功后,有订单、物流等业务,可以使用 mq进行异步操作,提高效率。
3、流量削峰:大量请求进入 rabbit的队列中,将每秒500请求,从队列取出,交给接口处理。
缺点
RabbitMQ 一旦崩了,全崩了。

4、如果保证消息到了mq,和mq 将消息分发到 队列中

ConfirmCallback:实现判断消息是否到交换机
ReturnCallback:实现判断 消息到达交换机后是否到队列中

5、消息积压原因

1、消费者宕机,导致队列中的消息无法被消费
2、消费者的业务逻辑过大,导致消费能力不足
3、生产者产生的消息过多,比如”双十一“,导致消费者处理不过来

6、解决消息积压

1、增加消费者的数量
2、将消息从队列中取出,存进数据库,后期慢慢处理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/617076
推荐阅读
相关标签
  

闽ICP备14008679号