赞
踩
在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。
生产者--(创建消息)-->交换机--(路由键)-->队列--(pull/push)-->消费者
1)直连交换器: Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的
什么是路由键?
每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串
直连交换机适用场景?
有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
直连交换机不适合的场景
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么直连交换机就不合适了
2)主题交换机: Topic Exchange(发布/订阅)
RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
主题交换机的routing_key定义规则:
交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:
*表示一个单词
#表示任意数量(零个或多个)单词
示例:
- Q1: *.TT.*
- Q2: TT.#
-
- 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
- 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到
3)广播交换机: Fanout Exchange
用于广播消息,将发送到Exchange中的消息发送到与该交换器关联的所有队列中。
死信队列用于存储没匹配队列的消息,超时没有被处理的消息,如果没有配置死信队列这些消息会被丢弃。即当出现没有匹配的队列的消息,或是超时的消息则将消息转入到死信队列里去,等待重新处理或人工干预。
死信队列的应用场景:
参数 | 作用 |
---|---|
exchange | 交换机名称 |
type | 交换机类型 |
durable | 是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除 |
autoDelete | 是否自动删除,如果没有与之绑定的Queue,直接删除 |
internal | 是否内置的,如果为true,只能通过Exchange到Exchange |
arguments | 结构化参数 |
示例:
- Exchange.DeclareOk exchangeDeclare(String exchange,
- String type,
- boolean durable,
- boolean autoDelete,
- boolean internal,
- Map<String, Object> arguments) throws IOException;
准配虚拟机 开启一个Docker 拉取镜像rabbitmq 运行容器
具体步骤:有道云笔记
需要架包
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit-test</artifactId>
- <scope>test</scope>
- </dependency>

配置文件 application 生产与消费者都可用 端口需要改动 还有RabbitMQ服务地址需要改动
-
- server.port=8081
- ## rabbitmq config
- spring.rabbitmq.host=192.168.164.128
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=xhz
- spring.rabbitmq.password=123
- spring.rabbitmq.virtual-host=my_vhost
- ## 消费者数量
- spring.rabbitmq.listener.simple.concurrency=10
- spring.rabbitmq.listener.simple.max-concurrency=10
- #消费者每次从队列中获取的消息数量
- spring.rabbitmq.listener.simple.prefetch=1
- #消费者自动启动
- spring.rabbitmq.listener.simple.auto-startup=true
- #消费失败,自动重新入队
- spring.rabbitmq.listener.simple.default-requeue-rejected=true
- #启用发送重试
- spring.rabbitmq.template.retry.enabled=true
- spring.rabbitmq.template.retry.initial-interval=1000
- spring.rabbitmq.template.retry.max-attempts=3
- spring.rabbitmq.template.retry.max-interval=10000
- spring.rabbitmq.template.retry.multiplier=1.0

完成演示图
所有消费消息类 模块
1)配置直接交换机,队列,并将直接交换机和该队列绑定。(在RabbitMQConfig类中配置,该类使用了@Configuration注解)
- package com.rabbitmq.provider.rabbitmqprovider.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
-
-
- @Configuration
- public class DirectConfig {
- @Bean
- public Queue directQueue(){
- return new Queue("zking-direct-queue");
- }
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("zking-direct-exchange");
- }
-
- @Bean
- public Binding directBinding(){
- return BindingBuilder.bind(directQueue()).to(directExchange()).with("zking-direc");
- }
-
- }

2)编写通过直接交换机发送消息的方法
- package com.rabbitmq.provider.rabbitmqprovider.web;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.Map;
-
- @RestController
- public class SenderController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @RequestMapping("/sendDirect")
- public String sendDirect(String routing){
- Map msg=new HashMap<>();
- msg.put("code",200);
- msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
- .ofPattern("yyyy-MM-dd HH:mm:ss")));
- rabbitTemplate.convertAndSend("zking-direct-exchange",routing,msg);
- return "direct success";
- }
-
- }

3.测试交换机发送消息
http://localhost:8081/sendDirect?routing=zking-direc
4.消费消息
创建模块
消费消息 我们运行这个项目
- package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Component
- @Slf4j
- //queues参数指定的是与直接交换机关联的队列名称
- @RabbitListener(queues = "zking-direct-queue")
- public class DirecReciewer {
-
- @RabbitHandler
- public void receive(Map msg) {
- log.info("接收通过直接交换机发送的消息: " + msg);
- }
- }

打印结果
1) 配置主题交换机,队列,并将主题交换机和该队列绑定。
- package com.rabbitmq.provider.rabbitmqprovider.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- //@Configuration
- public class TopicConfig {
- /**
- * 声明Topic类型的交换机,支持序列化,后面队列进行绑定(topic_queue_q1,topic_queue_q2)
- * @return
- */
- @Bean(name="topicExchange")
- public Exchange topicExchange() {
-
- return ExchangeBuilder
- .topicExchange("topic_exchange")
- .durable(true)
- .build();
- }
-
-
- /**
- * 声明队列,该队列与topic交换机绑定
- * @return
- */
- @Bean(name="topicQueue1")
- public Queue topicQueue1() {
- return QueueBuilder.durable("topic_queue_q1").build();
- }
-
-
- /**
- * 声明队列,该队列与topic交换机绑定
- * @return
- */
- @Bean(name="topicQueue2")
- public Queue topicQueue2() {
- return QueueBuilder.durable("topic_queue_q2").build();
- }
-
-
- /**
- * 将队列(topic_queue_q1)与topic型交换机进行绑定
- * @param queue
- * @param exchange
- * @return
- */
- @Bean
- public Binding topicBindingQ1(
- @Qualifier("topicQueue1") Queue queue,
- @Qualifier("topicExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("topic.queue.#")
- .noargs();
- }
-
-
- /**
- * 将队列(topic_queue_q2)与topic型交换机进行绑定
- * @param queue
- * @param exchange
- * @return
- */
- @Bean
- public Binding topicBindingQ2(
- @Qualifier("topicQueue2") Queue queue,
- @Qualifier("topicExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("topic.queue.#")
- .noargs();
- }
- }

测试 发送消息
- package com.rabbitmq.provider.rabbitmqprovider;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.Map;
-
- @SpringBootTest
- class RabbitmqProviderApplicationTests {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- void contextLoads() {
- Map msg=new HashMap<>();
- msg.put("code",200);
- msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
- .ofPattern("yyyy-MM-dd HH:mm:ss")));
- rabbitTemplate.convertAndSend("topic_exchange","topic.queue.ab",msg);
- }
-
- }

第二种发送消息
- package com.rabbitmq.provider.rabbitmqprovider.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class TopicConfig1 {
-
- @Bean(name="topicQueue1")
- public Queue topicQueue1() {
- return QueueBuilder.durable("topic_queue_q1").build();
- }
- @Bean(name="topicQueue2")
- public Queue topicQueue2() {
- return QueueBuilder.durable("topic_queue_q2").build();
- }
-
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange("topic-exchange");
- }
- @Bean
- public Binding topicBinding1( @Qualifier("topicQueue1") Queue queue,
- @Qualifier("topicExchange") TopicExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("person.yy");
- }
-
- @Bean
- public Binding topicBinding2( @Qualifier("topicQueue2") Queue queue,
- @Qualifier("topicExchange") TopicExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("person.*");
- }
-
- }

测试发送消息
- package com.rabbitmq.provider.rabbitmqprovider.web;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.Map;
-
- @RestController
- public class SenderController {
-
- @RequestMapping("/sendTopic")
- public String sendTopic(String routing){
- Map msg=new HashMap<>();
- msg.put("code",200);
- msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
- .ofPattern("yyyy-MM-dd HH:mm:ss")));
- rabbitTemplate.convertAndSend("topic-exchange",routing,msg);
- return "direct success";
- }
-
- }

http://localhost:8081/sendTopic?routing=person.y 只有条件为 y 或者 *
http://localhost:8081/sendTopic?routing=person.yy
消费消息
- package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Component
-
- public class TopicReciewer {
- @RabbitListener(queues={"topic_queue_q1"})
- @RabbitHandler
- public void handler(Map map){
- System.out.println(map);
-
- }
-
- @RabbitListener(queues={"topic_queue_q2"})
- @RabbitHandler
- public void handler1(Map map){
- System.out.println(map);
-
- }
-
- }

1)配置广播交换机,队列,并将主题交换机和该队列绑定。
- package com.rabbitmq.provider.rabbitmqprovider.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanoutConfig {
-
- @Bean
- public Queue fanoutQueue1() {
- return new Queue("fanout-queue1");
- }
-
- @Bean
- public Queue fanoutQueue2() {
- return new Queue("fanout-queue2");
- }
-
- @Bean
- public Queue fanoutQueue3() {
- return new Queue("fanout-queue3");
- }
-
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanout-exchange");
- }
-
- @Bean
- public Binding fanoutBInding1(@Qualifier("fanoutQueue1") Queue queue,
- @Qualifier("fanoutExchange") FanoutExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange);
- }
-
- @Bean
- public Binding fanoutBInding2(@Qualifier("fanoutQueue2") Queue queue,
- @Qualifier("fanoutExchange") FanoutExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange);
- }
-
- @Bean
- public Binding fanoutBInding3(@Qualifier("fanoutQueue3") Queue queue,
- @Qualifier("fanoutExchange") FanoutExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange);
- }
- }

生产的队列
向服务器发送消息
- package com.rabbitmq.provider.rabbitmqprovider.web;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.Map;
-
- @RestController
- public class SenderController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @RequestMapping("/sendFanout")
- public String sendFanout(){
- Map msg=new HashMap<>();
- msg.put("code",200);
- msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
- .ofPattern("yyyy-MM-dd HH:mm:ss")));
- rabbitTemplate.convertAndSend("fanout-exchange",null,msg);
- return "direct success";
- }
- }

发送与消费
消费消息
- package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Component
- public class FanoutRecevier {
-
- @RabbitListener(queues={"fanout-queue1"})
- @RabbitHandler
- public void fanout(Map map){
- System.out.println(map);
-
- }
-
- @RabbitListener(queues={"fanout-queue2"})
- @RabbitHandler
- public void fanout1(Map map){
- System.out.println(map);
-
- }
- @RabbitListener(queues={"fanout-queue3"})
- @RabbitHandler
- public void fanout2(Map map){
- System.out.println(map);
-
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。