赞
踩
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。
(1)流量消峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正 常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分 散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体 验要好。
(2)应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于 消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中途用户感受不到物流系统的故障,提升系统的可用性。
(3)异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可 以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题, A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此 消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不 用做这些操作。A 服务还能及时的得到异步处理成功的消息。
(1)ActiveMQ
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较 低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用
(2)Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥 着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。
优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非 常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采 用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持: 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消 息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
(3)RocketMQ
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场 景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分 布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅 读源码,定制自己公司的 MQ
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
(4)RabbitMQ
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最 主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。
缺点:商业版需要收费,学习成本较高
(1)生产者
产生数据发送消息的程序是生产者
(2)交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
(3)队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
(4)消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
添加开机启动 RabbitMQ 服务 chkconfig rabbitmq-server on
启动mq服务 /sbin/service rabbitmq-server start
查看服务状态 /sbin/service rabbitmq-server status
停止服务 /sbin/service rabbitmq-server stop
开启 web 管理插件 rabbitmq-plugins enable rabbitmq_management
(开启web管理界面方便操作,但linux上默认的guest用户不允许登录,所以要创建新用户并给予权限)
开放防火墙端口5672和15672,如果是云服务器还要开启安全组
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
创建账号 rabbitmqctl add_user admin 123
设置用户角色 rabbitmqctl set_user_tags admin administrator
设置用户权限 set_permissions [-p ]
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
(表示用户 admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 )
查看当前所有用户和角色 rabbitmqctl list_users
关闭应用的命令为 rabbitmqctl stop_app
清除的命令为 rabbitmqctl reset
重新启动命令为 rabbitmqctl start_app
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
public class RabbitMQUtils { //连接工厂 private static final ConnectionFactory factory=new ConnectionFactory(); //连接 private static Connection connection; static { //工厂设置 factory.setHost("47.106.223.84"); factory.setUsername("govd"); factory.setPassword("123456"); //创建连接 try { connection=factory.newConnection(); } catch (Exception e){ System.out.println(e); } } //创建频道 public static Channel getChannel() throws IOException, TimeoutException { Channel channel = connection.createChannel(); return channel; } }
public class Provider { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //获取一个频道,channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Channel channel = RabbitMQUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列是否持久化 ,如果持久化,服务重启后该队列依然存在 * 3.该队列是否只供一个消费者进行消费 是否进行共享 ;true时只允许一个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除; true 自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="hello world"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }
public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("等待接收消息...."); //消息成功交付接口回调 DeliverCallback deliverCallback=(consumerTag,message)->{ String msg = new String(message.getBody()); System.out.println(msg); }; //消费中断接口回调 CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者成功消费的回调 * 4.消费者消费中断的回调 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。(说白了就是多个消费者,以轮询的方式去处理队列中的消息)
public class Provider { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws Exception { Channel channel= RabbitMQUtils.getChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台当中接受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } }
创建两个消费者,代码逻辑都一样,改一下名字即可
public class Work1 { private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback=(consumerTag,message)->{ String msg = new String(message.getBody()); System.out.println("接收到消息:"+msg); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }; System.out.println("C1 消费者启动等待消费......"); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。如果RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 就可以把该消息删除了。
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了;另一方面这种模式下消费者那边可能接收过载的消息,没有对传递的消息数量进行限制, 这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。
手动应答的好处是可以批量应答并且减少网络拥堵,但我们强烈建议不要使用Mutiple,应该一个一个应答,以防消息丢失
multiple 的 true 和 false 代表不同意思
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。(即如果mq没有收到对于一个消息的应答就不会将其从队列中删除,会将其重新入队,交给其他消费者处理)
public class Provider { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws Exception { Channel channel= RabbitMQUtils.getChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台当中接受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } }
public class Work1 { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback=(consumerTag,message)->{ String msg = new String(message.getBody()); // 模拟一个1s的业务 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("接收到消息:"+msg); /** 参数 * 1.消息标记 tag,对应某条消息 * 2.是否批量应答未应答消息 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }; System.out.println("C1 等待接收消息处理时间较短"); // 取消自动应答 channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
public class Work2 { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback=(consumerTag,message)->{ String msg = new String(message.getBody()); //模拟一个30s的业务 try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("接收到消息:"+msg); /** 参数 * 1.消息标记 tag,对应某条消息 * 2.是否批量应答未应答消息 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }; System.out.println("C2 等待接收消息处理时间较长"); // 取消自动应答 channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wxyj6M5X-1651586549149)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220419235736243.png)]
重新入队效果展示:
生产者发送cc,消费者C1处理;生产者发送消息 dd,发出消息之后的把 C2 消费者停掉,导致mq没有收到消息dd的ACK应答,则将该消息重新入队; 将消息交给C1处理。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qyElMizG-1651586549150)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220420000518254.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NJvZJq2s-1651586549151)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220420000536832.png)]
基于消息应答机制,我们可以保证在消费者宕机时不会导致消息丢失。但如果不进行消息的持久化,mq宕机的话会导致消息丢失。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化。
/**
* 生成一个队列
* 1.队列名称
* 2.队列是否持久化 ,如果持久化,服务重启后该队列依然存在
* 3.该队列是否只供一个消费者进行消费 是否进行共享 ;true时只允许一个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除; true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新 创建一个持久化的队列,不然就会出现错误.(可以在web管理界面直接删除队列)
持久化成功效果:
这个时候即使重启 rabbitmq 队列也依然存在。
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
将队列和消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,mq就宕机了,此时并没 有真正写入磁盘。这样的持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要彻底保证持久化,则需要进行发布确认。
RabbitMQ 分发消息默认采用的轮训分发,但是在某种场景下这种策略并不是 很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
Qos(Quality of Service) : 服务质量,用于评估服务方满足客户服务需求的能力。通过配置QoS,对网络流量进行调控,避免并管理网络拥塞。
//预取值
int preFetchCount=1;
channel.basicQos(preFetchCount);
给两个消费者都设置为1的预取值,即两个消费者都维护一个大小为1的窗口,即一次只能接收一个消息,只有对mq发回ACK确认,才能接收下一个消息。
当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加 新的 worker 或者改变其他存储任务的策略。
消费者端存在一个未确认的消息缓冲区,开发人员能限制此 缓冲区的大小,以避免缓冲区里面无限制的存入问题。这个时候就可以通过使用 **channel.basicQos(preFetchCount); 方法设 置“预取值”**来完成。该值定义通道允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,直至一个未处理的消息被确认。
例如,假设在通道上有 未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何 消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知 这个情况到并再发送一条消息。
消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高 向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)。
找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
预取值为 1 是最保守的。当然这 将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,对于大多数应用来说,稍微高一点的值将是最佳的。
不设置预取值的话默认为0,即不设置缓冲区大小,不限制接收的消息数,则mq不断轮询分发消息。
//预取值
int preFetchCount=预取值;
channel.basicQos(preFetchCount);
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布.
waitForConfirms( )这个方法在消息被确认前会阻塞,直至确认消息返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
public class Provider { private static final String QUEUE_NAME="publish_confirm"; public static void main(String[] args) throws Exception { Channel channel= RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //开启发布确认 channel.confirmSelect(); long begin=System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message=i+""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); boolean flag = channel.waitForConfirms(); if(flag){ System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("使用单个发布确认,发布1000条消息,耗时" + (end - begin) + "ms"); } } // 发布1000个单独确认消息,耗时9582ms
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息;而后重新发布这一批消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。
批量发布在网络环境良好的情况下,比单个发布确认的效率要好。
public class Provider { private static final String QUEUE_NAME="publish_confirm"; public static void main(String[] args) throws Exception { Channel channel= RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //开启发布确认 channel.confirmSelect(); //批量确认消息大小 int batchSize = 100; long begin=System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message=i+""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); if(i%batchSize==0){ boolean flag = channel.waitForConfirms(); if(flag){ System.out.println("批量消息发送成功"); } } } long end = System.currentTimeMillis(); System.out.println("使用批量发布确认,发布1000条消息,耗时" + (end - begin) + "ms"); } } // 使用批量发布确认,发布1000条消息,耗时250ms
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都最好。
生产者无需同步等待确认消息之后才能继续发送消息;而是不断的发送消息即可,利用回调函数可以异步的达到消息的可靠性传递。
即每个消息如果接收成功会发回一个ackCallBack(),如果接收失败会发回一个nackCallBack(); 我们只需要将发回nackCallBack()的消息重新发送即可。
public class ProviderAsync { private static final String QUEUE_NAME="publish_confirm"; public static void main(String[] args) throws Exception { Channel channel= RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //开启发布确认 channel.confirmSelect(); /** ConcurrentSkipListMap * 线程安全且key有序的一个哈希表,适用于高并发的情况 * 1.轻松的将序号与消息进行关联 * 2.轻松批量删除条目 只要给到序列号 * 3.支持并发访问 */ ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>(); /** * 确认收到消息的一个回调 * 1.消息序列号 * 2.是否为批确认 true 可以确认小于等于当前序列号的消息 * false 单个确认,只确认当前序列号消息 */ ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { //返回的是map中key小于等于当前序列号的一个截取 ConcurrentNavigableMap<Long, String> confirmed = map.headMap(sequenceNumber, true); //清除该部分确认消息 confirmed.clear(); }else{ //只清除当前序列号的消息 map.remove(sequenceNumber); } }; // 未确认收到的回调 ConfirmCallback nackCallback = (sequenceNumber, multiple) -> { String message = map.get(sequenceNumber); System.out.println("发布的消息"+message+"未被确认,序列号:"+sequenceNumber); }; /** * 添加一个异步确认的监听器 * 1.确认收到消息的回调 * 2.未收到消息的回调 */ channel.addConfirmListener(ackCallback, nackCallback); long begin=System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message=i+""; //(下一个消息的序号,消息体)存入map map.put(channel.getNextPublishSeqNo(),message); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("使用异步发布确认,发布1000条消息,耗时" + (end - begin) + "ms"); } } // 使用异步发布确认,发布1000条消息,耗时54ms
从未确认的回调中重新发布nack-ed消息可能很诱人,但应避免这种情况,因为消息确认的回调方法是在I/O线程中调度的,他不应该用来做业务操作。 更好的解决方案是将消息放入由发布线程轮询的内存队列中。 诸如ConcurrentLinkedQueue之类的类将是在确认回调和发布线程之间传输消息的理想选择。
// 发布1000个单独确认消息,耗时9582ms
// 使用批量发布确认,发布1000条消息,耗时250ms
// 使用异步发布确认,发布1000条消息,耗时54ms
可以发现虽然异步发布确认的逻辑要复杂一些,但效率是最好的
RabbitMQ 消息传递模型的核心思想是: **生产者生产的消息从不会直接发送到队列。**实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。
Exchanges 的类型 :直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
如果我们发送消息时不指定交换机,使用的就是默认交换机。
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey指定的,如果它存在的话
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称 的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连 接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
创建后效果如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MDStPMTh-1651586549161)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422123952413.png)]
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JwhclLvW-1651586549161)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422124055972.png)]
Fanout 这种类型非常简单。正如从名称中表示的那样,它是将接收到的所有消息广播到它知道的 所有队列中。
Fanout是交换机的一种类型,mq默认也自带有一个
借由Fanout类型的交换机,我们可以实现发布订阅模式。
Logs 交换机和临时队列的绑定关系如下图
编写两个消费者,代码逻辑都一样
public class Consumer1 { private static final String EXCHANGE_NAME = "fanout_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //声明一个exchange,类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 生成一个临时队列 String queue = channel.queueDeclare().getQueue(); //把该临时队列绑定我们的 exchange 其中 routingKey(也称之为 binding key)为空字符串 //因为是fanout模式,即广播发布,所以不需要指定routingKey channel.queueBind(queue,EXCHANGE_NAME,""); System.out.println("消费者C1等待接收消息....."); DeliverCallback deliverCallback=(consumerTag, message)->{ String msg = new String(message.getBody(), "UTF-8"); System.out.println("接收到的消息: "+msg); }; channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
public class Provider { private static final String EXCHANGE_NAME = "fanout_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); // 广播发布,所以不需要指定routingKey channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } }
fanout是向所有消费者广播消息;如果我们只想对特定的消费者发送消息,就使用Direct 类型的交换机;
通过绑定routingKey,我们就可以实现消息的指定发送。借由direct交换机实现路由模式
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TAglNT0y-1651586549163)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422130728875.png)]
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,
public class Consumer1 { private static final String EXCHANGE_NAME = "direct_logs"; private static final String QUEUE_NAME = "console"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,true,null); //绑定交换机和队列,指定routingKey channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning"); System.out.println("消费者C1等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println("接收到消息:"+msg);; }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }
public class Consumer2 { private static final String EXCHANGE_NAME = "direct_logs"; private static final String QUEUE_NAME = "disk"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,true,null); //绑定交换机和队列,指定routingKey channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); System.out.println("消费者C2等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println("接收到消息:"+msg);; }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }
public class Provider { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建多个 bindingKey Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info","普通 info 信息"); bindingKeyMap.put("warning","警告 warning 信息"); bindingKeyMap.put("error","错误 error 信息"); //debug 没有这个routingKey对应的队列接收这个消息 所以就丢失了 bindingKeyMap.put("debug","调试 debug 信息"); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } }
尽管使用 direct 交换机可以实现对指定的routingKey进行消息发送;但如果我们想将消息发送到多个routingKey对应的队列,direct交换机就做不到了;这时候我们使用Topic交换机。借由Topic交换机实现主题模式。
topic 交换机与队列绑定的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
Q1–>绑定的是 中间带 orange 带 3 个单词的字符串(** .orange.* *)
Q2–>绑定的是 最后一个单词是 rabbit 的 3 个单词(* . *.rabbit) 第一个单词是 lazy 的多个单词(lazy.#)
我们来看看他们之间数据接收情况是怎么样的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VRKp8iSB-1651586549165)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422140543014.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5OfkkXAi-1651586549165)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422140548570.png)]
当队列绑定关系是下列这种情况时需要引起注意
public class Consumer1 { private static final String EXCHANGE_NAME = "topic_logs"; private static final String QUEUE_NAME = "Q1"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare(QUEUE_NAME,false,false,true,null); //声明 Q1 队列与绑定关系 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" 接收队列 :"+QUEUE_NAME+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+msg); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }
public class Consumer2 { private static final String EXCHANGE_NAME = "topic_logs"; private static final String QUEUE_NAME = "Q2"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare(QUEUE_NAME,false,false,true,null); //声明 Q1 队列与绑定关系 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" 接收队列 :"+QUEUE_NAME+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+msg); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }
public class Provider { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //存放消息和对应的routingKey Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } }
死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效.
public class Provider { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //设置消息的 TTL 时间 单位为ms 即10s过期 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i <=10 ; i++) { String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes()); System.out.println("生产者发送消息:"+message); } } } }
public class Consumer1 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key ;这里是指明死信交换机要将死信发送到哪个队列,和上面的绑定不一样 params.put("x-dead-letter-routing-key", "dead"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息"+message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
public class Consumer2 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead"); System.out.println("等待接收死信队列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接收死信队列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NvvJhjpn-1651586549166)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422144646652.png)]
之前的代码注释掉过期时间即可
public class Provider { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //设置消息的 TTL 时间 单位为ms 即10s过期 //AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i <=10 ; i++) { String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE, "normal", null, message.getBytes()); System.out.println("生产者发送消息:"+message); } } } }
声明正常队列时,参数中限制队列长度
注意此时需要把原先队列删除 因为参数改变了
public class Consumer1 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key ;这里是指明死信交换机要将死信发送到哪个队列,和上面的绑定不一样 params.put("x-dead-letter-routing-key", "dead"); //设置队列长度限制 params.put("x-max-length",6); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息"+message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
代码不变
从结果我们可以发现在队列长度满了之后,是将队首的消息踢出并加入死信队列
代码不变
去掉限制队列长度的参数;在回调函数中进行消息拒绝;并且将应答改为手动应答。
public class Consumer1 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key ;这里是指明死信交换机要将死信发送到哪个队列,和上面的绑定不一样 params.put("x-dead-letter-routing-key", "dead"); //设置队列长度限制 //params.put("x-max-length",6); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), "UTF-8"); //只拒绝info5 if(msg.equals("info5")){ System.out.println("Consumer01 接收到消息" + msg + "并拒绝签收该消息"); //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); }else { System.out.println("Consumer01 接收到消息"+msg); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {}); } }
代码不变
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用定时任务显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BxHMBL2s-1651586549168)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220422225001599.png)]
借由死信队列的TTL消息过期方式即可实现延迟队列
如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
两种实现方式的区别:
<dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
spring.rabbitmq.host=47.106.223.84
spring.rabbitmq.port=5672
spring.rabbitmq.username=govd
spring.rabbitmq.password=123456
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
整合springboot后,交换机,队列,绑定都在配置类中进行声明;
生产者和消费者只专注于消息的处理。
@Configuration public class TtlQueueConfig { //两个交换机和三个队列的名称 public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; //声明交换机 @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } //声明交换机 @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明队列 @Bean("queueA") public Queue queueA(){ //第一种写法 Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); //第二种写法 //return QueueBuilder.durable(QUEUE_A) //.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE).deadLetterRoutingKey("YD").ttl(10000).build(); } //声明队列 @Bean("queueB") public Queue queueB(){ //第二种写法 return QueueBuilder.durable(QUEUE_B) .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE).deadLetterRoutingKey("YD").ttl(40000).build(); } //声明队列 @Bean("queueD") public Queue queueD(){ //第二种写法 return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } //声明队列 A 绑定 X 交换机 @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //声明队列 B 绑定 X 交换机 @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } //声明死信队列 QD 绑定关系 @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
@RestController
@Slf4j
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMSg(@PathVariable("message") String message){
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
}
}
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue 里面的消息。
@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数类型需要与发送的消息类型一致
@Component
public class PointConsumer {
//监听的队列名
@RabbitListener(queues = "point.to.point")
public void processOne(String name) {
System.out.println("point.to.point:" + name);
}
}
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
@Component @RabbitListener(queues = "consumer_queue") public class Receiver { @RabbitHandler public void processMessage1(String message) { System.out.println(message); } @RabbitHandler public void processMessage2(Student message) { System.out.println(message.toString()); } // Message类型的body类型为byte[] @RabbitHandler public void processMessage2(Message message) { String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg); } }
启动程序,发起一个请求求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p5Jt0GtS-1651586549168)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220423000233514.png)]
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
存在的问题:按照在队列设置ttl的方式,如果每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列;如果要求很多个过期时间,岂不是要增加无数个队列才能满足需求?
在之前的基础上,新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间
消息的TTL由生产者发送消息时来指定。
在之前的基础上添加
public static final String QUEUE_C = "QC";
//声明队列 C
@Bean("queueC")
public Queue queueC(){
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE).deadLetterRoutingKey("YD").build();
}
//声明队列 c 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
在生产者方指定过期时间
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, msg ->{
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
log.info("当前时间:{},发送一条过期时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}
消费者代码无需变动
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
发起请求:
这种方式确实不用无限创建队列了,但是依然存在问题;我们从结果发现过期时长短的反而在过期时长长的后面输出。
之前就声明过在生产者方指定TTL; 消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查队首消息是否过期,如果过期则丢到死信队列, 如果队首消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,只能阻塞在队列中。
从上面我们知道借助死信队列来实现延迟队列,无论哪种方式都不够完美;所以我们借助官方的延时插件来实现。
下载地址: https://www.rabbitmq.com/community-plugins.html
cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cyHEJSIN-1651586549169)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220423173709660.png)]
添加插件之后我们发现交换机的类型增加了一种延迟交换机。
不再借助死信队列的实现方式,即在队列设置过期时间或发送消息时指定过期时间;
而是借助安装了插件之后增加的延迟交换机,我们只要在发送消息时指定延迟时间交给交换机即可,延迟交换机会在延迟时间后将消息发送到队列。
@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingKey"; @Bean public CustomExchange delayedExchange(){ //设置参数,虽然是延迟交换机,但还是direct模式 Map<String,Object> args=new HashMap<>(); args.put("x-delayed-type", "direct"); //自定义交换机的类型 return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args); } @Bean public Queue delayedQueue(){ return QueueBuilder.durable(DELAYED_QUEUE_NAME).build(); } @Bean public Binding dQBindDE(@Qualifier("delayedQueue") Queue DQ,@Qualifier("delayedExchange") CustomExchange DE){ return BindingBuilder.bind(DQ).to(DE).with(DELAYED_ROUTING_KEY).noargs(); } }
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
log.info("当前时间:{},发送一条延时时长{}毫秒的信息给队列 delayQueue:{}", new Date(),delayTime, message);
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME,DELAYED_ROUTING_KEY,message,msg->{
//注意这里是设置延迟时间,和之前设置过期时间不一样
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
发起请求:
可以发现完美的达到了我们想要的效果。
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?
在之前的发布确认中我们已经知道了可以有三种方式来处理发布确认,下面看看整合了springboot之后是怎么做的。
确认机制方案
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kwwYbvJQ-1651586549170)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220423224845036.png)]
当mq出现了问题,导致消息无法交付到交换机,或者交换机无法将消息发送到队列时,都应该进行处理;我们可以借助回调函数或者备份机来进行处理;如果使用回调函数的形式,我们要将未成功交付的信息存入一个缓存中,然后再使用定时任务将消息重新发送。
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated //开启发布确认
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
注意这里开启发布确认并借由回调函数解决的是消息无法交付到交换机;如果是交换机无法将消息交到队列,参考后面的回退消息
@Configuration @Slf4j public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //声明业务 Exchange @Bean public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } // 声明确认队列 @Bean public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } // 声明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1"); } }
@Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机不管是否收到消息的一个回调方法 * CorrelationData 消息相关数据,由生产者指定,通常只需要一个id * ack 交换机是否收到消息 * cause 没有收到消息的原因,收到消息时该参数为null * */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null?correlationData.getId():""; if(ack){ log.info("交换机已经收到 id 为:{}的消息",id); }else{ log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause); } } }
@RestController @RequestMapping("/confirm") @Slf4j public class ConfirmController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; /** * @PostConstruct该注解被用来修饰一个非静态的void()方法。 * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。 * @PostConstruct注解 该注解的方法在整个Bean初始化中的执行顺序: * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法) * 从这里理解即先实例化类RabbitTemplate,然后将其Autowired注入到这, * 拿到之后在服务器加载这个controller时再将自定义的ConfirmCallback, set到RabbitTemplate中去 **/ @PostConstruct public void init(){ // confirmCallBack是rabbitTemplate中的一个接口,我们自定义实现后要set进去 rabbitTemplate.setConfirmCallback(myCallBack); } @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable("message") String message){ //回调函数携带的相关信息由生产者指定,通常只需要一个id; //如果消息接收失败,通过id我们也可以很方便的将失败的消息进行处理 CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message,correlationData); log.info("发送消息内容:{}",message); } }
上面的代码正常执行,交换机是可以正常收到消息的,正常情况如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3ywqdXFp-1651586549173)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220423233242727.png)]
我们修改一下生产者的代码,随便修改一下交换机名称,即模拟无法交付到交换机的情况
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"123","key1",message,correlationData);
报错信息:
#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange123’ in vhost ‘/’, class-id=60, method-id=40) 即无法找到这个交换机
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会通过回调接口函数给消息生产者发送确认消息,如 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
那么如何 让无法被路由的消进行处理?最起码通知我一声,我好自己处理啊。通过设置 回退消息接口函数可以在当消息传递过程中不可达目的地时将消息返回给生产者;生产者再对无法交付到队列的消息进行处理。
spring.rabbitmq.publisher-returns=true //开启回退消息机制
@Component
@Slf4j
public class MyReturnCallback implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 错误路由 key:{}",
new String(returned.getMessage().getBody()),returned.getExchange(),
returned.getReplyText(),returned.getRoutingKey());
}
}
在之前生产者的基础上将自定义回退消息接口函数set到rabbitTemplate中即可
@RestController @RequestMapping("/confirm") @Slf4j public class ConfirmController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyConfirmCallback myConfirmCallback; @Autowired private MyReturnCallback myReturnCallback; /** * @PostConstruct该注解被用来修饰一个非静态的void()方法。 * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。 * @PostConstruct注解 该注解的方法在整个Bean初始化中的执行顺序: * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法) * 从这里理解即先实例化类RabbitTemplate,然后将其Autowired注入到这, * 拿到之后在服务器加载这个controller时再将自定义的ConfirmCallback, set到RabbitTemplate中去 **/ @PostConstruct public void init(){ // confirmCallBack是rabbitTemplate中的一个接口,我们自定义实现后要set进去 rabbitTemplate.setConfirmCallback(myConfirmCallback); rabbitTemplate.setReturnsCallback(myReturnCallback); } @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable("message") String message){ //回调函数携带的相关信息由生产者指定,通常只需要一个id; //如果消息接收失败,通过id我们也可以很方便的将失败的消息进行处理 CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message,correlationData); log.info("发送消息内容:{}",message); } }
消费者无需变动
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg=new String(message.getBody());
log.info("接受到队列 confirm.queue 消息:{}",msg);
}
}
保证交换机名称正确,确保消息可以交付到交换机;随便改动一下消息发送时的routingKey,模拟无法交付到队列的情况
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key123",message,correlationData);
消息成功交付到交换机,但是因为routingKey错误,无法交付到队列;返回回退消息给生产者。
有了回退消息,我们获得了对消息无法投递到队列的感知能力。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然 后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者 所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。
而且设置回退消息会增 加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的 复杂性,该怎么做呢?
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理。
通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个处理队列,这样所有那些原交换机无法被路由的消息,就会都 进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
备份机用于处理无法路由的消息,即可以省去在生产者设置回退消息;
但备份机不能处理消息无法交付到交换机的情况,所以还是要设置回调接口函数处理交换机异常
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YqLV8Vsp-1651586549174)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424001314042.png)]
在之前的配置类基础上进行修改,增加一个备份交换机和两个与之绑定的队列。
并在confirm交换机的参数中声明备份机;
注意这里前面已经在mq中存在confirmExchange,这里修改了配置,要先删掉confirmExchange
@Configuration @Slf4j public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; //声明confirm Exchange @Bean public DirectExchange confirmExchange(){ // 在参数中声明confirmExchange的备份机backupExchange return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build(); } // 声明确认队列 @Bean public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } // 声明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1"); } //声明备份 Exchange @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } // 声明警告队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } // 声明警告队列绑定关系 @Bean public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); } // 声明备份队列 @Bean("backQueue") public Queue backQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } // 声明备份队列绑定关系 @Bean public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); } }
@Component
@Slf4j
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}
保持交换机名称正确,随便改动routingKey,模拟无法交付到队列;
则消息会交给备份机,然后备份机将消息广播到绑定的队列,其中报警消费者从报警队列进行消费
发送请求:http://localhost:8080/confirm/sendMessage/你好
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DkvvSqei-1651586549175)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424003842299.png)]
回退消息与备份交换机一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先 级高?
从结果可以看出,回退消息并没有执行,直接采用了备份机的方案;即备份机优先级高
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。 举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误 立即回滚,但是在响应客户端的时候也有可能出现网络中断或者异常等等
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但 实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费 者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id。总而言之就是需要一个唯一标识,每次消费消 息时用该 id 先判断该消息是否已消费过。
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基 本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存 在数据库中;优势就是实现简单,就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费,推荐使用。
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如 果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧。
但是,对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理;采用 RabbitMQ 的优先级队列,通过设置消息的优先级,对优先级高的消息优先处理。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lqlqlYHx-1651586549175)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424174540682.png)]
**注意点:**队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样d 消息才是有序的。
生产者:
public class Provider { private static final String QUEUE_NAME="priority_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, params); //给消息赋予一个 priority 属性 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); for (int i = 1; i <=10; i++) { String message = "info"+i; if(i==5){ channel.basicPublish("", QUEUE_NAME, properties, message.getBytes()); }else{ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } System.out.println("发送消息完成:" + message); } } }
消费者:
public class Consumer { private static final String QUEUE_NAME="priority_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, params); System.out.println("消费者启动等待消费......"); DeliverCallback deliverCallback=(consumerTag, message)->{ String msg = new String(message.getBody()); System.out.println("接收到消息:"+msg); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{}); } }
结果:实现了优先级高的消息先处理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hHONn76K-1651586549175)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424182323078.png)]
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消 费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。消息的持久化需要在发送消息时进行参数设置,而惰性队列是直接将接收的消息存入磁盘,只在内存中保留一些指向磁盘位置的索引。
我们知道磁盘的读取速度是很慢的,但是在消费者宕机无法处理消息导致队列消息堆积的话就会占用大量内存空间,这时候就要使用惰性队列了。
队列具备两种模式:default 和 lazy。默认的为 default 模式。
lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sZKDTAU5-1651586549176)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424183528547.png)]
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Un0ZIVcR-1651586549176)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424183731890.png)]
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB
单台 RabbitMQ 服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是 解决实际问题的关键.
下面以搭建三个mq为一个集群为例
vim /etc/hostname
2 .配置各个节点的 hosts 文件,让各个节点都能互相识别对方(在三台主机上都进行配置)
vim /etc/hosts
10.211.55.74 node1
10.211.55.75 node2
10.211.55.76 node3
3.确保各个节点的 erlang的cookie 文件使用的是同一个值
在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
4.启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以 下命令)
rabbitmq-server -detached
5.在节点 2 执行
rabbitmqctl stop_app (rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1 (加入节点1所在的集群)
rabbitmqctl start_app(只启动应用服务)
6.在节点 3 执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2 (加入节点2所在的集群,mq的集群没有主节点之说,只要加入一个在集群中的节点,即是加入了集群)
rabbitmqctl start_app
7.查看集群状态
rabbitmqctl cluster_status
8.给节点2和节点3添加用户和赋予权限 (到此集群就搭建完毕了)
创建账号 rabbitmqctl add_user govd 123456
设置用户角色 rabbitmqctl set_user_tags govd administrator
设置用户权限 rabbitmqctl set_permissions -p “/” govd “." ".” “.*”
9.解除集群节点
比如要解除从集群解除节点2,在节点2执行 :(即重启节点2mq服务,不声明加入集群,然后在节点1遗忘节点2即可)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
在节点1执行:
rabbitmqctl forget_cluster_node rabbit@node2
查看集群状态:
rabbitmqctl cluster_status
即使通过上面的步骤搭建了集群,但是三个mq之间的信息是不互通的;即如果节点1宕机了,那么其持有的消息就会丢失;
即使通过设置队列持久化和消息持久化,在消息未成功持久化之前就宕机依旧会导致消息丢失;
即使成功持久化,节点1宕机的这段时间这些消息将无法处理,这不是我们所期望的;我们希望的是即使某个节点宕机了,其消息仍然可以由另外的节点进行处理;
引入镜像队列(Mirror Queue)的机制,可以将一个节点的队列制作镜像队列存在别的节点上,如果集群中 的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。而且如果此时队列还有一个以上的节点,这个镜像队列还可以变成正常队列,再在另外的节点上复制镜像队列,再次保证消息安全性。
1.启动三台集群节点
2.随便找一个节点添加 policy
3.在 node1 上创建一个队列发送一条消息,队列存在镜像队列
4.停掉 node1 之后发现镜像队列变到了 node2上,节点3的队列成为正常队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5YvAzVg6-1651586549177)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424225601302.png)]
5.就算整个集群只剩下一台机器了 依然能消费队列里面的消息,保证了消息不会无法消费
HAProxy 提供高可用性、负载均衡及基于 TCP HTTP 应用的代理,支持虚拟主机,它是免费、快速并 且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。 HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
即在集群前面加了一层,我们的生产者将发送消息指定的ip是Haproxy的ip,然后Haproxy再帮我们将消息负载均衡的发送给mq集群中的节点;Haproxy自己也做了备机,以防宕机导致不可用。 使用Haproxy和nginx都可以。
1.下载 haproxy(在haproxy的 node1 和 node2)
yum -y install haproxy
2.修改haproxy的 node1 和 node2 的 haproxy.cfg
vim /etc/haproxy/haproxy.cfg
需要修改红色 IP 为mq集群中对应节点ip
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T0w3duqs-1651586549178)(https://gitee.com/GOV_D/my-picture/raw/master/MyPicture/image-20220424230245377.png)]
3.在两台节点启动 haproxy
haproxy -f /etc/haproxy/haproxy.cfg
ps -ef | grep haproxy 查看进程是否启动成功
4.访问地址(使用 vip 地址来访问 rabbitmq 集群)
试想如果前面配置的 HAProxy 主机突然宕机或者网卡失效,那么虽然 RbbitMQ 集群没有任何故障但是 对于外界的客户端来说所有的连接都会被断开结果将是灾难性的为了确保负载均衡服务的可靠性同样显得 十分重要,这里就要引入 Keepalived 它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现 故障转移.
1.下载 keepalived
yum -y install keepalived
2.HAProxy 的节点 node1 配置文件
vim /etc/keepalived/keepalived.conf
把资料里面的 keepalived.conf 修改之后替换
3.节点 node2 配置文件
需要修改 global_defs 的 router_id,如:nodeB
其次要修改 vrrp_instance_VI 中 state 为"BACKUP";
最后要将 priority 设置为小于 100 的值
4.添加 haproxy_chk.sh
(为了防止 HAProxy 服务挂掉之后 Keepalived 还在正常工作而没有切换到 Backup 上,所以这里需要编写一个脚本来检测 HAProxy 务的状态,当 HAProxy 服务挂掉之后该脚本会自动重启 HAProxy 的服务,如果不成功则关闭 Keepalived 服务,这样便可以切换到 Backup 继续工作)
vim /etc/keepalived/haproxy_chk.sh(可以直接上传文件)
修改权限 chmod 777 /etc/keepalived/haproxy_chk.sh
5.启动 keepalive 命令(node1 和 node2 启动)
systemctl start keepalived
6.观察 Keepalived 的日志
tail -f /var/log/messages -n 200
7.观察最新添加的 vip
ip add show
8.node1 模拟 keepalived 关闭状态
systemctl stop keepalived
9.使用 vip 地址来访问 rabbitmq 集群
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。