赞
踩
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。
//开启发布确认
channel.confirmSelect();
就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布
waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
- public static void publishMessageIndividually() throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- //队列声明
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName, true, false, false, null);
- //开启发布确认
- channel.confirmSelect();
-
- long begin = System.currentTimeMillis();
-
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message = i + "";
- channel.basicPublish("", queueName, null, message.getBytes());
- //服务端返回 false 或超时时间内未返回,生产者可以消息重发
- boolean flag = channel.waitForConfirms();
- if (flag) {
- System.out.println("消息发送成功");
- }
- }
-
- long end = System.currentTimeMillis();
- System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
-
- }
他是利用回调函数来达到消息可靠性传递的
- // 异步确认发布
- private static void publishMessageAsync() throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- //队列声明
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName, true, false, false, null);
- //开启发布确认
- channel.confirmSelect();
-
-
- /**线程安全有序的一个哈希表,适用于高并发的情况下
- 1.轻松的将序号与消息进行关联
- 2.轻松批量删除条目,只要给到序号
- 3.支持高并发(多线程)
- **/
- //消息确认成功,回调函数
- ConcurrentSkipListMap<Long,String> outstandingConfirms =
- new ConcurrentSkipListMap<>();
- ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个
- if (multiple){ //批量的
- //2.轻松批量删除条目,只要给到序号
- ConcurrentNavigableMap<Long,String> confirmed =
- outstandingConfirms.headMap(deliveryTag); //消息的序号
- confirmed.clear();
- }else { //单个的
- outstandingConfirms.remove(deliveryTag);
- }
- System.out.println("确认的消息:"+deliveryTag);
- };
-
- //消息确认失败,回调函数
- /**
- * 1.消息的标记
- * 2.是否为批量确认
- */
- ConfirmCallback nackCallback = (deliveryTag , multiple)->{
- //3、打印一下未确认的消息都有哪些
- String message = outstandingConfirms.get(deliveryTag);
- System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);
- };
-
-
- //准备消息的监听器 ,监听哪些消息成功了, 哪些失败了
- channel.addConfirmListener( ackCallback, nackCallback);
-
- //开始时间
- long begin = System.currentTimeMillis();
-
- //发送消息
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message = i + "";
- channel.basicPublish("", queueName, null, message.getBytes());
- //1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面
- outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
- }
-
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
- }
退回模式(return)
说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。
默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式
可以在生产者端监听改消息是否被成功投递到队列中
- channel.addReturnListener(new ReturnCallback() {
- @Override
- public void handle(Return returnMessage) {
- System.out.println("消息被回退,原因:"+returnMessage.getReplyText());
- System.out.println(returnMessage.getExchange()); // 交换机
- System.out.println(returnMessage.getReplyCode()); // 返回原因的代码
- System.out.println(returnMessage.getReplyText()); // 返回信息,例如NO_ROUTE
- System.out.println(returnMessage.getRoutingKey()); // 路由KEY
- }
- });
- spring:
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: root
- password: root
- #虚拟host 可以不设置,使用server默认host
- virtual-host: JCcccHost
- #确认消息已发送到交换机(Exchange)
- publisher-confirm-type: correlated
- #确认消息已发送到队列(Queue)
- publisher-returns: true
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
- /**
- * @Author : JCccc
- * @CreateTime : 2019/9/3
- * @Description :
- **/
- @Configuration
- public class RabbitConfig {
-
- @Bean
- public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
- rabbitTemplate.setConnectionFactory(connectionFactory);
-
- // 配置发布到交换机的确认
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- /**
- *correlationData :客户端在发送原始消息时提供的对象。
- *ack:exchange交换机是否成功收到了消息。true成功,false代表失败。
- *cause:失败原因。
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
- System.out.println("ConfirmCallback: "+"确认情况:"+ack);
- System.out.println("ConfirmCallback: "+"原因:"+cause);
- }
- });
-
- // 配置交换机没有找到对应的消息队列时,消息退回时的处理
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("ReturnCallback: "+"消息:"+message);
- System.out.println("ReturnCallback: "+"回应码:"+replyCode);
- System.out.println("ReturnCallback: "+"回应信息:"+replyText);
- System.out.println("ReturnCallback: "+"交换机:"+exchange);
- System.out.println("ReturnCallback: "+"路由键:"+routingKey);
- }
- });
-
- return rabbitTemplate;
- }
-
- }
- package com.student.rabbitmq.config;
-
-
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- /**
- * 第二种方式
- */
- @Component
- public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- // 将我们实现的MyCallBack接口注入到RabbitTemplate中
- @PostConstruct
- public void init() {
- // 设置确认消息交给谁处理
- rabbitTemplate.setConfirmCallback(this);
- // 设置回退消息交给谁处理
- rabbitTemplate.setReturnCallback(this);
- }
-
- /**
- * 交换机确认回调方法
- *
- * @param correlationData 保存回调消息的ID以及相关信息
- * @param ack 表示交换机是否收到消息(true表示收到)
- * @param cause 表示消息接收失败的原因(收到消息为null)
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- String id = correlationData != null ? correlationData.getId() : "";
- if (ack) {
- System.out.println("交换机已经收到ID为:{}的消息");
- } else {
- System.out.println("交换机还未收到ID为:{}的消息,原因为:{}" + cause);
- }
- }
-
- /**
- * 路由出现问题的消息回退方法
- *
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println(new String(message.getBody())+
- exchange+
- replyText+
- routingKey);
- }
-
- }
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。