当前位置:   article > 正文

消息队列-RabbitMQ的使用_交换机绑定队列没有权限

交换机绑定队列没有权限

一、使用场景

官方文档:https://www.rabbitmq.com/networking.htmll
在这里插入图片描述

1.1 异步处理

在这里插入图片描述

1.2 应用解耦

在这里插入图片描述

1.3 流量控制

在这里插入图片描述

二、RabbitMQ概念

RabbitMQ简介:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,
这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可
能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直
在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交
换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道
发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都
是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加
密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥
有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时
指定,RabbitMQ 默认的 vhost 是 / 。

在这里插入图片描述

消息队列主要有两种形式的目的地

  1. 队列(queue):点对点消息通信(point-to-point)
    消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获
    取消息内容,消息读取后被移出队列
    消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
  2. 主题(topic):发布(publish)/订阅(subscribe)消息通信
    发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个
    主题,那么就会在消息到达时同时收到消息

三、RabbitMAQ的安装

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
官方文档:https://www.rabbitmq.com/networking.html

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述
web管理页面:http://192.168.56.10:15672/#/
默认的用户名:guest
默认的密 码:guest

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

四:Exchange 交换机类型类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、
fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,
headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接
看另外三种类型:
direct : 定向交换机也叫点对点交换机
topic : 主题交换机
fanout:广播交换机

direct 定向接收,精确匹配routing key,绑定了这个key的所有队列都能收到
在这里插入图片描述
fanout 这种交换机所有的对列都会接收,忽略routing key 所有绑定了的队列都会接收
在这里插入图片描述
★ topic 队列通过条件接收,通过条件匹配符合条件的队列会接收
在这里插入图片描述

4.1 新建一个交换机

在这里插入图片描述

4.1.1 交换机参数

alternate-exchange:候补(备用)交换机

4.1.2 交换机详细信息

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

发送消息参数说明

content_type : 消息内容的类型 [常用 ]
content_encoding: 消息内容的编码格式
priority: 消息的优先级 根据发布的时消息的重要性
correlation_id:关联id
reply_to: 用于指定回复的队列的名称 -> RPC 远程调用时回复是可以带上[常用 ]
expiration: 消息的失效时间
message_id: 消息id 拥有确认消息之间是否是同一个,用来消息的回复[常用 ]
timestamp:消息的时间戳
type: 类型
user_id: 用户id
app_id: 应用程序id一般模块名称[常用 ]
cluster_id: 集群id

4.2 新建一个队列

在这里插入图片描述

4.2.1 新建队列参数说明:
  • x-message-ttl:【重要】发布到队列的消息在被丢弃之前可以生存多长时间(毫秒)
  • x-expires:多长时间(毫秒)后,队列(注意是把这个队列删除)会被自动删除。
  • x-max-length:【重要】可以包含ready状态的消息的最大条数,过了这个数量默认最早的数据会丢失。
  • x-max-length-bytes:可以包含ready状态的消息的最大存储量,单位为byte
  • x-overflow:队列溢出后的处理逻辑,需要与x-max-length或x-max-length-bytes配合使用:取值有(drop head【默认】 删除头部的就是老数据、reject publish或reject publish dlx)
  • x-dead-letter-exchange:【重要】死信(看下面解释)息交换机
  • x-dead-letter-routing-keys:【重要】死信交换机路由
  • x-single-active-consumer:如果设置,确保一次只有一个消费者从队列中消费,并在活动消费者被取消或死亡的情况下故障转移到另一个注册消费者
  • x-max-priority:【重要】队列支持的最大优先级数量;如果未设置,队列将不支持消息优先级。
  • x-queue-mode:将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM使用;如果未设置,队列将保留内存缓存,以尽可能快地传递消息。
  • x-queue-master-locator:将队列设置为主位置模式,确定在节点集群上声明时队列主位置的规则。
4.2.2 队列详细信息在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
runtime metrics 运行队列的基本内存状况和占用磁盘大小
在这里插入图片描述

死信的三种情况
  • 如果queue中的消息被消费者接收, 但是消费者拒绝消费(消费者执行了reject 或nack 并将 requee 参数设置为 false )的时候,这个消息就会变成死信。
  • 消息本身设置了过期时间(x-message-ttl), 并且消息过期时间已经生效, 还未被消费的消息就会变成死信
  • 队列设置了最大长度限制(x-max-length或x-max-length-bytes), 当队列已满, 之后从交换机路由到该队列的消息会自动变成死信

4.3 交换机绑定队列

在这里插入图片描述
在这里插入图片描述

五、测试

在这里插入图片描述
分析:创建出3个交换机,一个为direct类型,一个为fanout类型,一个为topic类型
发消息给交换机,查看队列中的数据。

5.1 发消息给指定的队列

在这里插入图片描述

在这里插入图片描述

5.2 接收队列

在这里插入图片描述

在这里插入图片描述
对于fanout的交换机:填写或者不填写Routing key 所有绑定的队列都会接收。
在这里插入图片描述
*对于topic 的交换机,可以设置不同的规则匹配,符合Routing key条件的队列会接收,#匹配0个或多个单词,匹配一
个单词

特别注意:(# )匹配0个或多个单词,(*) 匹配一个单词

在这里插入图片描述
在这里插入图片描述

六、springboot整合RabbitMQ

1. 引入 spring-boot-starter-amqp
2. application.yml配置
3. 测试RabbitMQ
	1. AmqpAdmin:管理组件
	2. RabbitTemplate:消息发送处理组件
	3. @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)
	• Object content, Message message, Channel 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

第一步:引入依赖

<!--        引入RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

第二步:添加配置
1.配置文件配置

# RabbitMQ配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.在GulimallOrderApplication上开启注解
@EnableRabbit

3.配置 把RabbitMQ接收和发送的数据转为json

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

第三步:测试发送消息
组件:
1. AmqpAdmin:管理组件
2. RabbitTemplate:消息发送处理组件
3. @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)
• Object content, Message message, Channel

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {

    // 可以创建交换机,队列,并管理
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 第一步:创建交换机
     * 1、如何创建Exchange、Queue、Binding
     *      1)、使用AmqpAdmin进行创建
     * 2、如何收发消息
     */
    @Test
    public void createExchange() {

        /**
         * name 名称
         * durable 是否持久化
         * autoDelete 是否可自动删除,没有绑定任何队列时删除
         */
        Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[{}]创建成功:","hello-java-exchange");
    }


    /**
     * 第二步:创建队列
     */
    @Test
    public void testCreateQueue() {
        /**
         * name 名称
         * durable 是否持久化
         * exclusive 是否排它, 如果有一个连接了其他就无法连接它,这个最好设置为false
         * autoDelete 是否可自动删除,没有绑定任何队列时删除
         */
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]创建成功:","hello-java-queue");
    }


    /**
     *第三步:交换机与队列进行绑定
     */
    @Test
    public void createBinding() {

        /**
         * String destination,  目的地,队列名称
         * Binding.DestinationType destinationType, 类型,可以绑定队列或者交换机
         * String exchange,  交换机名称
         * String routingKey,  路由键
         * Map<String, Object> arguments  参数
         */
        Binding binding = new Binding("hello-java-queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",
                null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding[{}]创建成功:","hello-java-binding");

    }

    /**
     * 第四步:交换机发送消息
     */
    @Test
    public void sendMessageTest() {
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("reason");
        reasonEntity.setStatus(1);
        reasonEntity.setSort(2);
        String msg = "Hello World";
        //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

        //2、发送的对象类型的消息,可以是一个json
        /**
         * String exchange, 交换机
         * String routingKey,  路由键,要与第四步绑定时设置的路由键有关系才能接收
         * Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
         * @Nullable CorrelationData correlationData
         */
        rabbitTemplate.convertAndSend("hello-java-exchange","hello2.java",
                reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
                reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
        log.info("消息发送完成:{}",reasonEntity);
    }

//    @Test
//    public void create() {
//        HashMap<String, Object> arguments = new HashMap<>();
//        arguments.put("x-dead-letter-exchange", "order-event-exchange");
//        arguments.put("x-dead-letter-routing-key", "order.release.order");
//        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
//        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
//        amqpAdmin.declareQueue(queue);
//        log.info("Queue[{}]创建成功:","order.delay.queue");
//    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

在这里插入图片描述

在这里插入图片描述
第四步:测试接收消息

@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );

        return new PageUtils(page);
    }


    /**
     * queues:声明需要监听的队列
     * message : 如果我们不知道发送的对象类型,可以统一用org.springframework.amqp.core.Message 来接收
     * OrderReturnReasonEntity:接收消息的对象,因为我们发的时这个对象,所以接收也可以用这个对象
     * Message message,OrderReturnReasonEntity 这两个参数写一个就可以
     * channel:当前传输数据的通道
     */
    @RabbitListener(queues = {"hello-java-queue"})
    public void revieveMessage(Message message,
                               OrderReturnReasonEntity content) {
        //拿到主体内容
        byte[] body = message.getBody();
        //拿到的消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接受到的消息...内容" + message + "===内容:" + content.getClass());

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

在这里插入图片描述

七、最佳实战@RabbitListener + @RabbitHandler

@RabbitListener 可以在类或者方法上
@RabbitHandler 只能加在方法上
第一步:发送消息

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
	@Autowired
	RabbitTemplate rabbitTemplate;
	@RequestMapping("/send")
	public R sendMessage(){
		for (int i=0;i<10;i++){
			if(i%2==0){
				OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
				reasonEntity.setId((long) i);
				reasonEntity.setCreateTime(new Date());
				reasonEntity.setName("reason");
				reasonEntity.setStatus(1);
				reasonEntity.setSort(2);
				//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

				//2、发送的对象类型的消息,可以是一个json
				/**
				 * String exchange, 交换机
				 * String routingKey,  路由键,要与第四步绑定时设置的路由键有关系才能接收
				 * Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
				 * @Nullable CorrelationData correlationData
				 */
				rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
						reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
//				log.info("消息发送完成OrderReturnReasonEntity:{}",i);
			}else{
				OrderEntity orderEntity = new OrderEntity();
				orderEntity.setId((long) i);
//				orderEntity.setCreateTime(new Date());
				orderEntity.setOrderSn(String.valueOf(UUID.randomUUID()));
				//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

				//2、发送的对象类型的消息,可以是一个json
				/**
				 * String exchange, 交换机
				 * String routingKey,  路由键,要与第四步绑定时设置的路由键有关系才能接收
				 * Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
				 * @Nullable CorrelationData correlationData
				 */
				rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
						orderEntity,new CorrelationData(UUID.randomUUID().toString()));
//				log.info("消息发送完成OrderEntity:{}",i);

			}
			System.out.println("maruis---消息发送完成--->" + i);
		}
		return R.ok();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

第二步:接收消息

@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @RabbitHandler
    public void revieveMessage( OrderReturnReasonEntity content) {
        System.out.println("接受到的消息OrderReturnReasonEntity...内容" + content.getId() + "===类型:" + content.getClass().getName());
    }

    @RabbitHandler
    public void revieveMessage( OrderEntity content) {
        System.out.println("接受到的消息OrderEntity...内容" + content.getId() + "===类型:" + content.getClass().getName());
    }

    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );

        return new PageUtils(page);
    }


    /**
     * queues:声明需要监听的队列
     * message : 如果我们不知道发送的对象类型,可以统一用org.springframework.amqp.core.Message 来接收
     * OrderReturnReasonEntity:接收消息的对象,因为我们发的时这个对象,所以接收也可以用这个对象
     * Message message,OrderReturnReasonEntity 这两个参数写一个就可以
     * channel:当前传输数据的通道
     * 场景:
     *      1)、订单服务启动多个,同一个信息,只能有一个客户端接收。
     *      2)、只有一个消息完全处理完,方法运行结束,我们才可以接收其他的消息。
     */
//    @RabbitListener(queues = {"hello-java-queue"})
//    public void revieveMessage(Message message,
//                               OrderReturnReasonEntity content,
//                               Channel channel) {
//        //拿到主体内容
//        byte[] body = message.getBody();
//        //拿到的消息头属性信息
//        MessageProperties messageProperties = message.getMessageProperties();
//        System.out.println("接受到的消息...内容" + message + "===内容:" + content.getClass());
//
//    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

在这里插入图片描述

八、RabbitMQ消息确认机制-可靠抵达

在这里插入图片描述
• publisher confirmCallback 确认模式,发送信息mq服务器,并且服务器接收以后回调。
• publisher returnCallback 未投递到 queue 退回模式,queue接收到信息后回调
• consumer ack机制 消费者接收到信息后回调

前两个时发送端确认,最后一个时消费端确认

8.1 发送端的消息确认confirmCallback+returnCallback

第一步:yml配置

# RabbitMQ配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

第二步:设置配置类

@Configuration
public class MyRabbitConfig {

    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 注意:这个方法时消息没有抵达才会调用
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

在这里插入图片描述

8.2 消费端消息确认

消费端默认的确认方式是自动确认的,但是这种会出现一个问题就是在还未确认之前发生宕机时,数据就会从RabbitMQ中丢失,为了让数据准确抵达,我们需要手动确认,手动确认的结果有两种:一种时接收;一种是拒绝(拒绝后数据可以丢失,也可以重新让如队列)

第一步: 配置手动确认

# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2

第二步:发送小心

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
	@Autowired
	RabbitTemplate rabbitTemplate;
	@RequestMapping("/send")
	public R sendMessage(){
		for (int i=0;i<10;i++){
			if(i%2==0){
				OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
				reasonEntity.setId((long) i);
				reasonEntity.setCreateTime(new Date());
				reasonEntity.setName("reason");
				reasonEntity.setStatus(1);
				reasonEntity.setSort(2);
				//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

				//2、发送的对象类型的消息,可以是一个json
				/**
				 * String exchange, 交换机
				 * String routingKey,  路由键,要与第四步绑定时设置的路由键有关系才能接收
				 * Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
				 * @Nullable CorrelationData correlationData 消息的唯一ID,为了实现消息的可靠投递
				 */
				rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
						reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
//				log.info("消息发送完成OrderReturnReasonEntity:{}",i);
			}else{
				OrderEntity orderEntity = new OrderEntity();
				orderEntity.setId((long) i);
//				orderEntity.setCreateTime(new Date());
				orderEntity.setOrderSn(String.valueOf(UUID.randomUUID()));
				//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

				//2、发送的对象类型的消息,可以是一个json
				/**
				 * String exchange, 交换机
				 * String routingKey,  路由键,要与第四步绑定时设置的路由键有关系才能接收
				 * Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
				 * @Nullable CorrelationData correlationData 消息的唯一ID,为了实现消息的可靠投递
				 */
				rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
						orderEntity,new CorrelationData(UUID.randomUUID().toString()));
//				log.info("消息发送完成OrderEntity:{}",i);

			}
			System.out.println("maruis---消息发送完成--->" + i);
		}
		return R.ok();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

第三步:消息确认

/**
     * queues:声明需要监听的队列
     * message : 如果我们不知道发送的对象类型,可以统一用org.springframework.amqp.core.Message 来接收
     * OrderReturnReasonEntity:接收消息的对象,因为我们发的时这个对象,所以接收也可以用这个对象
     * Message message,OrderReturnReasonEntity 这两个参数写一个就可以
     * channel:com.rabbitmq.clientChannel 当前传输数据的通道
     * 场景:
     *      1)、订单服务启动多个,同一个信息,只能有一个客户端接收。
     *      2)、只有一个消息完全处理完,方法运行结束,我们才可以接收其他的消息。
     */
     @RabbitListener(queues = {"hello-java-queue"})
//    @RabbitHandler
    public void revieveMessage(Message message,
                               Channel channel)   {
        //拿到主体内容
        byte[] body = message.getBody();
        //拿到的消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接受到的消息...内容" + message + "===类型:" + message.getClass());
        // 在chanel内部是顺序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("maruis----deliveryTag-->" + deliveryTag);

        /**
         *deliveryTag 消息id
         * false   不批量
         */
        // 签收消息,非批量模式
        try {
            if(deliveryTag%2==0){
                channel.basicAck(deliveryTag,false);
                System.out.println("maruis------>" + "签收成功");
            }else{
                // 参数: 消息id,是否重新投递给服务器
//                channel.basicReject(deliveryTag,false);
                // 参数: 消息id,是否批量拒绝(如果为true,那么这条信息之前的信息全部被拒),是否重新投递给服务器(如果是true,就会重新放入队列,false丢弃)
                channel.basicNack(deliveryTag,false,true);
                System.out.println("maruis------>" + "退货");
            }
        }catch (Exception e){
            // 网络终端
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/513250
推荐阅读
相关标签
  

闽ICP备14008679号