赞
踩
RabbitMQ作为一款用途非常广泛的消息队列,可以做到解耦,异步调用,以及流量削峰等非常强大的功能(上一篇博客有详细介绍
四种MQ的介绍与区别
)。接下来详细介绍RabbitMQ的具体代码实现~
RabbitMQ是一个使用Erlang语言开发的(即安装RabbitMQ之前,必须先安装Erlang,一键式傻瓜安装),实现AMQP (高级消息队列协议)的开源消息中间件。它可以实现异步消息处理,是一款消息代理,它接受和转发消息。
RabbitMQ有以下特点:
首先添加依赖,如下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
这里使用了自动配置版本,若指定version,需要和springboot版本保持一致。
然后在yaml文件配置RabbitMQ的相关ip以及端口
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
这里使用了自动配置版本,若指定version,需要和springboot版本保持一致
@Configuration
public class RabbitMQConfig {
/**
* 创建队列*/
@Bean
public Queue createQ1(){
return new Queue("queue01");
}
}
@Component
@RabbitListener(queues = "queue01") //queues指定监听的队列名称,即要消费哪一个队列的消息
public class MqListener {
@RabbitHandler
public void handler(String msg){ //处理消息
System.err.println("监听器消费消息:"+msg+"--->"+ LocalTime.now());
}
}
这里我们创建一个接口,来向队列发送消息,看看能不能被监听器消费掉
@RestController
public class Controller {
@Resource
private RabbitTemplate template;
@GetMapping("/api/mq/test1")
public String sendMsg(String msg){
System.out.println("发送消息:"+msg+"--->"+ LocalTime.now());
template.convertAndSend("","queue01",msg);
return "OK";
}
}
上述便是最基础的消息队列(点对点),一个队列,一个生产者,一个消费者。
总
如果同时有多个监听器监听同一个队列呢?下面再添加一个监听器
@Component
@RabbitListener(queues = "queue01")
public class MqListener01 {
@RabbitHandler
public void handler(String msg){
System.err.println("(MqListener01)监听器消费消息:"+msg+"--->"+ LocalTime.now());
}
}
接下来我们多次向队列发送消息
修改一下发送消息接口:
@RestController
public class Controller {
@Resource
private RabbitTemplate template;
@GetMapping("/api/mq/test1")
public String sendMsg(String msg){
for(int i=1; i<11; i++){
System.out.println("第"+i+"次发送消息:"+msg+"--->"+ LocalTime.now());
template.convertAndSend("","queue01","第"+i+"个消息"+msg);
}
return "OK";
}
}
控制台输出如下:
因此,我们可以总结以下:
当有多个监听器监听同一个消息队列,监听器会相互竞争处理消息。
RabbitMQ基于Exchange实现消息的一对多,一个消息可以被消费多次
Exchange有4种类型:
1.fanout 直接转发,不对消息做过滤
2.direct 路由关键字过滤,只支持精确值,可以对消息进行过滤匹配
3.topic 路由关键字过滤,支持模糊值, *:1个单词 #:0或多个单词
4.header 消息头过滤,对消息的请求消息头进行匹配过滤,any(类似 or):任意1个 all:全部(类似 and)
首先,创建两个消息队列,以及一个交换器,两个消息队列分别与交换器绑定
public class RabbitMQConfig01 { /** * 创建交换器*/ @Bean public FanoutExchange createFe(){ return new FanoutExchange("fanout"); } /** * 创建队列*/ @Bean public Queue createQ2(){ return new Queue("queue02"); } @Bean public Queue createQ3(){ return new Queue("queue03"); } /** * 创建绑定关系*/ @Bean public Binding createBq2(FanoutExchange fe){ return BindingBuilder.bind(createQ2()).to(fe); } @Bean public Binding createBq3(FanoutExchange fe){ return BindingBuilder.bind(createQ3()).to(fe); } }
接下来我们创建两个监听器,分别监听queue02和queue03这两个消息队列
@Component @RabbitListener(queues = "queue02") public class MqListener02 { @RabbitHandler public void handler(String msg){ System.err.println("(MqListener02)监听器消费消息:"+msg+"--->"+ LocalTime.now()); } @Component @RabbitListener(queues = "queue03") class MqListener03 { @RabbitHandler public void handler(String msg) { System.err.println("(MqListener03)监听器消费消息:" + msg + "--->" + LocalTime.now()); } } }
现在,创建一个接口,来向交换器发送信息
@GetMapping("/api/mq/test2")
public String sendMsg02(String msg) {
System.out.println("向交换器发送消息");
template.convertAndSend("fanout", "", msg);
return "OK";
}
结果如下
可以得到以下结论:
向交换器发送消息,该交换器绑定的所有队列都会收到消息。即一条消息被多个消费者多次消费
direct 交换器可以对指定的固定字符进行过滤 首先创建direct交换器,并且绑定队列 @Configuration public class RabbitMQConfig02 { /** * 创建交换器*/ @Bean public DirectExchange createDe(){ return new DirectExchange("direct"); } /** * 创建队列*/ @Bean public Queue createQ4(){ return new Queue("queue04"); } @Bean public Queue createQ5(){ return new Queue("queue05"); } /** * 创建绑定关系*/ @Bean public Binding createBq4(DirectExchange de){ return BindingBuilder.bind(createQ4()).to(de).with("info"); //创建绑定关系,并且声明固定字符 } @Bean public Binding createBq5(DirectExchange de){ return BindingBuilder.bind(createQ5()).to(de).with("error"); } }
创建监听器 @Component @RabbitListener(queues = "queue04") public class MqListener04 { @RabbitHandler public void handler(String msg){ System.err.println("(MqListener04_info)监听器消费消息:"+msg+"--->"+ LocalTime.now()); } @Component @RabbitListener(queues = "queue05") class MqListener05 { @RabbitHandler public void handler(String msg) { System.err.println("(MqListener05_error)监听器消费消息:" + msg + "--->" + LocalTime.now()); } } }
创建接口,向交换器发送消息
@GetMapping("/api/mq/test3")
public String sendMsg03(String msg,String key) {
System.out.println("向交换器发送消息");
template.convertAndSend("direct", key, msg); //且指定key,对应交换器的过滤字符
return "OK";
}
结果如下
可以看到,我们向direct交换器发送消息时,指定了key,即该消息只会被转发到指定了相同key的消息队列上
首先,创建topic交换器,且绑定队列
@Configuration public class RabbitMQConfig03 { /** * 创建交换器*/ @Bean public TopicExchange createTe(){ return new TopicExchange("topic"); } /** * 创建队列*/ @Bean public Queue createQ6(){ return new Queue("queue06"); } @Bean public Queue createQ7(){ return new Queue("queue07"); } /** * 创建绑定关系*/ @Bean public Binding createBq6(TopicExchange te){ return BindingBuilder.bind(createQ6()).to(te).with("*"); //*表示一个单词 } @Bean public Binding createBq7(TopicExchange te){ return BindingBuilder.bind(createQ7()).to(te).with("#");// #表示任意个单词 } }
创建监听器监听队列 @Component @RabbitListener(queues = "queue06") public class MqListener06 { @RabbitHandler public void handler(String msg){ System.err.println("(MqListener06_*)监听器消费消息:"+msg+"--->"+ LocalTime.now()); } @Component @RabbitListener(queues = "queue07") class MqListener07 { @RabbitHandler public void handler(String msg) { System.err.println("(MqListener07_#)监听器消费消息:" + msg + "--->" + LocalTime.now()); } } }
创建接口,向交换器发送消息
@GetMapping("/api/mq/test4")
public String sendMsg04(String msg,String key) {
System.out.println("向交换器发送消息");
template.convertAndSend("topic", key, msg);
return "OK";
}
结果如下
可以看到,我们指定发送的key为“skr”,两个队列的绑定关系都成立
接下来,我们换一下,key為“skr.skr”
可以看到成功发送消息到过滤字符为‘#’的队列,应为“skr.skr”相当于两个单词(.为分隔符)
创建header交换器
@Configuration public class RabbitMQConfig04 { /** * 创建交换器*/ @Bean public HeadersExchange createHe(){ return new HeadersExchange("header"); } /** * 创建队列*/ @Bean public Queue createQ8(){ return new Queue("queue08"); } @Bean public Queue createQ9(){ return new Queue("queue09"); } /** * 创建绑定关系*/ @Bean public Binding createBq8(HeadersExchange he){ return BindingBuilder.bind(createQ8()).to(he).whereAny("id","author").exist(); //any 任意1个即可 } @Bean public Binding createBq9(HeadersExchange he){ return BindingBuilder.bind(createQ9()).to(he).whereAll("id","author").exist(); //all 同时满足 } }
创建监听器
@Component @RabbitListener(queues = "queue08") public class MqListener08 { @RabbitHandler public void handler(String msg){ System.err.println("(MqListener08_any)监听器消费消息:"+msg+"--->"+ LocalTime.now()); } @Component @RabbitListener(queues = "queue09") class MqListener09 { @RabbitHandler public void handler(String msg) { System.err.println("(MqListener09_all)监听器消费消息:" + msg + "--->" + LocalTime.now()); } } }
创建接口发送消息
@GetMapping("/api/mq/test5")
public String sendMsg(String msg){
MessagePostProcessor postProcessor=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("id","121"); //指定消息头
return message;
}
};
template.convertAndSend("header","",msg,postProcessor);
return "OK";
}
结果如下
即绑定any的队列收到了消息,因为我们只指定了id这一个消息头
当一个队列过期,长度超过指定值就会成为死信队列。随之这个消息会自动绑定一个交换器。
5.1 创建死信队列,正常队列和交换器绑定
@Configuration public class RabbitMQConfigdead { //1.创建 死信交换器 专门转发死信消息 @Bean("dead") public DirectExchange createDLX(){ return new DirectExchange("deadex"); } //2.创建队列 //队列 生成死信 1.没有消费者 2.有效期 3.设置死信和死信匹配关键字 @Bean public Queue createQ10() { Map<String, Object> param = new HashMap<>(); //设置死信交换器 param.put("x-dead-letter-exchange", "deadex"); //设置死信交换器 匹配的路由 param.put("x-dead-letter-routing-key", "test1"); //设置队列中消息的有效期 8秒 param.put("x-message-ttl", 8000); return QueueBuilder.durable("queue10").withArguments(param).build(); } @Bean public Queue createQ11(){ return new Queue("queue11"); } //3.绑定 @Bean public Binding createBd8(@Qualifier("dead") DirectExchange dead){ return BindingBuilder.bind(createQ11()).to(dead).with("test1"); } }
这里queue10消息队列,在8s后会过期,成为死信队列
然后创建监听器,监听queue11这个正常队列的消息
@Component
@RabbitListener(queues = "queue11")
public class MqListenerDead {
@RabbitHandler
public void handler(String msg){
System.err.println("(MqListenerDead)监听器消费消息:"+msg+"--->"+ LocalTime.now());
}
}
然后创建接口,向queue这个死信队列发送消息
@GetMapping("/api/mq/test6")
public String sendMsg06(String msg) {
System.out.println("向8s后会成为死信队列的队列发送消息--->"+LocalTime.now());
template.convertAndSend("", "queue10", msg);
return "OK";
}
这里启动可能会报冲突,应为前面有一个注册bean同样注册了DirectExchange类型,我们加个注解,使之通过名称指定(正常启动请忽略),
修改后的direct交换器配置类
@Configuration public class RabbitMQConfig02 { /** * 创建交换器*/ @Bean("direct") public DirectExchange createDe(){ return new DirectExchange("direct"); } /** * 创建队列*/ @Bean public Queue createQ4(){ return new Queue("queue04"); } @Bean public Queue createQ5(){ return new Queue("queue05"); } /** * 创建绑定关系*/ @Bean public Binding createBq4(@Qualifier("direct") DirectExchange de){ return BindingBuilder.bind(createQ4()).to(de).with("info"); } @Bean public Binding createBq5(@Qualifier("direct") DirectExchange de){ return BindingBuilder.bind(createQ5()).to(de).with("error"); } }
可以看到,监听queue11的监听器收到了消息,也就是说我们发送到queue10的消息被交换器转发了。
当然,如果有监听器监听queue10,是之在过期之前被消费掉,他也就不会成为死信队列被转发到queue11.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。