赞
踩
实现步骤:创建队列 → 创建生产者 → 创建消费者 → 测试代码
//1、创建队列 @Configuration public class HelloConfiguration { public static final String QUEUE_NAME = "queue_name"; @Bean public Queue helloQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); } } // 2、创建生产者 @Slf4j @RestController public class HelloController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void helloWorld(@PathVariable String message) { rabbitTemplate.convertAndSend("", QUEUE_NAME, message); } } // 3、创建消费者 (此处可以进行多个消费者的绑定,只需要在写一个消费者绑定到对列就可以) @Slf4j @Component public class HelloConsumer { @RabbitListener(queues = QUEUE_NAME) public void helloConsumer(Message message) { log.info("消费者消费了消息:{}", new String(message.getBody())); } }
// 1、创建交换机 → 创建队列 → 绑定交换机和队列 @Configuration public class FanoutConfiguration { public static final String FANOUT_EXCHANGE_NAME = "exchange_name"; public static final String FANOUT_QUEUE_NAME_ONE = "queue_name_one"; public static final String FANOUT_QUEUE_NAME_TWO = "queue_name_two"; // 创建交换机 @Bean public FanoutExchange fanoutExchange() { return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).build(); } // 创建队列one @Bean public Queue queueOne() { return QueueBuilder.durable(FANOUT_QUEUE_NAME_ONE).build(); } // 创建队列two @Bean public Queue queueTwo() { return QueueBuilder.durable(FANOUT_QUEUE_NAME_TWO).build(); } // 绑定交换机和队列one @Bean public Binding queueOneToFanoutExchange(Queue queueOne, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueOne).to(fanoutExchange); } // 绑定交换机和队列two @Bean public Binding queueTwoToFanoutExchange(Queue queueTwo, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueTwo).to(fanoutExchange); } } // 2、创建生产者 @Slf4j @RestController public class FanoutController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendFanoutMessage/{message}") public void sendFanoutMessage(@PathVariable String message) { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", message); } } // 3、创建消费者 @Slf4j @Component public class FanoutOneConsumer { @RabbitListener(queues = FANOUT_QUEUE_NAME_ONE) public void receiveFanoutQueue(Message message) { log.info("consumer one 消费了消息:{}", new String(message.getBody())); } } @Slf4j @Component public class FanoutTwoConsumer { @RabbitListener(queues = FANOUT_QUEUE_NAME_TWO) public void receiveFanoutQueue(Message message) { log.info("consumer two 消费了消息:{}", new String(message.getBody())); } }
实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建生产者 → 创建消费者 → 测试代码
// 1、创建交换机 → 创建队列 → 绑定交换机和队列 @Configuration public class RoutingConfiguration { public static final String ROUTING_EXCHANGE_NAME = "routing_exchange_name"; public static final String ROUTING_QUEUE_NAME_ONE = "routing_queue_one"; public static final String ROUTING_QUEUE_NAME_TWO = "routing_queue_two"; @Bean public DirectExchange directExchange() { return ExchangeBuilder.directExchange(ROUTING_EXCHANGE_NAME).build(); } @Bean public Queue routingQueueOne() { return QueueBuilder.durable(ROUTING_QUEUE_NAME_ONE).build(); } @Bean public Queue routingQueueTwo() { return QueueBuilder.durable(ROUTING_QUEUE_NAME_TWO).build(); } @Bean public Binding queueOneBindingDirectExchangeWithInfo(Queue routingQueueOne, DirectExchange directExchange) { return BindingBuilder.bind(routingQueueOne).to(directExchange).with("info"); } @Bean public Binding queueTwoBindingDirectExchangeWithInfo(Queue routingQueueTwo, DirectExchange directExchange) { return BindingBuilder.bind(routingQueueTwo).to(directExchange).with("info"); } @Bean public Binding queueTwoBindingDirectExchangeWithWarning(Queue routingQueueTwo, DirectExchange directExchange) { return BindingBuilder.bind(routingQueueTwo).to(directExchange).with("warning"); } @Bean public Binding queueTwoBindingDirectExchangeWithError(Queue routingQueueTwo, DirectExchange directExchange) { return BindingBuilder.bind(routingQueueTwo).to(directExchange).with("error"); } } // 2、创建生产者 @RestController public class RoutingController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendRoutingMessageInfo/{message}") public void sendRoutingMessage(@PathVariable String message) { rabbitTemplate.convertAndSend(ROUTING_EXCHANGE_NAME, "info", message.getBytes()); } @RequestMapping("/sendRoutingMessageWarning/{message}") public void sendRoutingMessageWarning(@PathVariable String message) { rabbitTemplate.convertAndSend(ROUTING_EXCHANGE_NAME, "warning", message.getBytes()); } @RequestMapping("/sendRoutingMessageError/{message}") public void 
sendRoutingMessageError(@PathVariable String message) { rabbitTemplate.convertAndSend(ROUTING_EXCHANGE_NAME, "error", message.getBytes()); } } // 3、创建消费者 @Slf4j @Component public class RoutingOneConsumer { @RabbitListener(queues = ROUTING_QUEUE_NAME_ONE) public void receiveMessage(Message message) { log.info("RoutingOneConsumer接收到的消息为:{}", new String(message.getBody())); } } @Slf4j @Component public class RoutingTwoConsumer { @RabbitListener(queues = ROUTING_QUEUE_NAME_TWO) public void receiveMessage(Message message) { log.info("RoutingTwoConsumer接收到的消息为:{}", new String(message.getBody())); } }
*.green.* : 三个单词,中间的单词为green
*.*.blue : 三个单词,最后一个单词为blue
red.# : 以red 开头
注:如果一条消息的路由键同时满足同一个队列的多个绑定规则,该队列也只会收到一条消息
实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建生产者 → 创建消费者 → 测试代码
// 1、创建交换机 → 创建队列 → 绑定交换机和队列 @Configuration public class TopicConfiguration { public static final String TOPIC_EXCHANGE_NAME = "topic_exchange_name"; public static final String TOPIC_QUEUE_ONE_NAME = "topic_queue_one"; public static final String TOPIC_QUEUE_TWO_NAME = "topic_queue_two"; @Bean public TopicExchange topicExchange() { return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build(); } @Bean public Queue topicQueueOne() { return QueueBuilder.durable(TOPIC_QUEUE_ONE_NAME).build(); } @Bean public Queue topicQueueTwo() { return QueueBuilder.durable(TOPIC_QUEUE_TWO_NAME).build(); } @Bean public Binding topicQueueOneBindingExchangeWithGreen(Queue topicQueueOne, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueueOne).to(topicExchange).with("*.green.*"); } @Bean public Binding topicQueueTwoBindingExchangeWitBlue(Queue topicQueueTwo, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with("*.*.blue"); } @Bean public Binding topicQueueTwoBindingExchangeWitRed(Queue topicQueueTwo, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with("red.#"); } } // 2、创建生产者 @RestController public class TopicController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendTopicMessageColor/{message}") public void sendRoutingMessage(@PathVariable String message) { rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "red.green.blue", message.getBytes()); } @RequestMapping("/sendTopicMessageBlue/{message}") public void sendTopicMessageBlue(@PathVariable String message) { rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "a.a.blue", message.getBytes()); } @RequestMapping("/sendTopicMessageRedAndBlue/{message}") public void sendTopicMessageRedAndBlue(@PathVariable String message) { rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "red.a.blue", message.getBytes()); } } // 3、创建消费者 @Slf4j @Component public class TopicOneConsumer { @RabbitListener(queues = 
TOPIC_QUEUE_ONE_NAME) public void receiveMessage(Message message) { log.info("TopicOneConsumer接收到的消息为:{}", new String(message.getBody())); } } @Slf4j @Component public class TopicTwoConsumer { @RabbitListener(queues = TOPIC_QUEUE_TWO_NAME) public void receiveMessage(Message message) { log.info("TopicTwoConsumer接收到的消息为:{}", new String(message.getBody())); } }
首先在配置文件中添加
spring.rabbitmq.publisher-confirm-type=correlated
实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建回调函数 → 创建生产者 → 创建消费者 → 测试代码
// 1、创建交换机 → 创建队列 → 绑定交换机和队列 @Configuration public class ConfirmConfiguration { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @Bean public DirectExchange confirmDirectExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).build(); } @Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding confirmQueueBindingConfirmDirectExchange(Queue confirmQueue, DirectExchange confirmDirectExchange) { return BindingBuilder.bind(confirmQueue).to(confirmDirectExchange).with("confirm_key"); } } // 2、创建回调函数 @Slf4j @Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("交换机已经收到ID为 {} 的消息", correlationData.getId()); } else { log.info("交换机还未收到Id为 {} 的消息,由于原因 {}", correlationData.getId(), cause); } } @Override public void returnedMessage(ReturnedMessage returned) { log.error("消息:{}, 被交换机{}退回, 退回原因:{}, 路由的key:{}", new String(returned.getMessage().getBody()), returned.getExchange(), returned.getReplyText(), returned.getRoutingKey()); } } //创建生产者 @RestController public class ConfirmController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendConfirmMessage/{message}") public void sendConfirmMessage(@PathVariable String message) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "confirm_key", message.getBytes(), correlationData); } } // 创建消费者 @Slf4j @Component public class ConfirmConsumer { @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void 
receiveConfirmQueue(Message message) { log.info("ConfirmConsumer 消费了消息:{}", new String(message.getBody())); } }
通过自己的学习、画图理解和总结,加深了对RabbitMQ的 Exchange、routingKey、Queue、Producer、Consumer之间交互的理解,接下来还会通过画图的方式去分析死信队列、延时队列、优先级队列等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。