赞
踩
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
//备份交换机 public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; //备份队列 public static final String BACKUP_QUEUE_NAME = "backup.queue"; //告警队列 public static final String WARNING_QUEUE_NAME = "warning.queue"; @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); } // 声明确认队列 @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding backupQueueBindingBackupQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningQueueBindingBackupQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); }
package com.hong.springboot.rabbitmq.consumer; import com.hong.springboot.rabbitmq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; /** * @Description: 告警消费者 * @Author: hong * @Date: 2024-03-10 17:54 * @Version: 1.0 **/ @Slf4j @Component public class WarningConsumer { //接收告警消息 @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarningMessage(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{},备用交换机收到不可路由信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg); } }
先删了confirm.exchange交换机(改了配置)再启动项目,发送http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才
package com.hong.rabbitmq10; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; /** * @Description: 优先级队列生产者 * @Author: hong * @Date: 2024-03-11 21:11 * @Version: 1.0 **/ public class PriorityProducer { private static final String QUEUE_NAME = "priority-queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.4"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 声明消息队列 * 第一个参数: 队列名称 * 第二个参数: 是否持久化 false不持久化,rabbitmq重启队列丢失 true持久化,rabbitmq重启队列不丢失 * 第三个参数: 是否排外 false排外只能一个消费者消费 false不排外可多消费者消费 * 第四个参数: 是否自动删除 true自动删除最后一个消费者断开连接后该队列自动删除 false不自动删除 * 第五个参数: 其他参数信息 */ Map<String,Object> map = new HashMap<>(); //设置最大优先级 map.put("x-max-priority",10); channel.queueDeclare(QUEUE_NAME,true,false,false,map); for (int i = 1; i < 11; i++) { String message = "HELLO RABBITMQ!-------"+i; if(i == 5){ AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("",QUEUE_NAME,properties,message.getBytes()); }else{ /** * 发布消息 * 第一个参数:要将消息发送到哪个交换机 * 第二个参数:路由的 key 是哪个 * 第三个参数:其他参数信息 * 第四个参数:消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); } } System.out.println("消息发送完成!"); } }
消费者
package com.hong.rabbitmq10; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Description: 优先级队列消费者 * @Author: hong * @Date: 2024-03-11 21:14 * @Version: 1.0 **/ public class PriorityConsumer { private static final String QUEUE_NAME = "priority-queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.4"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (comsumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!"); /** * 消息消费 * 第一个参数: 消费哪个队列 * 第二个参数: 是否自动确认消息 * 第三个参数: 当一个消息发送过来后的回调接口 * 第四个参数: 当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。