赞
踩
微服务之间的通讯有同步和异步两种方式:
同步通讯:就像是打电话,需要实时的响应。
异步通讯:就像是发邮件,不需要马上就响应。
两种方式各有优劣:
打电话可以立即得到对方的响应,但是你不能同时和多个人同时通话,而发邮件可以同时给多个人发,但是响应会延迟。
在学习springcloud的时候,微服务之间的调用通过feign调用,这种方式就是同步通讯,虽然可以实时得到结果,但是存在下面的问题:
异步通讯则可以避免上面的问题:
以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓
库分配响应的库存并准备发货。在事件模式中,支付服务是事件发布者,在支付完成后只需要发布一个支付成功的
事件,事件中带上订单id。订单服务和物流服务是事件订阅者,订阅支付成功的事件,监听到事件后完成自己业务
即可。
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件
到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议
一样,让服务间的通讯变得标准和可控。
好处:
缺点:
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
基于centos7虚拟机中使用docker安装。
-- 拉取镜像
docker pull rabbitmq:3.8-management
-- 执行命令运行MQ容器
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
MQ的基本结构:
RabbitMQ中的一些角色:
创建springboot项目,包括三部分:
父工程导入依赖:
<dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--单元测试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
简单队列模式的模型图:
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
思路:
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("172.20.211.110"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("root"); factory.setPassword("root"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
代码思路:
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("172.20.211.110"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("root"); factory.setPassword("root"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
代码执行顺序:
先执行publisher发送消息,再执行consumer消费消息
执行结果:
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP提供了三个功能:
在父工程引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
首先配置MQ地址,在publisher服务的application.yml中添加配置:
spring:
rabbitmq:
host: 172.20.211.110
port: 5672
virtual-host: /
username: root
password: root
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } }
首先配置MQ地址,在consumer服务的application.yml中添加配置:
spring:
rabbitmq:
host: 172.20.211.110 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: root # 密码
然后在consumer服务的cn.itcast.mq.listener
包中新建一个类SpringRabbitListener,代码如下:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息
结果:
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/** * workQueue * 向队列中不停发送消息,模拟消息堆积。 */ @Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
测试:
启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。
可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
host: 172.20.211.110 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: root # 密码
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成后才能获取下一条消息
Work模型的使用:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
我们的计划是这样的:
在consumer中创建一个类,声明队列和交换机:
@Configuration public class FanoutConfig { /** * 声明交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("czy.fanout"); } /** * 第一个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1Exchange(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第二个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2Exchange(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
在publisher服务的SpringAmqpTest类中添加测试方法:
/**
* fanoutExchange
*
*/
@Test
public void testFanoutExchange(){
//交换机名称
String exchangeName = "czy.fanout";
//消息
String message = "czy.fanout,hello";
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
交换机的作用是什么?
声明队列、交换机、绑定关系的Bean是什么?
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
RoutingKey
(路由key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
@RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "czy.direct",type = ExchangeTypes.DIRECT), key = {"red","blue"} ) ) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); } @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "czy.direct",type = ExchangeTypes.DIRECT), key = {"red","yellow"} ) ) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到消息:【" + msg + "】" + LocalTime.now()); }
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "czy.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
描述下Direct交换机与Fanout交换机的差异?
Topic类型的交换机与Direct相比,都是根据routingkey把消息路由到不同的队列,只不过topic类型的交换机可以让队列在绑定routindkey的时候使用通配符。
routingkey一般都是由一个或者多个单词组成,多个单词之间以“,”分割,例如:czy.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
解释:
china.#
,因此凡是以 china.
开头的routing key
都会被匹配到。包括china.news和china.weather#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配。包括china.news和japan.news在publisher服务的SpringAmqpTest类中添加测试方法:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "czy.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener( bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "czy.topic",type = ExchangeTypes.TOPIC), key = "#.news" ) ) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); } @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "czy.topic",type = ExchangeTypes.TOPIC), key = "china.#" ) ) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到消息:【" + msg + "】" + LocalTime.now()); }
描述下Direct交换机与Topic交换机的差异?
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
修改消息发送的代码,发送一个Map对象:
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("simple.queue", msg);
}
停止consumer服务
在rabbitmq可视化界面查看消息:
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置消息转换器。
在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
再次查看rabbitmq可视化界面的消息信息:
消息队列在使用过程中,面临着很多实际问题需要思考:
消息从发送,到消息被接收,中间会经历多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
针对这些问题,RabbitMq分别给出了解决方案:
RabbitMq提供了生产者确认机制来避免消息发送到Mq过程中丢失。这种机制必须给每个消息一个唯一的id,消息发送到Mq以后,会返回一个结果给生产者,表示消息是否处理成功。
返回结果有两种形式:
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring:
rabbitmq:
host: 172.20.211.110 # rabbitMQ的ip地址
port: 5672 # 端口
username: root
password: root
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:
simple
:同步等待confirm结果,直到超时correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
修改publisher服务,添加一个:
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有业务需要,可以重发消息 }); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 自定义的数据 * @param ack 是否确认 * @param cause 原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ // 3.1.ack,消息成功 log.debug("消息发送成功, ID:{}", correlationData.getId()); }else{ // 3.2.nack,消息失败 log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), cause); } } }); } @Bean public DirectExchange simpleExchange(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", false, false); } @Bean public Queue simpleQueue(){ return new Queue("simple.queue",false); } @Bean public Binding binding(){ return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("simple"); } }
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一会儿,等待ack回执
Thread.sleep(2000);
}
测试
结论:
通过发送确认 和 消息返还机制可以确保消息 一定能够投递到指定的队列中,如果消息没有投递成功 或返还了
也可以通过定时重新投递的方式进行补偿
生产者确认可以确保消息投递到mq的队列中,但是消息发送到mq以后,如果突然宕机,也可能导致消息丢失,要想确保消息在mq中安全保存,必须开启消息持久化机制。
rabbitmq中交换机默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
事实上,默认情况下,由springAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D
的标示:
RabbitMQ中队列如果设置成非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public Queue simpleQueue(){
return new Queue("simple.queue",true);
}
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D
的标示:
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
用java代码指定:
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
@Test public void testSendMessage2SimpleQueue() throws InterruptedException { String routingKey = "simple"; String message = "hello, spring amqp!"; // 自定义数据 CorrelationData data = new CorrelationData(UUID.randomUUID().toString()); // 发送消息 rabbitTemplate.convertAndSend("simple.direct", routingKey, message, new MessagePostProcessor() { // 后置处理消息 @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置消息的持久化方式 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } },data); }
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立即删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
SpringAMQP允许配置三种确认模式:
由此可知:
一般情况下,使用默认的auto模式就行了。
修改consumer服务的application.yml文件,添加下面内容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消费者接收到simple.queue的消息:【{}】", msg);
// 模拟异常
System.out.println(1 / 0);
log.debug("消息处理完成!");
}
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
再次把确认机制修改为auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
怎么办呢?
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
结论:
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试机制之后,重试次数耗尽,如果消息仍然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
如何确保RabbitMQ消息的可靠性?
什么是死信?
如果这个含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机。
队列将死信投递给死信交换机时,必须知道两个信息:
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
在consumer中CommonConfig 修改消息策略
// 修改 失败消息策略
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
// return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
return new RejectAndDontRequeueRecoverer();
}
给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
在producer服务CommonConfig中,定义一组死信交换机、死信队列:
@Bean public Queue simpleQueue(){ return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化 .deadLetterExchange("dl.direct") // 指定死信交换机 .build(); } // 声明死信交换机 dl.direct @Bean public DirectExchange dlExchange(){ return new DirectExchange("dl.direct", true, false); } // 声明存储死信的队列 dl.queue @Bean public Queue dlQueue(){ return new Queue("dl.queue", true); } // 将死信队列 与 死信交换机绑定 @Bean public Binding dlBinding(){ return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl"); }
什么样的消息会成为死信?
死信交换机的使用场景是什么?
一个队列中的消息如果超时未消费,则会变成死信,超时分为两种情况:
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
.build();
}
注意,这个队列设定了死信交换机为dl.ttl.direct
声明交换机,将ttl与交换机绑定:
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
发送消息,但是不要指定TTL:
@Test
public void testTTLQueue() {
// 创建消息
String message = "hello, ttl queue";
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 记录日志
log.debug("发送消息成功");
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。