赞
踩
主题模式Topic是使用最多的模式,也是4种模式的最终版
消息发送到 交换机 --交换机下有不同的路由–不同的路由下有不同的队列
将消息发送到 队列中,消费者监听队列。获取相应的消息
1、pom文件
<!-- RabbitMQ-AMQP依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、yaml配置
spring:
#RabbitMQ
rabbitmq:
#服务器地址
host: 127.0.0.1
#用户名
username: guest
#密码
password: guest
#虚拟主机
virtual-host: /
#端口
port: 5672
listener:
simple:
#消费者最小数量
concurrency: 10
#消费者最大数量
max-concurrency: 10
#限制消费者每次只处理一条消息,处理完再继续下一条消息
prefetch: 1
#启动时是否默认启动容器,默认true
auto-startup: true
#被拒绝时重新进入队列
default-requeue-rejected: true
template:
retry:
#发布重试,默认false
enabled: true
#重试时间 默认1000ms
initial-interval: 1000
#重试最大次数,默认3次
max-attempts: 3
#重试最大间隔时间,默认10000ms
max-interval: 10000
#重试间隔的乘数。比如配2.0 第一次等10s,第二次等20s,第三次等40s
multiplier: 1.0
3、配置类
配置一个 MQ队列用来接收消息
配置 队列:名称 queue 并且持久化(队列和消息都要持久化)
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq配置类
*
* 准备 队列
*
*/
@Configuration
public class RabbitMqConfig {
// 配置 队列:名称 queue 并且持久化(队列和消息都要持久化)
@Bean
public Queue queue(){
return new Queue("queue",true);
}
}
向队列发送消息,监听此队列获取消息
消息发送者【生产者】
/**
* 消息发送者
*
*/
@Service
@Slf4j
public class MqSender {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
public void send(Object msg){
log.info("发送消息:" + msg);
// 往队列中发送 msg消息
rabbitTemplate.convertAndSend("queue",msg);
}
}
消息接收者【消费者】
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "queue")
public void receive(Object msg){
log.info("接收消息:" + msg);
}
}
测试请求
@Autowired
MqSender mqSender;
@RequestMapping("/mq")
@ResponseBody
public void mq(){
mqSender.send("小白兔");
}
运行项目:
在RabbitMq的可视化页面,可以看到 有一个长连接(接收者要时刻监听消息)
有多个通道
自定义的队列
调用接口:
点击页面的队列,可以看到
表示 消息 生成 和消息被消费了
创建交换机并在它下绑定两个队列,向交换机发送消息,监听两个队列的消费者都可以获取消息
1、MQ配置类
创建两个队列 和一个交换机,并将队列绑定到交换机上
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq配置类
*
* 准备 队列
*
*/
@Configuration
public class RabbitMqConfig {
// 广播模式 :准备两个队列 一个交换机
private static final String QUEUE01 = "queue-fanout01";
private static final String QUEUE02 = "queue-fanout02";
private static final String EXCHANGE = "fanoutExchange";
// 创建 队列一
@Bean
public Queue queue01(){
return new Queue(QUEUE01,true);
}
// 创建 队列2
@Bean
public Queue queue02(){
return new Queue(QUEUE02,true);
}
// 创建 交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGE,true,false);
}
// 将队列绑定到交换机上
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue01()).to(fanoutExchange());
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue02()).to(fanoutExchange());
}
// // 配置 队列:名称 queue 并且持久化(队列和消息都要持久化)
// @Bean
// public Queue queue(){
// return new Queue("queue",true);
// }
}
2、消息发送者【生产者】
将消息发送到 交换机上,路由为空
/**
* 消息发送者
*
*/
@Service
@Slf4j
public class MqSender {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
public void send(Object msg){
log.info("发送消息:" + msg);
// 将消息发送到交换机
rabbitTemplate.convertAndSend("fanoutExchange","",msg);
}
}
3、消息接收者【消费者】
监听两条队列的消息
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "queue-fanout01")
public void receive01(Object msg){
log.info("01接收消息:" + msg);
}
@RabbitListener(queues = "queue-fanout02")
public void receive02(Object msg){
log.info("02接收消息:" + msg);
}
}
4、测试类
@RequestMapping("/mq")
@ResponseBody
public void mq(){
mqSender.send("小白兔");
}
运行项目
在MQ可视化页面可以看到创建的交换机
点击交换机。可以看到 绑定的两个 队列
并且 队列目录下也有这两个队列
运行结果:交换机下的两个队列都接收到了消息
消息发送到交换机并指定 路由键,就会发送对应的队列上
如:发送到X交换机 ,指定black路由键,消息会到 Q2队列中,由监听该队列的消费者获取消息
1、配置类
创建两个队列,一个交换机,两个路由,将队列绑定到交换机的路由key上
package com.example.seckill.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq配置类
*
* 准备 队列
*
*/
@Configuration
public class RabbitMqConfig {
// 广播模式 :准备两个队列 一个交换机
private static final String QUEUE01 = "queue-direct01";
private static final String QUEUE02 = "queue-direct02";
private static final String EXCHANGE = "directExchange";
private static final String ROUTINGKEY01 = "queue.red";
private static final String ROUTINGKEY02 = "queue.green";
// 创建 队列一
@Bean
public Queue queue01(){
return new Queue(QUEUE01,true);
}
// 创建 队列2
@Bean
public Queue queue02(){
return new Queue(QUEUE02,true);
}
// 创建 交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE,true,false);
}
// 将队列绑定到交换机上 并指定路由键
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
}
}
2、消息发送者【生产者】
消息发送到 指定交换机的指定 路由key上
package com.example.seckill.rabbit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 消息发送者
*
*/
@Service
@Slf4j
public class MqSender {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
public void send01(Object msg){
log.info("发送消息:" + msg);
// 将消息发送到交换机
rabbitTemplate.convertAndSend("directExchange","queue.red",msg);
}
public void send02(Object msg){
log.info("发送消息:" + msg);
// 将消息发送到交换机
rabbitTemplate.convertAndSend("directExchange","queue.green",msg);
}
}
3、消息接收者【消费者】
监听队列,获取队列的消息
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "queue-direct01")
public void receive01(Object msg){
log.info("01接收消息:" + msg);
}
@RabbitListener(queues = "queue-direct02")
public void receive02(Object msg){
log.info("02接收消息:" + msg);
}
}
4、测试:
@Autowired
MqSender mqSender;
@RequestMapping("/mq")
@ResponseBody
public void mq(){
mqSender.send01("小白兔");
mqSender.send02("白又白");
}
运行项目
在可视化页面可以看到创建的 交换机,点进去,可以看到绑定的两个队列、以及队列对应的路由
运行结果
01发送小白兔,01接收小白兔
主题模式是在路由模式的基础上,对路由key做了通配符匹配,以满足复杂的消息分发场景
注意:
‘#’ :匹配0个或者多个
‘*’:匹配一个
如:#.red: 可以匹配:a.red、a.b.red等
*.red:可以匹配a.red,不能匹配a.b.red
1、配置类
使用通配符定义 路由key
key1:#.queue.# —匹配路由中带有.queue 的所有路由
key2:*.queue.# — 匹配 类似:a.queue的所有路由
package com.example.seckill.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq配置类
*
* 准备 队列
*
*/
@Configuration
public class RabbitMqConfig {
// 广播模式 :准备两个队列 一个交换机
private static final String QUEUE01 = "queue-topic01";
private static final String QUEUE02 = "queue-topic02";
private static final String EXCHANGE = "topicExchange";
private static final String ROUTINGKEY01 = "#.queue.#";
private static final String ROUTINGKEY02 = "*.queue.#";
// 创建 队列一
@Bean
public Queue queue01(){
return new Queue(QUEUE01,true);
}
// 创建 队列2
@Bean
public Queue queue02(){
return new Queue(QUEUE02,true);
}
// 创建 交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE,true,false);
}
// 将队列绑定到交换机上 并指定路由键
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
}
}
2、消息发送者【生产者】
01发送的路由为 queue.red.message,匹配的是 路由key1
02发送的路由是 a.queue.green 匹配的是 路由key1和key2
所以 02 发送的消息到了 queue01 和 queue 02两个队列
/**
* 消息发送者
*
*/
@Service
@Slf4j
public class MqSender {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
public void send01(Object msg){
log.info("发送消息:" + msg);
// 将消息发送到交换机
rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
}
public void send02(Object msg){
log.info("发送消息:" + msg);
// 将消息发送到交换机
rabbitTemplate.convertAndSend("topicExchange","a.queue.green",msg);
}
}
3、消息接收者【消费者】
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "queue-topic01")
public void receive01(Object msg){
log.info("01接收消息:" + msg);
}
@RabbitListener(queues = "queue-topic02")
public void receive02(Object msg){
log.info("02接收消息:" + msg);
}
}
4、测试:
@Autowired
MqSender mqSender;
@RequestMapping("/mq")
@ResponseBody
public void mq(){
mqSender.send01("小白兔");
mqSender.send02("白又白");
}
结果:
分为三部分:
1、消息体类:
/**
* MQ消息数据模型
*/
@Data
public class MQParam implements Serializable {
private Long id;
private String name;
private String messageId;//储存消息发送的唯一标识
}
2、消息配置类
/**
* RabbitMq配置类
*
* 准备 队列
*
*/
@Configuration
public class RabbitMqConfig {
// 广播模式 :准备两个队列 一个交换机
private static final String QUEUE01 = "queue-topic01";
private static final String QUEUE02 = "queue-topic02";
private static final String EXCHANGE = "topicExchange";
private static final String ROUTINGKEY01 = "#.queue.#";
private static final String ROUTINGKEY02 = "*.queue.#";
// 创建 队列一
@Bean
public Queue queue01(){
return new Queue(QUEUE01);
}
// 创建 队列2
@Bean
public Queue queue02(){
return new Queue(QUEUE02);
}
// 创建 交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE);
}
// 将队列绑定到交换机上 并指定路由键
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
}
}
3、发送消息【生产者】
首先定义 回调函数,让生产者知道 消息成功发送到了MQ服务器
注意: :RabbitTemplate是spring生成的Bean,是单例的,这设置一次回调后,其他使用默认RabbitTemplate的发送者发送消息都会触发这个回调
package com.example.seckill.rabbit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 发送消息
*/
@Service
@Slf4j
public class MqSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
/**
* 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
* @param correlationData 相关配置信息
* @param b exchange 交换机,判断交换机是否成功收到消息 true 表示交换机收到
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("data:" + correlationData);
log.info("---- confirm ----ack="+b+" cause="+String.valueOf(s));
if (b){
System.out.println("消息被 MQ 接收");
}else {
System.out.println("消息没有被MQ接收");
}
}
/**
* 交换机并未将数据丢入指定的队列中时,触发
* channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
* 参数三:true 表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
* @param returnedMessage 消息对象
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("----交换机并未将数据丢入指定的队列中----replyCode="+returnedMessage.getReplyCode()+" replyText="+returnedMessage.getReplyText()+" ");
}
// 发送消息
public void send01(MQParam param) {
// 设置交换机处理失败消息的模式 true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
// 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
rabbitTemplate.setMandatory(true);
// MQ收到消息后,手动ack回执
rabbitTemplate.setConfirmCallback(this);
// return 配置
rabbitTemplate.setReturnsCallback(this);
// 设置消息的唯一ID
CorrelationData data = new CorrelationData();
data.setId(param.getMessageId());
log.info("发送消息:" + param);
// 将消息发送到交换机
rabbitTemplate.convertAndSend(
"topicExchange", // 交换机
"queue", // 路由
param, // 消息体内容
data // 消息唯一Id
);
}
}
}
ConfirmCallback实现判断消息是否到交换机
ReturnCallback,实现判断 消息到达交换机后是否到队列中
4、消费者
消费者可以获取 消息体和消息头部信息
使用:channel.basicAck(tag,false); 方法,告知MQ,消息已经被消费
tag:是这个消息的tagID,false:只确认当前的消息收到,true:确认所有的消息收到
package com.example.seckill.rabbit;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
/**
* 消息接收者
*/
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "queue-topic01")
@RabbitHandler
public void receive01(@Payload MQParam param,
@Headers Map<String,Object> headers,
Channel channel) throws IOException {
System.out.println("-----收到消息了-----");
System.out.println("---发过来的用户是:" + param);
System.out.println("---发过来的heard是:" + headers);
/**
* basicAck:表示确认已经消费消息,通知MQ,需要先得到 deliveryTag
* deliveryTag 从消息头里get到
*/
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(tag,false);
}
}
5、yaml配置回调
publisher-confirm-type: correlated
1、如果消费者发送了ack,MQ将把这个消息从待确认中删除,
2、如果消费者发送了nack,并指定不重入队列,此消息会被删除,
3、 重试:如果消费者发送了nack,指定重入队列,那么这条消息会进入队列,重新发送给消费者
注意: 重试的消息的消息头是amqp_redelivered属性会被设置成 true,客户端由此判断该消息是否被确认,
如果不做判断,每次失败重入队列再次发送,会导致不停的发送与拒绝。
消费者的手动确认消息
basicNack方法:
第一个参数是 该消息的tagId,
第二个参数:为true表示包含当前消息在内的所有比该消息的deliveryTag值小的消息都被拒绝, 除了已经被应答的消息。为false则表示只拒绝本条消息,
第三个参数:是否重入队列。首先判断此消息是否为重试消息,是的话就不重入队列,不是就重入队列,再次发送
package com.example.seckill.rabbit;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
/**
* 消息接收者
*/
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "queue-topic01")
@RabbitHandler
public void receive01(@Payload MQParam param,
@Headers Map<String,Object> headers,
Channel channel) throws IOException {
System.out.println("-----收到消息了-----");
System.out.println("---发过来的用户是:" + param);
System.out.println("---发过来的heard是:" + headers);
/**
* basicAck:表示确认已经消费消息,通知MQ,需要先得到 deliveryTag
* deliveryTag 从消息头里get到
*/
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag,false);
}catch (Exception e){
// 判断是否是重试消息
boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
channel.basicNack(tag,false,!redelivered);
}
}
}
yaml 配置自动确认以及重试
spring:
rabbitmq:
listener:
retry:
# 重试次数
max-attempts: 3
# 开启重试机制
enabled: true
死信 + 延迟消息
场景:红包,过时退回。订单:过期取消
比如生产者发送的消息到MQ,这条消息因为各种原因没有被消费,消息最终死了。
死信队列和死信交换机,和普通的没有区别,将其配置为死信的处理者,死信转发到死信交换机和死信队列上,对死信进行处理。
这种设置、处理 在 RabbitMQ 中是点对点的,即一个普通队列 可以绑定一个死信交换机。
1、队列长度满了
2、消费者拒绝消费消息(丢弃):basicNack() .basicNack方法
3、消息TTL(存活时间) 过期 :TTL可以设置在队列、单条消息上,如果在队列上,则等同于设置该队列下所有消息
产生死信后,消息,会到死信交换机,再由死信交换机路由到 死信队列上,死信队列再推送给这个队列的消费者
1、创建没有消费者的队列,设置TTL,并绑定死信交换机
2、所有需要延迟的消息全部向这条队列发送
3、死信交换机绑定对应的死信队列,其消费者即为处理延迟消息的服务
配置类、再创建普通队列中设置了死信队列及死信路由,并且设置了延迟时间
package com.example.seckill.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMq配置类
*
* 准备 队列
*
*/
@Configuration
public class RabbitMqConfig {
// 普通交换机
public final static String SKYPYB_ORDINARY_EXCHANGE = "ordinary-exchange";
// 死信交换机
public final static String SKYPYB_DEAD_EXCHANGE = "dead-exchange";
// 普通队列
public final static String SKYPYB_ORDINARY_QUEUE_1 = "ordinary-queue";
// 死信队列
public final static String SKYPYB_DEAD_QUEUE = "dead-queue";
// 普通路由
public final static String SKYPYB_ORDINARY_KEY = "key.ordinary.one";
// 死信路由
public final static String SKYPYB_DEAD_KEY = "key.dead";
// 创建普通交换机
@Bean
public DirectExchange ordinaryExchange() {
return new DirectExchange(SKYPYB_ORDINARY_EXCHANGE, false, true);
}
// 创建死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(SKYPYB_DEAD_EXCHANGE, false, true);
}
// 创建 普通队列,并绑定死信路由和死信交换机 该队列的死信消息,转发到绑定的死信交换机上
@Bean
public Queue ordinaryQueue() {
Map<String, Object> arguments = new HashMap<>();
//TTL 5s
arguments.put("x-message-ttl", 1000 * 5);
// 设定当前队列中,允许存放的最大消息数目
arguments.put("x-max-length",10);
//绑定死信队列和死信交换机
arguments.put("x-dead-letter-exchange", SKYPYB_DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", SKYPYB_DEAD_KEY);
return new Queue(SKYPYB_ORDINARY_QUEUE_1, false, false, true, arguments);
}
// 创建死信队列
@Bean
public Queue deadQueue() {
return new Queue(SKYPYB_DEAD_QUEUE, false, false, true);
}
// 普通的绑定
@Bean
public Binding bindingOrdinaryExchangeAndQueue() {
return BindingBuilder.bind(ordinaryQueue()).to(ordinaryExchange()).with(SKYPYB_ORDINARY_KEY);
}
// 死信的绑定
@Bean
public Binding bindingDeadExchangeAndQueue() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(SKYPYB_DEAD_KEY);
}
}
消息发送者【生产者】发送消息到普通队列
package com.example.seckill.rabbit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 发送消息
*/
@Service
@Slf4j
public class MqSender {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
/**
* 回调函数,confirm 确认
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
/***
*
* @param correlationData 消息的唯一ID
* @param b 确认消息是否被MQ 服务器接收,true:接收 ,false:未接收
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("data:" + correlationData);
if (b) {
System.out.println("消息被 MQ 接收");
} else {
System.out.println("消息没有被MQ接收");
}
}
};
// 发送消息
public void send01(MQParam param) {
// 设置回调
rabbitTemplate.setConfirmCallback(confirmCallback);
CorrelationData data = new CorrelationData();
data.setId(param.getMessageId());
log.info("发送消息:" + param);
// 将消息发送到交换机
rabbitTemplate.convertAndSend("ordinary-exchange",
"key.ordinary.one", param);
rabbitTemplate.convertAndSend("ordinary-exchange",
"key.ordinary.one", param);
log.info("-----消息发送完毕-----");
}
}
消费者 监听死信队列
package com.example.seckill.rabbit;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
/**
* 消息接收者
*/
/**
* 消息接收者
*/
@Service
@Slf4j
public class MqReceiver {
// 监听 queue 队列的消息
@RabbitListener(queues = "dead-queue")
public void receive01(@Payload MQParam param,
@Headers Map<String,Object> headers,
Channel channel) throws IOException {
System.out.println("-----死信队列收到消息了-----");
System.out.println("---发过来的用户是:" + param);
System.out.println("---发过来的heard是:" + headers);
/**
* basicAck:表示确认已经消费消息,通知MQ,需要先得到 deliveryTag
* deliveryTag 从消息头里get到
*/
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag,false);
}catch (Exception e){
System.err.println(e.getMessage());
// 判断是否是重试消息
boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
channel.basicNack(tag,false,!redelivered);
}
}
}
测试:
@RequestMapping("/mq")
@ResponseBody
public void mq(){
MQParam param = new MQParam();
param.setId(System.currentTimeMillis());
param.setName("用户1");
param.setMessageId("param$" + System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
mqSender.send01(param);
}
可以发现,在发送消息 5秒 之后,死信队列 监听到了消息
以上是设置的队列的存活时间,还可以设置 消息的存活时间
在 生产者代码中 设置消息的TTL 为3秒
// 发送消息
public void send01(MQParam param) {
// 设置回调
rabbitTemplate.setConfirmCallback(confirmCallback);
CorrelationData data = new CorrelationData();
data.setId(param.getMessageId());
log.info("发送消息:" + param);
// 将消息发送到交换机
rabbitTemplate.convertAndSend(
"ordinary-exchange",
"key.ordinary.one",
param,
(message -> {
message.getMessageProperties().setExpiration("3000");
return message;
}));
rabbitTemplate.convertAndSend("ordinary-exchange",
"key.ordinary.one", param);
log.info("-----消息发送完毕-----");
}
可以看到 3秒后 监听到一条消息,5秒后又监听到一条消息
改变上述的 生产者代码为,先发送TTL队列的消息,再发送设置了消息TTL 3 秒的消息,死信队列等待了5秒才监听到消息,并不是先监听到三秒的消息。
原因:
队列是先进后出的有序队列,MQ只对队尾的消息进行超时判断。上述队尾消息是5秒超时,所以不会先判断3秒的。
有固定时间的延迟任务,RabbitMQ还是很好的。
工具::rabbitmq_delayed_message_exchange插件,称为延迟消息交换机,
作用是:在 direct、topic、fanout 等这些 exchange 基础上新增一个交换机类型 x-delayed-message
使用:只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数x-delay[:毫秒值] 就能够实现每条消息的异步延时
原理: 创建 延迟消息交换机,需要延迟的消息都发送到这个队列上。和死信无关了
配置类
创建自定义交换机,指定类型x-delayed-message
@Configuration
public class RabbitBindConfig {
public final static String SKYPYB_DELAY_EXCHANGE = "delay-exchange";
public final static String SKYPYB_DELAY_QUEUE = "delay-queue";
public final static String SKYPYB_DELAY_KEY = "key.delay";
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args);
}
@Bean
public Queue delayQueue() {
return new Queue(SKYPYB_DELAY_QUEUE, false, false, true);
}
@Bean
public Binding bindingDelayExchangeAndQueue() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs();
}
}
消费者
监听该 延迟队列
@RabbitListener(queues = {RabbitBindConfig.SKYPYB_DELAY_QUEUE})
@Component
@Slf4j
public class DelayReceiver {
@RabbitHandler
public void onDelayMessage(@Payload String message,
@Headers Map<String, Object> headers,
Channel channel) throws IOException {
log.info("监听延时交换机, 收到消息: {}", message);
//delivery tag可以从headers中get出来
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
channel.basicNack(deliveryTag, false, !redelivered);
}
}
}
生产者
rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
RabbitBindConfig.SKYPYB_DELAY_KEY, "消息体-5s",
(msg) -> {
msg.getMessageProperties().setDelay(5000);
return msg;
});
rabbitTemplate.convertAndSend(
RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
RabbitBindConfig.SKYPYB_DELAY_KEY,
"消息体-3s",
(msg) -> {
msg.getMessageProperties().setDelay(3000);
return msg;
});
logger.info("-----消息发送完毕-----");
@Bean
public Queue queue01(){
return new Queue(QUEUE01,true);
}
交换机持久化
// 创建 交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE,true,false);
}
参考
优点:
1、应用解耦:订单系统、库存系统,之间通过rabbitMQ 连接,模块处理数据连接rabbitmq即可,降低模块间的耦合度。
2、异步提速:支付成功后,有订单、物流等业务,可以使用 mq进行异步操作,提高效率。
3、流量削峰:大量请求进入 rabbit的队列中,将每秒500请求,从队列取出,交给接口处理。
缺点
RabbitMQ 一旦崩了,全崩了。
ConfirmCallback:实现判断消息是否到交换机
ReturnCallback:实现判断 消息到达交换机后是否到队列中
1、消费者宕机,导致队列中的消息无法被消费
2、消费者的业务逻辑过大,导致消费能力不足
3、生产者产生的消息过多,比如”双十一“,导致消费者处理不过来
1、增加消费者的数量
2、将消息从队列中取出,存进数据库,后期慢慢处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。