赞
踩
本笔记是学习该教程记录的:https://www.bilibili.com/video/BV1cb4y1o7zz
什么是MQ
MQ(Message Queue),从字面意义上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。
在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务。
使用了MQ之后,消息的发送上游只需要依赖MQ,不用依赖其他服务
为什么要用MQ
流量消峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作,系统是处理不了的,只能限制订单超过一万后不允许用户下单。
如果使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
应用解耦
以电商应用为例,应用中有订单系统,库存系统,物流系统,支付系统。用户创建订单后,如果耦合调用库存系统,物流系统,支付系统,任何一个子系统出了故障,都会造成下单操作异常。
当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复,在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成,当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
异步处理
有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式:A过一段时间去调用B的查询api查询,或者A提供一个callback api,B执行完之后调用api通知A服务,这两种方式都不是很优雅
使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务,这样A既不用循环调用B的查询api,也不用提供callback api,同样B服务也不用做这些操作,A服务还能及时得到异步处理成功的消息。
MQ的分类
ActiveMQ
优点:
单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点:
官方社区现在对ActiveMQ5.x维护越来越少,高吞吐量场景较少使用。
Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集,传输,存储的过程中发挥着举足轻重的作用,目前已经被LinkedIn,Uber,Twitter,Netfix等大公司所采纳。
优点:性能卓越,单机写入TPS约在百万条/秒,最大的有点,就是吞吐量高,时效性ms级,可用性非常高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且被消费一次,有优秀的第三方Kafka web管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用,功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试,支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢。
RocketMQ
RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进,被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java,我们可以自己阅读源码,定制自己公司的MQ
缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟;社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码。
RabbitMQ
2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一
优点:由于erlang语言的高并发特性,性能较好,吞吐量到万级,MQ功能比较完备,健壮,稳定,易用,跨平台,支持多种语言,如Python,Ruby,.NET,java,jms,c,php,ActionScript,XMPP,STOMP等,支持AJAX,文档齐全,开源提供的管理界面非常棒,用起来很好用,社区活跃度高,更新频率相当高
缺点:商业版需要收费,学习成本较高
MQ的选择
Kafka
Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。
RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RocketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ
RabbitMQ
结合erlang语言本身的并发优势,性能好,时效性微妙级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ
RabbitMQ是一个消息中间件:它接受并转发消息。
生产者(Producer)
产生数据发送消息的程序是生产者
交换机(Exchange)
交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中,交换机必须确切知道如何处理它接收到的消息,是将这些消息推送给特定队列还是推送到多个队列,亦或是把消息丢弃,这个得由交换机类型决定。
队列(Queue)
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,血多消费者可以尝试从一个队列接收数据,这就是我们使用队列的方式。
消费者(Consumer)
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序,请注意生产者,消费者和消费中间件很多时候并不在同一机器上,同一个应用程序既可以是生产者又是可以是消费者。
六大模式:简单模式,工作队列,发布订阅,路由模式,主题模式,发布确认模式
Broker
接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host
出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供服务时,可以划分出多个vhost,每个用户在自己的vhos创建exchange/queue等
Connection
publisher/consumer和broker之间的TCP连接
Channel
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也低,Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。channel作为轻量级的Connection极大减少了操作系统建立TCP Connection的开销。
Exchange
message到大broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去,常用的类型有:direct(point-to-point),topic(publish-subscribe) and fanout(multicast)
Queue
消息最终被送到这里等待consumer取走
Binding
exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
拉取RabbitMQ镜像(带manager是有web界面的)
docker pull rabbitmq:3.9.22-management-alpine
启动一个RabbitMQ容器
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.9.22-management-alpine
映射了两个端口:15672是用于web页面访问的,5672是代码内部生产者和消费者使用的。
登录RabbitMQ的Web查看
默认用户和密码都是 guest
,如果想更改,可以使用 RABBITMQ_DEFAULT_USER
和 RABBITMQ_DEFAULT_PASS
环境变量进行更改
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3.9.22-management-alpine
这里没有更改,还是使用默认用户和密码,访问:http://localhost:15672,可以进入到RabbitMQ登录页面:
输入账户和密码:guest
,进入主页面:
在这一部分内容中,计划用Java/go编写两个程序,发送单个消息的生产者和接收消息并打印出来的消费者。
模型如下图:P代表生产者,C代表消费者,中间的是队列RabbitMQ,代表使用者保留的消息缓冲区
Java版本
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
生产者代码
public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); // channel实现了自动close接口,自动关闭,不需要显示关闭 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { /** * 生成一个队列: String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * 1. queue: 队列名称 * 2. durable: 队列里面的消息是否持久化,默认消息存储在内存中 * 3. exclusive: 该队列是否只提供一个消费者进行消费,是否进行共享,true可以多个消费者消费 * 4. autoDelete: 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除, true为自动删除 * 5. arguments: 其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; /** * 发送一个消息: String exchange, String routingKey, BasicProperties props, byte[] body * 1.exchange: 发送到哪个交换机 * 2.routingKey: 路由的key是哪个 * 3.props: 其他的参数信息 * 4.body: 消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者消息已发送..."); } } }
消费者代码
public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("客户端等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody()); System.out.println("接收到的消息是:" + msg); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断..."); }; /** * 消费者消费消息:String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback * 1. queue: 消费哪个队列 * 2. autoAck: 消费成功之后是否自动应答,true代表自动应答,false代表手动应答 * 3. deliverCallback: 消费者成功消费消息的回调 * 4. cancelCallback: 消费者未成功消费消息的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
测试结果
首先运行生产者,如下图,消息已发送
再到RabbitMQ的Web界面查看,如下图,队列 hello
中有一条消息Ready,总共1条消息
然后启动消费者代码,接收到消息,如下图:
再次回到RabbitMQ的Web界面,可看到,消息已被消费了。
Go版本
生产者
package main import ( "log" amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { // 连接RabbitMQ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建channel channel, err := conn.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() // 声明队列 q, err := channel.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := "Hello World" err = channel.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }, ) failOnError(err, "Failed to publish a message") log.Printf("[x] Send: %s\n", body) }
消费者
package main import ( "log" amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() q, err := channel.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") msgs, err := channel.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") // 使用go协程打印消息 var forever chan struct{} go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行,我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终罪行任务,当有多个工作进程时,这些工作进程将一起处理这些任务。
轮询分发消息
在这个案例中我们会启动两个工作线程,一个消息发送线程。
首先上面的代码中有许多是重复的,我们可以进行代码抽取为工具类。
public class RabbitMqUtils { private static final String HOST = ""; private static final int PORT = 5672; private static final String USERNAME = "guest"; private static final String PASSWORD = "guest"; /** * 获取一个信道Channel * @return * @throws Exception */ public static Channel getChannel() throws Exception { // 创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(PORT); Connection connection = factory.newConnection(); return connection.createChannel(); } }
Java版本
启动两个工作线程
public class Worker01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { String receivedMessage = new String(message.getBody()); System.out.println("Worker01 receive:" + receivedMessage); }; CancelCallback cancelCallback = consumerTag -> { System.out.println(consumerTag + " 消费者取消消费接口回调逻辑"); }; System.out.println("c1 消费者启动等待消费..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
另一个工作线程和Worker01代码完全一样,只是将c1改为c2。
在IDEA中设置同一个main执行多次的方式如下:
启动一个发送线程
public class Task01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ try (Channel channel = RabbitMqUtils.getChannel();) { channel.queueDeclare(QUEUE_NAME, false, false,false, null); // 从控制台当中接收信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息完成: " + message); } } } }
通过控制台输入进行发送消息。
测试结果
通过发送线程发送四个消息,如下图:
查看两个工作进程的消息接收情况:
通过测试可知道,消息发送后,两个工作线程是轮询方式接收消息,你一个我一个这样按顺序的,也叫公平分发所以可能c1先收到aa,然后c2收到bb,然后c1再收到cc,c2最后收到dd;也可能反过来c2先收到aa…
Go版本
封装工具类
package utils import ( "log" amqp "github.com/rabbitmq/amqp091-go" ) var Conn *amqp.Connection func FailOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func init() { var err error Conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/") FailOnError(err, "Failed to connect RabbitMQ") } func GetChannel() *amqp.Channel { channel, err := Conn.Channel() FailOnError(err, "Failed to get a channel") return channel }
工作线程代码
package main import ( "bytes" "log" "rabbitmq-test/utils" "time" ) func main() { ch := utils.GetChannel() defer ch.Close() // 声明队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dotCount := bytes.Count(d.Body, []byte(".")) t := time.Duration(dotCount) time.Sleep(t * time.Second) log.Printf("Done") } }() log.Printf("[*] Waiting for messages.To exit press CTRL+C") <-forever }
发送线程
package main import ( "log" "os" "rabbitmq-test/utils" "strings" "github.com/rabbitmq/amqp091-go" ) func bodyFrom(args []string) string { var s string if (len(args)) < 2 || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s } func main() { body := bodyFrom(os.Args) ch := utils.GetChannel() defer ch.Close() // 声明队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) utils.FailOnError(err, "Failed to declare a queue") err = ch.Publish( "", q.Name, false, false, amqp091.Publishing{ DeliveryMode: amqp091.Persistent, ContentType: "text/plain", Body: []byte(body), }, ) utils.FailOnError(err, "Failed to publish a message") log.Printf(" [x] Send %s", body) }
测试结果
概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了部分突然它挂掉了,会发生什么情况?
RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除,在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它已经无法接收到了。
为了保证消息在发送过程中不丢失,RabbitMQ引入了**消息应答机制**,消息应答就是:**消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了**
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下适用。
消息应答的方法(手动应答)
Channel.basicAck
(用于肯定确认):RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了。
Channel.basicNack
(用于否定确认)
Channel.basicReject
(用于否定确认),与 Channel.basicNack
相比少一个参数
不处理该消息,直接拒绝,可以将其丢弃了
Multiple
的解释
手动应答的好处是可以批量应答并且减少网络拥堵
multiple的true和false代表不同的意思:
true代表批量应答channel上未应答的消息
比如channel上有传送tag的消息5,6,7,8,当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答
false通true相比,只会应答tag=8的消息,而5,6,7这三个消息依然不会被确认收到消息应答
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者,这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
消息手动应答代码
Java代码
生产消息的代码与上一个例子一致
两个工作线程,分别设置Worker01接收消息时处理时间1s,Worker02处理时间较长为30s,模拟一个消费者处理任务比较慢的情形
public class Worker01 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { String receivedMessage = new String(message.getBody()); // 休眠1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Worker01 receive:" + receivedMessage); /** * 手动应答 * 1. 消息标记tag * 2. 是否批量应答未应答消息 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = consumerTag -> { System.out.println(consumerTag + " 消费者取消消费接口回调逻辑"); }; System.out.println("c1 消费者启动等待消费,处理时间较短..."); // 使用手动应答 boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); } }
public class Worker02 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ final Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody()); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Worker01 receive:" + msg); // 手动应答消息 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; System.out.println("C2 消费者启动等待消费,处理时间较长。。。"); // 消费消息 boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { System.out.println("消费者取消消费接口回调逻辑"); }); } }
开启手动应答的方式就是在消费者消费消息时,参数autoAck设置为false,代码中是:
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
,并且在处理完消息后要手动应答,使用:
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
测试结果
正常情况下,生产者发送4个消息:aa,bb,cc,dd,工作队列是轮询分发消息的,两个工作进程c1和c2应该是分别收到两个消息aa,cc或bb,dd。
但我们在发送者发送完消息dd后,就将c2这个消费者停止掉,正常来说应该是c2来处理这个消息dd,但是由于它处理时间要30s,比较长,还没处理完,就被我们停掉,这时候消息被c1给接收到了,说明消息被重新入队,分配给了能处理消息的c1了。
类似下图效果(打印信息与代码不一致,这图片是教程里拿的)
Go代码
消息生产者与上面的代码一致
这里两个Worker代码类似,一个休眠1s,一个休眠30s
package main import ( "log" "rabbitmq-test/utils" "time" ) func main() { ch := utils.GetChannel() defer ch.Close() // 声明队列 q, err := ch.QueueDeclare( "ack_queue", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) msgs, err := ch.Consume( q.Name, "", false, // auto-ack false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { log.Printf("Worker1 Received a message: %s", d.Body) t := time.Duration(1) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf("[Worker1] Waiting for messages.To exit press CTRL+C") <-forever }
同样的,在消费消息时候将自动应答设置为false,
msgs, err := ch.Consume( q.Name, "", false, // auto-ack false, false, false, nil, )
处理完消息后,手动应答:
d.Ack(false)
,false为不批量应答
测试结果
与Java代码测试一样
概念
刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当RabbitMQ服务停掉以后消息生产者发送过来的消息不丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知她不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化
队列如何实现持久化?
之前我们创建的队列都是非持久化的,rabbitmq如果重启的话,该队列就会被删除掉,如果要队列实现持久化,需要在声明队列的时候把 durable
参数设置为持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
golang代码也一样:
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
但是,需要注意的就是,如果之前声明的队列不是持久化的,需要先将原队列删除,或者重新创建一个持久化的队列,不然就会出现错误。
当我们声明的队列是持久化的,在Web控制页面显示上也会有点不同,如下图,持久化的队列会有个标记 D
消息实现持久化
要想让消息实现持久化需要在消息生产者处修改代码,在发布消息时,添加消息属性:MessageProperties.PERSISTENT_TEXT_PLAIN
在golang中,也是在生产者处修改,在 amqp.Publishing
中添加 amqp.Persistent
,具体如下:
rr = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
将消息标记为持久化并不能完全保证不会丢失消息,尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘,持久性保证并不强,但是对于我们的简单任务队列而言已经绰绰有余了。
如果需要更强有力的持久化策略,需要考虑发布确认模式了。
不公平分发
在最开始的时候我们学习到RabbitMQ分发消息采用的是轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮询分发的话,处理速度快的消费者很大一部分时间会处于空闲状态,而处理慢的消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况,它依然很公平的进行分发。
为了避免这种情况,我们可以设置prefetchCount参数 channel.basicQos(1)
意思就是:如果这个任务我还没有处理完成,或者我还没有应答你,你先别分配新的任务给我,我目前只能处理一个任务,然后RabbitMQ就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的worker或者改变其他存储任务的策略。
预取值
本身消息的发送就是异步发送的,所以在任何时候,channel上肯定不止只有一个消息。另外来自消费者的手动应答本质上也是异步的。因此这里就存在一个**未确认的消息缓冲区**,因此希望开发人员能**限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题**
这时候就可以通过使用`basicQos`方法来设置“预取值”来完成,**该值定义通道上允许的未确认消息的最大数量**,一旦数量达到配置的数量,RabbitMQ将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。
比如,假设在通道上有未确认的消息5,6,7,8,并且通道的预取计数设置为4,此时RabbitMQ将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被ack,比方说tag=6的这个消息刚刚被确认ack,RabbitMQ将会感知这个情况并再发一条消息。
**消息应答和Qos预取值对用户吞吐量有重大影响**。通常,增加预取值将提高向消费者传递消息的速度。**虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗**。
应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同,100到300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为1是最保守的,当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中,对于大多是应用来说,稍微高一点的值将是最佳的。
java中使用的是
basicQos(int prefetchSize, int prefetchCount, boolean global)
方法进行设置,golang中使用如下方式设置:err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global )
- 1
- 2
- 3
- 4
- 5
前面内容我们创建了一个工作队列,我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程),在这一部分中,我们将做一些完全不同的事情,我们将消息传达给多个消费者,这模式成为**发布订阅**
为了说明这种模式,我们将构建一个简单的日志系统,它将又两个程序组成:第一个程序发出日志消息,第二个程序是消费者,其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志将广播给所有消费者。
在开始实现这个简单的日志系统前,我们需要先了解下面的一些知识:
概念
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者都不知道这些消息传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将他们推入队列。交换机必须确切知道如何处理收到的消息,是应该把消息放到特定队列还是说把它们放到许多队列还是说应该丢弃它们,这就得又交换机的类型来决定。
Exchange类型
总共有以下类型:直接(direct),主题(topic),标题(headers),扇出(fanout)
我们这个简单的日志系统使用的就是fanout交换机。
无名exchange
在前面部分总我们对exchange一无所知,但仍然能够将消息发送到队列,其原因是我们使用的是默认交换机,通过空字符串进行标识。
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数是交换机的名称,空字符串表示默认或无名称交换机;
消息能路由发送到队列中其实是由routingkey(bindingkey)绑定key指定的(前提它存在)
前面我们使用的是具有特定名称的队列(比如hello,ack_queue),队列的名称对我们来说至关重要,我们需要指定我们的消费者去消费哪个队列的消息。
但在我们这个简单的log日志系统中,并不心队列名称,我们希望获取所有log消息,而且我们希望获取最新的消息而不是旧的消息,所以有两个事情需要我们解决:
每当我们连接到了RabbitMQ,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。
其次一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式:String queueName = channel.queueDeclare().getQueue();
golang中创建临时队列则是在声明队列时候,将队列名称设置为空字符串
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
创建出来之后长这样:
什么是binding绑定呢?binding其实是exchange和queue之间的桥梁,它告诉我们exchange和哪个队列进行了绑定,比如说下面这张图告诉我们的就是交换机X和队列Q1,Q2进行了绑定。
Fanout介绍
Fanout这种类型非常简单,它是将接收到的所有消息广播到它知道的所有队列中。
简单日志系统实战
主要框架图如下:
java代码
EmitLog:发送消息给两个消费者接收
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { /** * 声明一个exchange * 1. exchange的名称 * 2. exchange的类型 */ // channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String message = sc.nextLine(); // routingkey为空,不需要指定,因为fanout是广播 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息:" + message); } } } }
ReceiveLogs01:将接收到的消息打印在控制台
public class ReceiveLogs01 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); /** * 生成一个临时队列 */ String queueName = channel.queueDeclare().getQueue(); // 队列绑定交换机 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收消息,把接收到的消息打印在屏幕....."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("控制台打印接收到的消息:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
ReceiveLogs02:将接收到的消息存储在磁盘文件
public class ReceiveLogs02 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); /** * 生成一个临时队列 */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收消息,把接收到的消息写到文件...."); DeliverCallback deliverCallback = (consumerTag, message) -> { File file = new File("F:\\rabbitmq_info.txt"); BufferedWriter bos = new BufferedWriter(new FileWriter(file, true)); bos.write(new String(message.getBody(), StandardCharsets.UTF_8)); bos.newLine(); System.out.println("数据写入文件成功"); bos.close(); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
fanout模式为广播,如果没有队列绑定到fanout类型交换机,消息是会被丢失的。
但在这个日志系统,这个问题对我们来说是可以接受的。
Go代码
emit_log
package main import ( "fmt" "rabbitmq-test/utils" "github.com/rabbitmq/amqp091-go" ) func main() { channel := utils.GetChannel() // 声明交换机 err := channel.ExchangeDeclare( "logs", "fanout", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") var message string for { n, _ := fmt.Scanln(&message) if n > 0 { err = channel.Publish( "logs", "", false, false, amqp091.Publishing{ ContentType: "text/plain", Body: []byte(message), }, ) utils.FailOnError(err, "Failed to publish a message") fmt.Println("生产者发出消息:", message) } } }
receive_logs
package main import ( "fmt" "rabbitmq-test/utils" ) func main() { channel := utils.GetChannel() err := channel.ExchangeDeclare( "logs", "fanout", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") // 声明队列,用来接收广播消息 queue, err := channel.QueueDeclare( "", false, false, true, false, nil, ) utils.FailOnError(err, "Failed to declare a queue") // 绑定队列 err = channel.QueueBind( queue.Name, "", "logs", false, nil, ) utils.FailOnError(err, "Failed to bind a queue") msgs, err := channel.Consume( queue.Name, "", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { fmt.Println("接收到消息:", string(d.Body)) } }() fmt.Println("等待接收消息,把接收到的消息打印在屏幕.....") <-forever }
在上一节中,我们构建了一个简单的日志记录系统,我们能够向许多接收者广播日志之消息。
在这节我们将向其中添加一些特别的功能,比如说我们只让某个消费者订阅发布到的部分消息,例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
我们再次来回顾下什么是bindings,绑定是交换机和队列之间的桥梁,也可以这么理解:**队列只对它绑定的交换机的消息感兴趣**。绑定用参数 `routingKey`来表示,也可以称该参数为 `binding key`
binding key取决于交换机的类型,比如上节用到的`fanout`交换机就不需要`binding key,它会忽略这个参数。
上一节中我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errors),而不存储那些警告(warning)或信息(info)日志消息避免浪费磁盘空间。
Fanout这种交换机类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用`direct`这种类型的交换机来进行替换,这种类型的工作方式是:消息只去到它绑定的routingkey队列中去。
在上面这张图中,我们可以看到direct类型的交换机X绑定了两个队列,队列Q1的绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green。
在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定建为black或green的消息会被发布到队列Q2,其他类型的消息将被丢弃。
当然如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct,但是它表现的就和fanout有点类似了,就跟广播差不多,如上图所示。
java代码
生产者
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 创建多个bindingKey Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info", "普通info信息"); bindingKeyMap.put("warning", "警告warning信息"); bindingKeyMap.put("error", "错误error信息"); // debug没有消费者接收这个消息,所有就丢失了 bindingKeyMap.put("debug", "调试debug信息"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); // 发布消息 channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息:" + message); } } } }
消费者01:接收error消息,存储到磁盘
public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { final Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 队列名称 String queueName = "disk"; // 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); msg = "接收绑定键:" + message.getEnvelope().getRoutingKey() + ",消息:" + msg; File file = new File("F:\\error.txt"); BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file, true)); bufferedWriter.write(msg); bufferedWriter.newLine(); bufferedWriter.close(); System.out.println("错误日志已经接收。"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
消费者02:接收warning和info消息,打印在控制台
public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { final Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "console"; channel.queueDeclare(queueName,false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody()); System.out.println("接收绑定键:" + message.getEnvelope().getRoutingKey() + ",消息:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
go代码
生产者
package main import ( "fmt" "rabbitmq-test/utils" "github.com/rabbitmq/amqp091-go" ) func main() { channel := utils.GetChannel() // 声明交换机 err := channel.ExchangeDeclare( "direct_logs", "direct", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") var msg string var routingKey string for { n, _ := fmt.Scanln(&routingKey, &msg) if n > 0 { err = channel.Publish( "direct_logs", routingKey, false, false, amqp091.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) utils.FailOnError(err, "Failed to publish a message") } fmt.Printf("生产者发送了key为:%s,msg为:%s\n", routingKey, msg) } }
消费者
package main import ( "fmt" "os" "rabbitmq-test/utils" ) func main() { channel := utils.GetChannel() err := channel.ExchangeDeclare( "direct_logs", "direct", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") q, err := channel.QueueDeclare( "", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a queue") // 通过命令行参数指定routingKey if len(os.Args) < 2 { fmt.Printf("Usage: %s [info] [warning] [error]\n", os.Args[0]) os.Exit(0) } // 绑定队列与交换机 for _, s := range os.Args[1:] { fmt.Printf("Binding queue %s to exchange %s with routing key %s\n", q.Name, "direct_logs", s) err = channel.QueueBind( q.Name, s, "direct_logs", false, nil, ) utils.FailOnError(err, "Failed to bind a queue") } // 消费消息 msgs, err := channel.Consume( q.Name, "", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { fmt.Printf("消费者接收到key为:%s的消息:%s\n", d.RoutingKey, d.Body) } }() <-forever }
之前类型的问题:
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的fanout交换机,而是使用了direct交换机,从而能实现有选择项地接收日志。
尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性,比方说我们想接收的日志类型有info.base和info.advantage,某个队列只想要info.base的消息,那这个时候direct就办不到了,这个时候就只能使用**topic**类型。
发送到类型是topic的交换机的消息的routing key不能随意写,必须满足一定的要求,它**必须是一个单词列表,以点号分隔开。**这些单词可以是任意单词,比如说:“stock.usd.nyse","nyse.vmw","quick.orange.rabbit"这种类型的,当然这个单词列表不能超过255个字节。
在这个规则列表中,其中有两个替换符是需要注意的:
上图绑定关系如下:
Q1 -->绑定的是:中间带orange,带三个单词的字符串(*.orange.*)
Q2 -->绑定的是:
以下一些例子看看匹配情况是怎样的:
quick.orange.rabbit
:被队列Q1Q2接收到lazy.orange.elephant
:被队列Q1Q2接收到quick.orange.fox
:被队列Q1接收到lazy.brown.fox
:被队列Q2接收到lazy.pink.rabbit
:被队列Q2接收到(虽然两个都满足Q2的绑定关系,但只会被Q2接收一次)quick.brown.fox
:没有队列接收到,会被丢弃quick.orange.male.rabbit
:没有队列接收到lazy.orange.male.rabbit
:被队列Q2接收到注意:
topic交换机功能很强大,它可以表现的跟其他交换机一样:
Java代码
生产者
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); /** * Q1绑定的是: *.orange.* * Q2绑定的是: *.*.rabbit,lazy.# */ Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到"); bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到"); bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到"); bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到"); bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只会被队列Q2接收一次"); bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2"); for (Map.Entry<String, String> entry : bindingKeyMap.entrySet()) { String bindingKey = entry.getKey(); String message = entry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息:" + message); } } } }
消费者
public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("接收队列:" + queueName + ",绑定键:" + message.getEnvelope().getRoutingKey() + ",消息:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("接收队列:" + queueName + ",绑定键:" + message.getEnvelope().getRoutingKey() + ",消息:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
Go代码
生产者
package main import ( "fmt" "rabbitmq-test/utils" "github.com/rabbitmq/amqp091-go" ) func main() { channel := utils.GetChannel() // 声明交换机 err := channel.ExchangeDeclare( "topic_logs", "topic", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") bindingKeyMaps := make(map[string]string) bindingKeyMaps["quick.orange.rabbit"] = "被队列Q1Q2接收到" bindingKeyMaps["lazy.orange.elephant"] = "被队列Q1Q2接收到" bindingKeyMaps["quick.orange.fox"] = "被队列Q1接收到" bindingKeyMaps["lazy.brown.fox"] = "被队列Q2接收到" bindingKeyMaps["lazy.pink.rabbit"] = "虽然满足两个绑定但只会被队列Q2接收一次" bindingKeyMaps["quick.brown.fox"] = "不匹配任何绑定不会被任何队列接收到会被丢弃" bindingKeyMaps["quick.orange.male.rabbit"] = "是四个单词不匹配任何绑定会被丢弃" bindingKeyMaps["lazy.orange.male.rabbit"] = "是四个单词但匹配Q2" for routingKey, msg := range bindingKeyMaps { err = channel.Publish( "topic_logs", routingKey, false, false, amqp091.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) utils.FailOnError(err, "Failed to publish a message") fmt.Println("生产者发送了消息: ", msg) } }
消费者
package main import ( "fmt" "os" "rabbitmq-test/utils" ) func main() { channel := utils.GetChannel() err := channel.ExchangeDeclare( "topic_logs", "topic", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") if len(os.Args) < 3 { fmt.Printf("Usage: %s [queueName] [binding_key]...\n", os.Args[0]) os.Exit(0) } queue, err := channel.QueueDeclare( os.Args[1], false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a queue") for _, s := range os.Args[2:] { fmt.Printf("Binding queue %s to exchange %s with routing key %s\n", queue.Name, "topic_logs", s) err = channel.QueueBind( queue.Name, s, "topic_logs", false, nil, ) utils.FailOnError(err, "Failed to bind a queue") } msgs, err := channel.Consume( queue.Name, "", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { fmt.Printf("接收到routingkey: %s, 消息:%s\n", d.RoutingKey, d.Body) } }() fmt.Println("等待接收消息....") <-forever }
生产者将信道设置成`confirm`模式,一旦信道进入confirm模式,**所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始)**,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中 `deliver-tag`域包含了确认消息的序列号,此外broker也可以设置 `basic.ack`的 `multiple`域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
开启发布确认的方法
发布确认默认是没有开启的,如果要开启需要调用方法:confirmSelect
,每当你想要使用发布确认,都需要在channel上调用该方法。
channel.confirmSelect();
单个确认发布
这是一种简单的确认方式,它是一种**同步确认发布**的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,`waitForConfirmsOrDie(long)`这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:**发布速度特别的慢**,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
public class Producer { private static final int MESSAGE_COUNT = 100; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { String queueName = "Hello"; channel.queueDeclare(queueName, false, false, false, null); // 开启发布确认 channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); // 服务端返回false或超时时间内未返回,生产者可以消息重发 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms"); } } }
批量确认发布
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息,然后一起确认可以极大的提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
public class Producer { private static final int MESSAGE_COUNT = 100; public static void main(String[] args) throws Exception { publishMessageBatch(); } public static void publishMessageBatch() throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { String queueName = "Hello"; channel.queueDeclare(queueName, false, false, false, null); // 开启发布确认 channel.confirmSelect(); // 批量确认消息大小 int batchSize = 30; // 未确认消息个数 int outstandingMessageCount = 0; long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.waitForConfirms(); outstandingMessageCount = 0; } } // 为了确保还有剩余没有确认消息,再次确认 if (outstandingMessageCount > 0 ) { channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms"); } } }
异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。
public class Producer { private static final int MESSAGE_COUNT = 100; public static void main(String[] args) throws Exception { publishMessageAsync(); } public static void publishMessageAsync() throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { String queueName = "Hello"; channel.queueDeclare(queueName, false, false, false, null); // 开启发布确认 channel.confirmSelect(); /** * 线程安全有序的一个哈希表,适用于高并发情况 * 1. 轻松的将序号和消息进行关联 * 2. 轻松批量删除条目,只要给到序列号 * 3. 支持并发访问 */ ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); /** * 确认收到消息的一个回调 * 1. 消息序列号 * 2. true 可以确认小于等于当前序列号的消息 * false 确认当前序列号消息 */ ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { // 返回的是小于等于当前序列号的未确认消息,是一个map ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true); // 清除该部分未确认消息 confirmed.clear(); } else { // 只清除当前序列号的消息 outstandingConfirms.remove(deliveryTag); } }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("发布的消息:" + message + "未被确认,序列号:" + deliveryTag); }; /** * 添加一个异步确认的监听器 * 1. 确认收到消息的回调 * 2. 未收到消息的回调 */ channel.addConfirmListener(ackCallback, nackCallback); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i; /** * channel.getNextPublishSeqNo()获取下一个消息的序列号 * 通过序列号与消息体进行一个关联 */ outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms"); } } }
最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用`ConcurrentLinkedQueue`,这个队列在confirm,callbacks与发布线程之间进行消息的传递。
单独发布消息
同步等待确认,简单,但吞吐量非常有限
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难推断出是哪条消息出现了问题
异步处理
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
发布100个消息运行结果:
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
代码框架图
消息TTL过期
生产者代码
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception{ try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); // 设置消息的TTL时间 AMQP.BasicProperties pro = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", pro, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息: " + message); } } } }
生产者正常往Exchange中发送消息,设置了消息的过期时间为10s
消费者C1
public class Consumer01 { // 普通交换机名字 private static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交换机名字 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信队列与死信交换机绑定 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); // 正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); // 正常队列设置死信交换机,参数key是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信 routing key params.put("x-dead-letter-routing-key", "lisi"); // 绑定正常队列与正常交换机 String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("Consumer01接收到消息:" + msg); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {}); } }
消费者C1做的事情稍微多一些,首先它是个正常消费者,正常接收消息,但在这里增加了死信队列操作,在声明队列时候,将死信队列和死信交换机作为参数设置进去,当消费者C1无法消费消息时,就会转到死信队列去处理。
消费者C2
public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false,false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信队列消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("Consumer02接收死信队列的消息:" + msg); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {}); } }
消费者C2,就是一个普通的消费者,只不过他消费的是死信队列的消息,只有在正常消费者无法消费时候,它才有机会去消费消息。
验证步骤
先启动消费者C1,然后关闭,模拟它无法消费消息
此时的普通队列和死信队列已经创建了,生产者还没发消息,都为0
启动生产者,生产者会发送10条消息,此时正常队列中有10条未消费的消息
10s后,由于正常队列里的消息没有消费,超时,消息就进入到了死信队列
这时候启动消费者C2,它会消费掉死信队列里的消息
go代码
生产者
func main() { channel := utils.GetChannel() defer channel.Close() // 声明死信交换机 err := channel.ExchangeDeclare( "dead_exchange", "direct", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") // 声明死信队列 queue, err := channel.QueueDeclare( "dead-queue", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare queue") // 绑定队列 err = channel.QueueBind( queue.Name, "lisi", "dead_exchange", false, nil, ) utils.FailOnError(err, "Failed to binding queue") fmt.Println("消费者02等待接收死信队列消息....") msgs, err := channel.Consume( queue.Name, "", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan bool go func() { for d := range msgs { fmt.Printf("消费者02接收死信队列的消息:%s\n", d.Body) } }() <-forever }
消费者C1
func main() { channel := utils.GetChannel() defer channel.Close() // 声明交换机 err := channel.ExchangeDeclare( normal_exchange, "direct", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange: "+normal_exchange) err = channel.ExchangeDeclare( dead_exchange, "direct", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange: "+dead_exchange) // 声明死信队列 deadQueue, err := channel.QueueDeclare( "dead-queue", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a dead queue") // 绑定死信队列和死信交换机 err = channel.QueueBind( deadQueue.Name, "lisi", dead_exchange, false, nil, ) utils.FailOnError(err, "Failed to bind dead queue") // 声明正常队列 normalQueue, err := channel.QueueDeclare( "normal-queue", false, false, false, false, amqp091.Table{ "x-dead-letter-exchange": dead_exchange, // 指定死信交换机 "x-dead-letter-routing-key": "lisi", // 指定死信routing key }, ) utils.FailOnError(err, "Failed to declare normal queue") // 绑定正常队列和正常交换机 err = channel.QueueBind( normalQueue.Name, "zhangsan", normal_exchange, false, nil, ) utils.FailOnError(err, "Failed to bind normal queue") fmt.Println("消费者01等待接收消息....") msgs, err := channel.Consume( normalQueue.Name, "", true, false, false, false, nil, ) var forever chan bool go func() { for d := range msgs { fmt.Printf("消费者01接收到的消息: %s\n", d.Body) } }() <-forever }
消费者C2
func main() { channel := utils.GetChannel() defer channel.Close() // 声明死信交换机 err := channel.ExchangeDeclare( "dead_exchange", "direct", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare an exchange") // 声明死信队列 queue, err := channel.QueueDeclare( "dead-queue", false, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare queue") // 绑定队列 err = channel.QueueBind( queue.Name, "lisi", "dead_exchange", false, nil, ) utils.FailOnError(err, "Failed to binding queue") fmt.Println("消费者02等待接收死信队列消息....") msgs, err := channel.Consume( queue.Name, "", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to register a consumer") var forever chan bool go func() { for d := range msgs { fmt.Printf("消费者02接收死信队列的消息:%s\n", d.Body) } }() <-forever }
队列达到最大长度
生产者代码:去掉TTL属性
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception{ try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); // 设置消息的TTL时间 // AMQP.BasicProperties pro = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息: " + message); } } } }
消费者C1:增加队列长度属性
public class Consumer01 { // 普通交换机名字 private static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交换机名字 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信队列与死信交换机绑定 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); // 正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); // 正常队列设置死信交换机,参数key是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信 routing key params.put("x-dead-letter-routing-key", "lisi"); // 正常队列长度的限制 params.put("x-max-length", 6); // 绑定正常队列与正常交换机 String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("Consumer01接收到消息:" + msg); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {}); } }
注意此时要先把原先的队列删除了,因为参数发生了改变
消费者C2代码不变
验证步骤
先删除两个队列,因为队列参数变了
先启动消费者C1,然后关闭,模拟接收不到消息
随后启动消费者C2
启动生产者,发送10条消息,然后会有4条消息在死信队列,被消费者C2消费
go代码
生产者代码,发布消息时,将TTL去掉
// 发布消息 for i := 1; i < 11; i++ { msg := "info" + strconv.Itoa(i) err = channel.Publish( "normal_exchange", "zhangsan", false, false, amqp091.Publishing{ ContentType: "text/plain", // Expiration: "10000", Body: []byte(msg), }, ) utils.FailOnError(err, "Failed to publish a message") fmt.Printf("生产者发送消息:%s\n", msg) }
消费者01,设置队列最大长度
// 声明正常队列
normalQueue, err := channel.QueueDeclare(
"normal-queue",
false,
false,
false,
false,
amqp091.Table{
"x-dead-letter-exchange": dead_exchange, // 指定死信交换机
"x-dead-letter-routing-key": "lisi", // 指定死信routing key
"x-max-length": 6,
},
)
消费者02代码不变
消息被拒
生产者代码同上生产者一致
消费者C1
public class Consumer01 { // 普通交换机名字 private static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交换机名字 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信队列与死信交换机绑定 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); // 正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); // 正常队列设置死信交换机,参数key是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信 routing key params.put("x-dead-letter-routing-key", "lisi"); // 正常队列长度的限制 // params.put("x-max-length", 6); // 绑定正常队列与正常交换机 String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); // 模拟消费者拒绝消息 if (msg.equals("info5")) { System.out.println("消费者C1接收到消息:" + msg + ",并且拒绝签收该消息"); // requeue设置为false,代表拒绝重新入队,该队列如果配置了死信队列将发送到死信队列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer01接收到消息:" + msg); // 正常应答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {}); } }
改为手动应答,在第五条消息时候拒绝
消费者C2代码不变
测试结果
go代码
生产者代码不变
消费者C1,将应答改为手动应答,并在第五条消息时候拒绝
// 声明正常队列 normalQueue, err := channel.QueueDeclare( "normal-queue", false, false, false, false, amqp091.Table{ "x-dead-letter-exchange": dead_exchange, // 指定死信交换机 "x-dead-letter-routing-key": "lisi", // 指定死信routing key }, ) utils.FailOnError(err, "Failed to declare normal queue") msgs, err := channel.Consume( normalQueue.Name, "", false, // 自动应答改为false false, false, false, nil, ) var forever chan bool go func() { for d := range msgs { if string(d.Body) == "info5" { fmt.Printf("消费者01接收到的消息: %s,并且拒绝签收该消息\n", d.Body) channel.Reject(d.DeliveryTag, false) } else { fmt.Printf("消费者01接收到的消息: %s\n", d.Body) // 正常应答 channel.Ack(d.DeliveryTag, false) } } }() <-forever
消费者02代码不用改
延迟队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了之后或之前取出和处理,**简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。**
订单在十分钟之内未支付则自动取消。
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
用户注册成功后,如果三天内没有登录则进行短信提醒。
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或之前的指定事件点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的要求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案,但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭”,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
TTL(Time-To-Live)是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用,有两种方式设置TTL:
第一种方式是在创建队列的时候设置队列的 x-message-ttl
属性
props.put("x-message-ttl", 5000);
channel.queueDeclare(normalQueue, false, false, false, params);
另一种方式便是针对每条消息设置TTL(在生产者中设置)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列则被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为**消息是否过期是在即将投递到消费者之前判定的**,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;
另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
前一小节我们介绍了死信队列,刚刚又介绍了TTL,至此利用RabbitMQ实现延时队列的两大要素已经集齐,接下来只需要将他们进行融合,再加入一点点调味料,延迟队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。
创建两个队列 QA
和 QB
,两者队列TTL分别设置为10s和40s,然后再创建一个交换机X和死信交换机Y,他们的类型都是direct,创建一个死信队列 QD
,他们的绑定关系如上图。
这次实战我们使用Springboot实现,先创建好Springboot工程
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
修改配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
编写RabbitMQ配置类
@Configuration public class RabbitMQConfig { // 普通交换机和普通队列 public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; // 死信交换机和死信队列 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; // 声明交换机 @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } // 声明队列A @Bean("queueA") public Queue queueA() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", "YD"); // 声明队列的TTL,10s args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // 绑定队列A和交换机X @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } // 声明队列B @Bean("queueB") public Queue queueB() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", "YD"); // 声明队列的TTL,40s args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } // 绑定队列B和交换机X @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } // 声明死信队列QD @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } // 绑定死信队列QD和死信交换机 @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
消息生产者代码
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
System.out.printf("当前时间:%s,发送一条信息给两个TTL队列:%s\n", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
}
消息消费者代码
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.printf("当前时间:%s,收到死信队列消息:%s\n", new Date().toString(), msg);
}
}
测试
在浏览器中发起一个请求:http://localhost:8080/ttl/sendMsg/哈哈哈,执行结果如下图:
第一条消息在10s后就变成了死信消息了,然后被消费者消费掉,第二条消息在40s后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10s和40s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
在前面的基础上新增一个队列QC,绑定关系如下图,该队列不设置TTL时间
配置类代码
public static final String QUEUE_C = "QC"; // 声明队列C @Bean("queueC") public Queue queueC() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } // 绑定队列C和交换机X @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }
消息生产者
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, message1 -> {
message1.getMessageProperties().setExpiration(ttlTime);
return message1;
});
System.out.printf("当前时间:%s,发送一条时长%s毫秒TTL信息给队列C:%s\n", new Date().toString(), ttlTime, message);
}
其他代码与上面的一样
测试
发起两个请求,第一个请求消息过期时间为20s,第二个请求消息过期时间为2s
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好2/2000
运行结果如下图:
按我们的预想,应该是2s的消息先过期,会先执行,但是现象却是第一条消息还没过期,后面的也没有过期。
这是因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这样会造成消息不会及时地过期淘汰,导致消息的堆积。
为了解决上面提到的问题,如果不能实现在消息粒度上的TTL,并使其在设置的TTL时间内及时死亡,就无法设计成一个通用的延时队列了,那如何解决呢?
**解决方案:使用RabbitMQ插件:`rabbitmq_delayed_message_exchange`**
根据rabbitmq版本号下载 rabbitmq_delayed_message_exchange
插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件
因为我们是用Docker运行的Rabbitmq,可以先进入docker容器,查看是否有这个插件
F:\>docker exec -it rabbitmq /bin/bash
bash-5.1# pwd
/
bash-5.1# cd plugins
bash-5.1# ls -l | grep delay
bash-5.1# exit
将下载的插件拷贝到F盘,然后通过 docker cp
命令拷贝到rabbitmq容器内
F:\>docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
F:\>docker exec -it rabbitmq /bin/bash
bash-5.1# cd plugins
bash-5.1# ls -l | grep delay
-rwxr-xr-x 1 root root 36358 Aug 26 07:07 rabbitmq_delayed_message_exchange-3.9.0.ez
bash-5.1#
进入RabbitMQ容器,启用插件
F:\>docker exec -it rabbitmq /bin/bash bash-5.1# cd plugins bash-5.1# rabbitmq-plugins enable rabbitmq_delayed_message_exchange Enabling plugins on node rabbit@d4a628333fa5: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_prometheus rabbitmq_web_dispatch Applying plugin configuration to rabbit@d4a628333fa5... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins. bash-5.1# exit exit
退出容器,重新启动RabbitMQ容器
F:\>docker restart rabbitmq
rabbitmq
验证插件是否成功安装
打开web页面,在交换机类型一栏可以看到如下,多了一个 x-delayed-message
类型
未添加插件之前
添加延迟插件之后
在这里新增了一个队列 delayed.queue
,一个自定义交换机 delayed.exchange
,绑定关系如上图
配置文件类代码
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; // 声明队列 @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } // 自定义交换机,这里定义的是一个延迟交换机 @Bean public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); // 自定义交换机类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } // 绑定队列与交换机 @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
生产者代码
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delayTime);
return message;
}
});
System.out.printf("当前时间:%s,发送一条延时%d毫秒TTL信息给队列delayed.queue:%s\n", new Date().toString(), delayTime, message);
}
消费者代码
@RabbitListener(queues = "delayed.queue")
public void receiveDelayedQueue(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.printf("当前时间:%s,收到延迟队列消息:%s\n", new Date().toString(), msg);
}
测试
依旧是发送两条消息,第一条delay时长20s,第二条delay时长2s:
http://localhost:8080/ttl/sendDelayMsg/Hello1/20000
http://localhost:8080/ttl/sendDelayMsg/Hello2/2000
执行结果如下:
第二条延时较短的消息先被消费掉了,符合预期。
Go代码
生产者
func main() { channel := utils.GetChannel() defer channel.Close() // 声明交换机时类型为:x-delayed-message,参数传递"x-delayed-type": "direct" err := channel.ExchangeDeclare( "delayed.exchange", "x-delayed-message", true, false, false, false, amqp091.Table{ "x-delayed-type": "direct", }, ) utils.FailOnError(err, "Failed to declare an exchange") queue, err := channel.QueueDeclare( "delayed.queue", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a queue") err = channel.QueueBind( queue.Name, "delayed.routingkey", "delayed.exchange", false, nil, ) utils.FailOnError(err, "Failed to bind queue and exchange") // 发布消息,header带上参数“x-delay“ err = channel.Publish( "delayed.exchange", "delayed.routingkey", false, false, amqp091.Publishing{ ContentType: "text/plain", Body: []byte("Hello"), Headers: map[string]interface{}{ "x-delay": os.Args[1], }, }, ) utils.FailOnError(err, "Failed to publish message") fmt.Printf("当前时间:%s, 发送一条延迟%s毫秒的TTL消息", time.Now(), os.Args[1]) }
消费者
func main() { channel := utils.GetChannel() defer channel.Close() // 声明正常队列 normalQueue, err := channel.QueueDeclare( "delayed.queue", true, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare normal queue") fmt.Println("消费者等待接收消息....") msgs, err := channel.Consume( normalQueue.Name, "", true, false, false, false, nil, ) var forever chan bool go func() { for d := range msgs { fmt.Printf("当前时间:%s,消费者接收到的消息: %s\n", time.Now(), d.Body) } }() <-forever }
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠传送,消息可靠投递,死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其他选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景。
确认机制方案
代码架构图
代码实战
配置文件
需要添加 spring.rabbitmq.publisher-confirm-type = correlated
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
添加配置类
@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @Bean public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1"); } }
生产者(需要设置回调接口)
@RestController @RequestMapping("/confirm") public class ConfirmProducer { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; // 在对象加载完依赖注入后执行 @PostConstruct public void init() { // 依赖注入rabbitTemplate后再设置它的回调对象 rabbitTemplate.setConfirmCallback(myCallBack); } @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { // 指定消息id为1 CorrelationData correlationData = new CorrelationData("1"); String routingkey = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingkey, message+routingkey, correlationData); CorrelationData correlationData2 = new CorrelationData("2"); routingkey = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingkey, message+routingkey, correlationData2); System.out.printf("发送消息内容:%s\n", message); } }
回调接口
@Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机是否收到消息 的回调方法 * @param correlationData 消息相关数据 * @param ack 交换机是否收到消息 * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { System.out.printf("交换机已经收到id为%s的消息\n", id); } else { System.out.printf("交换机未收到id为%s的消息,原因是:%s\n", id, cause); } } }
消费者
@Component
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
System.out.println("接收到队列confirm.queue的消息:" + msg);
}
}
测试结果
还是在浏览器中输入:http://localhost:8080/confirm/sendMessage/hello
,终端输出如下:
可以看到,发送了两条消息,第一条消息routingkey为"key1",第二条消息routingkey为"key2",可以看到两条消息都被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,那是因为第二条消息的routingkey与队列的绑定key不一致,也没有其他队列能接收这个消息,所以第二条消息被直接丢弃了。
Mandatory参数
在仅开启了生产者确认机制的情况下,交换机收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理。
**通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者**
代码实战
生产者,增加Mandatory的设置
@RestController @RequestMapping("/confirm") public class ConfirmProducer { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; // 在对象加载完依赖注入后执行 @PostConstruct public void init() { // 依赖注入rabbitTemplate后再设置它的回调对象 rabbitTemplate.setConfirmCallback(myCallBack); /** * true: 交换机无法将消息路由时,会将该消息返回给生产者 * false: 交换机无法将消息路由时,直接丢弃消息 */ rabbitTemplate.setMandatory(true); // 设置回退消息交给谁处理 rabbitTemplate.setReturnsCallback(myCallBack); } @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { // 指定消息id为1 CorrelationData correlationData = new CorrelationData("1"); String routingkey = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingkey, message+routingkey, correlationData); System.out.printf("发送消息id: %s, 内容:%s\n", correlationData.getId(), message+routingkey); CorrelationData correlationData2 = new CorrelationData("2"); routingkey = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingkey, message+routingkey, correlationData2); System.out.printf("发送消息id: %s, 内容:%s\n", correlationData2.getId(), message+routingkey); } }
回调接口,增加回退消息的处理
@Component public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { /** * 交换机是否收到消息 的回调方法 * @param correlationData 消息相关数据 * @param ack 交换机是否收到消息 * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { System.out.printf("交换机已经收到id为%s的消息\n", id); } else { System.out.printf("交换机未收到id为%s的消息,原因是:%s\n", id, cause); } } @Override public void returnedMessage(ReturnedMessage returned) { System.out.printf("消息:%s被服务器退回,退回原因:%s,交换机是:%s,路由key:%s\n", new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey() ); } }
消费者代码不变
测试结果
还是在浏览器中输入:http://localhost:8080/confirm/sendMessage/hello
,终端输出如下:
可以看到,没有被路由到的消息给回退回来了。
原理
有了mandatory参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理,但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多态机器的时候,手动复制日志会更加麻烦而且容易出错,而且设置mandatory参数会增加生产者的复杂性,该怎么做呢?
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在RabbitMQ中,有一种备份交换机的机制存在,可以很好的应对这个问题,什么是备份交换机呢?备份交换机可以理解为RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由的消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了,当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
代码架构图
代码实战
修改配置类,增加备份交换机,备份队列,警告队列,以及设置确认交换机与备份交换机关系
@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; // 声明备份交换机 @Bean public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); } @Bean public DirectExchange confirmExchange() { ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); return exchangeBuilder.build(); } @Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1"); } @Bean public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding backupBinding(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningBinding(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); } }
增加报警消费者
@Component public class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning.queue"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveWarningMsg(Message message) { String msg = new String(message.getBody()); System.out.println("报警发现不可路由消息:" + msg); } @RabbitListener(queues = BACKUP_QUEUE_NAME) public void receiveBackupMsg(Message message) { String msg = new String(message.getBody()); System.out.println("备份队列接收到消息:" + msg); } }
其他代码不变
测试结果
可以看出:mandatory参数与备份交换机可以一起使用,当两者同时开启时,备份交换机优先级更高。
概念
用户对于统一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。
消息重复消费
消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断,故MQ未收到确认消息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已经成功消费了该条消息,造成消费者消费了重复的消息。
解决思路
MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单号,消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时该id先判断该消息是否已消费过。
消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能救护重复发送了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。
业界主流的幂等性有两种操作:
唯一ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单,就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
Redis原子性
利用Redis执行setnx命令,天然具有幂等性,从而实现不重复消费
使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款,那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理所当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用redis来存放的定时轮询,大家都知道redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
如何添加
控制台页面添加
队列代码中添加
Map<String, Object> params = new HashMap<>();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
消息中代码添加
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
注意事项
要让队列实现优先级,需要做的事情有如下:
使用场景
RabbitMQ从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线,宕机亦或者由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
两种模式
队列具备两种模式:**default**和**lazy**,默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用 `channel.queueDeclare`方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过`x-queue-mode`参数来设置队列的模式,取值为 `default`和 `lazy`。
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
内存开销对比
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。