赞
踩
最大处理量如果是一秒一万条订单,一秒钟来了两万条,可以先存在消息队列里面,按照能力去消费处理
下单后,需要去调用很多其他系统,使用我们的发布订阅,让需要接受这条消息的服务监听这个queue
在我们一些需要异步调用的场景中,回调
生产者
交换机(需要重点理解)接受生产者的消息,并按照规则推到队列里面,这些规则的配置可以实现不同场景的需求
队列
消费者
docker
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone rabbitmq:management
3.8.8 https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8
22.3 https://www.erlang-solutions.com/downloads/
# 安装erlang rpm -ivh esl-erlang_22.3.1-1_centos_7_amd64.rpm warning: esl-erlang_22.3.1-1_centos_7_amd64.rpm: Header V4 RSA/SHA1 Signature, key ID a14f4fca: NOKEY error: Failed dependencies: 执行以下命令: yum install epel-release yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl yum install socat -y #安装RabbitMQ rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm #添加开机启动 RabbitMQ 服务 chkconfig rabbitmq-server on #启动服务 /sbin/service rabbitmq-server start #查看服务状态 /sbin/service rabbitmq-server status #停止服务(选择执行) /sbin/service rabbitmq-server stop #开启 web 管理插件,rabbitmq 默认不开启 rabbitmq-plugins enable rabbitmq_management # 现在登录如果使用ip是无法登录的 # 添加配置文件,去掉 ip 限制 cd /etc/rabbitmq vim rabbitmq-env.conf # Specifies new style config file location CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf vim rabbitmq.conf loopback_users = none /sbin/service rabbitmq-server restart #创建账号 rabbitmqctl add_user admin 123 #设置用户角色 rabbitmqctl set_user_tags admin administrator #设置用户权限 # set_permissions [-p <vhostpath>] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" #户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 #当前用户和角色 rabbitmqctl list_users # 关闭防火墙 # 查看防火墙状态: systemctl status firewalld.service # 关闭防火墙 systemctl stop firewalld.service # 开机禁用防火墙 systemctl disable firewalld.service
还是国际惯例,咱们来一个 hello world,实现的功能也很简单,创建一个生产者,发送一条 hello world 的消息,再创建一个 消费者,消费这条消息,并在控制台打印
我们创建一个 maven 的简单项目,后面再去整合 SpringBoot, 只需要引入两个依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
创建一个生产者
/** * 生产者:发消息 */ public class Producer { // 队列名称 public static final String QUEUE_NAME = "hello"; // 发消息 public static void main(String[] args) throws IOException, TimeoutException { // 创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 工厂 IP 连接 RabbitMQ 的队列 factory.setHost("172.16.0.28"); // 用户名 factory.setUsername("admin"); // 密码 factory.setPassword("123"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化(默认 false,内存) * 3.该队列是否值供一个消费者进行消费,是否进行消费共享, true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者断开连接后 该队列是否自动删除 false 不自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发消息 String message = "hello world"; /** * 发送一个消费 * 1.发送到哪个交换机 * 2.路由的 key 值是哪个,本次是队列的名称 * 3.其他参数信息 * 4.发送消息的消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完毕"); } }
消费者
/** * 消费者 */ public class Consumer { // 队列名称 public static final String QUEUE_NAME = "hello"; // 接收消息 public static void main(String[] args) throws IOException, TimeoutException { // 创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 工厂 IP 连接 RabbitMQ 的队列 factory.setHost("172.16.0.28"); // 用户名 factory.setUsername("admin"); // 密码 factory.setPassword("123"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 声明 接收消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; // 声明 取消消息的回调 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息 消费被中断"); }; /** * 消费者消费消息: * 1。 消费哪个队列 * 2. 消费成功后是否要自动应答,true 代表自动应答, false 代表手动应答 * 3。消费者未成功消费的回调 * 4。消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
我们启动下
我们来简单梳理下,在生产者中我们主要做的是,定义一个 队列,并往这个队列中发送消息,消费者中则是指定监听对应的 queue
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了
批量应答的理解,不建议使用,可能会应答没有处理完的消息
发生点在工作线程
没有 ack 的消息会被重新放回队列被别的消费者消费
文字说明,我们启动两个消费者,消费者c1,c2分别接收消息 m1, m2, 在c1 ack之前把c1关掉,这时m1会被c2重新消费
/** * 消息在手动应答时是不丢失,放回队列中重新消费 */ public class Task02 { // 队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); boolean durable = true; // 声明队列 channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); // 解决中文编码 System.out.println("生产者发送消息: " + message); } } } public class Work3 { // 队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1 等待接收消息处理时间较短"); DeliverCallback deliverCallback = (consumerTag, message) -> { // 沉睡 1 s SleepUtils.sleep(1); System.out.println("接收到消息: " + new String(message.getBody(), "UTF-8")); // 手动应答 /** * 1.消息的标记 tag * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; // 采用手动应答 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag)->{ System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }); } } public class Work4 { // 队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2 等待接收消息处理时间较长"); DeliverCallback deliverCallback = (consumerTag, message) -> { // 沉睡 30 s SleepUtils.sleep(30); System.out.println("接收到消息: " + new String(message.getBody(), "UTF-8")); // 手动应答 /** * 1.消息的标记 tag * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; // 采用手动应答 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }); } }
我们先启动 Task02 ,发送两条消息,后面分别启动 Work4, Work3,Work3 和 Work4根据轮训机制,会分别取到一条消息,然后再 ack 之前,我们把 Work4 关掉,会发现两条消息都被 Work3 消费了
这里的处理发生在 ,生产者发送消息的时候
需要分别设置队列和消息的持久化
这里存在一种情况,消息在落盘之前 宕机了,消息也会丢失,后面会讲到处理方式(需要发布确认)
这一小节来处理上一小节提出的问题,确保消息能被发布
发布确认总共有三种策略,下面我们我们分别说明,代码演示下,重点计算下每种策略所花的时间
首先我们需要开启发布确认
main 函数,下面我们分别写三个方法,分别实现 每种发布确认策略
// 批量发消息的个数
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 1. 单个确认
publishMessageIndividually(); // 发布 1000个单独确认消息耗时 398ms
// 2. 批量确认
// publishMessageBatch(); // 发布 1000个批量确认消息耗时 69ms
// 3. 异步批量确认
// publishMessageAsync(); // 发布 1000个异步确认消息耗时 33ms
}
串行,一条消息发布确认后才可以开始下一条消息
没发送一个消息调用一次 channel.waitForConfirms();
// 单个确认 public static void publishMessageIndividually() throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); // 开启发布确认 channel.confirmSelect(); // 开始时间 long begin = System.currentTimeMillis(); // 批量发消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); // 单个消息马上进行发布确认 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布 " + MESSAGE_COUNT + "个单独确认消息耗时 " + (end - begin) + "ms"); }
计算发送的消息,达到一定量之后调用一次 channel.waitForConfirms();
本质上还是同步,而且会存在某些消息没有被发布的问题,这个实现其实个人感觉有点鸡肋
// 批量发送确认 public static void publishMessageBatch() throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); // 开启发布确认 channel.confirmSelect(); // 开始时间 long begin = System.currentTimeMillis(); // 批量确认大小 int batchSize = 100; // 未确认消息个数 // 批量发消息 批量发布确认 for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); // 判断达到100条消息的时候,批量确认一次 if (i % batchSize == 0) { // 发布确认 channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("发布 " + MESSAGE_COUNT + "个批量确认消息耗时 " + (end - begin) + "ms"); }
这里是通过回调函数来异步确认
// 异步发布确认 public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); // 开启发布确认 channel.confirmSelect(); /** * 线程安全有序的一个哈希表 适用于高并发的情况下 * 1. 轻松的将序号和消息进行关联 * 2. 轻松批量删除条目 只要给到序号 * 3. 支持高并发(多线程) */ ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); // 开始时间 long begin = System.currentTimeMillis(); // 准备消息的监听器 监听哪些消息成功了 哪些消息失败了 // 消息确认成功 回调函数 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { // 2. 删除已经确认的消息,剩下的就是未确认的消息 if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag); confirmed.clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息: " + deliveryTag); }; // 消息确认失败 回调函数 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { // 3. 打印一下未确认的消息都有哪些 String message = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息是 " + message + "未确认的消息: " + deliveryTag); }; /** * 1. 监听哪些消息成功了 * 2. 监听哪些消息失败了 */ channel.addConfirmListener(ackCallback, nackCallback); // 异步通知 // 批量发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); // 1. 此处记录下所有要发送的消息 消息的总和 outstandingConfirms.put(channel.getNextPublishSeqNo(), message); } // 结束时间 long end = System.currentTimeMillis(); System.out.println("发布 " + MESSAGE_COUNT + "个异步确认消息耗时 " + (end - begin) + "ms"); }
有两个点需要说明
TODO 这里可以补充下哈,但还是感谢尚硅谷老师
这一小节会介绍几种常见交换机绑定队列的方式和几种常见交换机
前面我们没有手动去指定交换机
默认会给我们提供一个无名交换机
类似的,如果我们没有给队列命名,我们采用的也就是临时队列
绑定关系则是指的,路由与队列之间的映射关系
下面我们来介绍不同类型的交换机
广播,会把接收到的消息 广播到它知道的所有队列中
/** * 发消息 */ public class EmitLog { // 交换机名称 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("生产者发出消息: " + message); } } } /** * 消息接收 */ public class ReceiveLogs01 { // 交换机名称 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个队列 临时队列 // 队列的名称是随机的 // 当消费者断开与队列的连接的时候 队列就自动删除 String queueName = channel.queueDeclare().getQueue(); /** * 绑定交换机与队列 */ channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收消息,把接收到消息打印在屏幕上......"); // 接收消息 // 消费者取消消息时回调接口 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogs01接收到的消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } /** * 消息接收 */ public class ReceiveLogs02 { // 交换机名称 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个队列 临时队列 // 队列的名称是随机的 // 当消费者断开与队列的连接的时候 队列就自动删除 String queueName = channel.queueDeclare().getQueue(); /** * 绑定交换机与队列 */ channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收消息,把接收到消息打印在屏幕上......"); // 接收消息 // 消费者取消消息时回调接口 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogs02接收到的消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
我们可以看到,我们发送的m1和 m2,会被 两个队列全部接收
发送的时候必须指定路由规则,exchange需要根据routingkey把消息发送给每一个匹配的queue
如果多个队列具有相同的 routingkey,和 fanout 的情况就会类似
public class DirectLog { // 交换机名称 private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8")); System.out.println("生产者发出消息: " + message); } } } public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个队列 channel.queueDeclare("console", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "info"); channel.queueBind("console", EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect01接收到的消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume("console", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个队列 channel.queueDeclare("disk", false, false, false, null); channel.queueBind("disk", EXCHANGE_NAME, "error"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect02接收到的消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume("disk", true, deliverCallback, consumerTag -> { }); } }
这里在测试的时候,我们需要向不同的 routingKey 发消息,对应的消息就会根据 routingKey 进入到不同的队列
可以理解为 是在 direct 的基础上加上了模糊匹配的规则,模糊匹配规则有如下两条
public class EmitLogTopic { // 交换机名称 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Scanner scanner = new Scanner(System.in); /** * Q1-->绑定的是 * 中间带 orange 带 3 个单词的字符串(*.orange.*) * Q2-->绑定的是 * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit) * 第一个单词是 lazy 的多个单词(lazy.#) * */ Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } /** * 声明主题交换机 及相关队列 * * 消费者 c1 */ public class ReceiveLogsTopic01 { // 交换机名称 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明一个队列 String queueName = "Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody(), "UTF-8")); System.out.println("接收队列: " + queueName + " 绑定键: " + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } /** * 声明主题交换机 及相关队列 * <p> * 消费者 c1 */ public class ReceiveLogsTopic02 { // 交换机名称 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明一个队列 String queueName = "Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody(), "UTF-8")); System.out.println("接收队列: " + queueName + " 绑定键: " + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
基于注解和编程
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
如果spring-boot-starter-web
则无需重复引入
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
阻塞重试,建议禁用
验证方式,发送消息的时候把 rabbitmq 停用
publisher confirm->生产者把消息成功发送给了 exchange,ack 和 nack
publisher return->exchange路由消息失败会触发
如何开启
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
@Test void testPublisherConfirm() { // 1. 创建 CorrelationData CorrelationData cd = new CorrelationData(); // 2. 给 future 添加 ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 2.1 Future 异常 基本不会出现 log.error("send message fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 2.2 Future 接收到回执的处理逻辑,参数中的 result 就是回执内容 if (result.isAck()) { log.info("发送消息成功,收到 ack"); } else { log.error("发送消息失败,收到nack,reason: {}", result.getReason()); } } }); // 3. 发送消息 rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd); } @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("触发return callback,"); log.info("exchange: {}", returned.getExchange()); log.info("routingKey: {}", returned.getRoutingKey()); log.info("message: {}", returned.getMessage()); log.info("replyCode: {}", returned.getReplyCode()); log.info("replyText: {}", returned.getReplyText()); } }); }
这个案例routingkey是匹配不到 queue 的,所有会返回 ack,然后触发 returnCallback
不建议开启 publisher return ,最多仅仅开启 publisher confirm
如果在开启持久化的同时开启 ack,会在持久化完成后才ack,但是由于持久化是批量的,所以建议 ack 使用异步
直接把消息发到磁盘,而不是先到内存再到磁盘
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
消费处理完消息后的三种回执
三种处理模式
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
// throw new MessageConversionException("故意的"); // reject
throw new RuntimeException(""); // 会重试
}
log.info("消息处理完成");
}
测试方式,先测试 none 模式,会发现直接删掉了。再测试 auto ,分别测试 MessageConversionException
和RuntimeException
,前者删掉,后者触发重试
默认是重新在mq中入队出队
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
可配置在客户端重试
重试三次后,返回 reject 删掉了消息
MessageRecovery
定义
默认是丢弃RejectAndDontRequeueRecoverer
ImmediateRequeueMessageRecoverer
重新入队
RepublishMessageRecoverer
package com.itheima.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
存放没有被消费的消息的队列
概念当中比较重要的是死信的来源,有三个
这三种情况后面会分别模拟,值得说一下的是第三种情况,这里可以看一下之前讲到的 消息未应答时可以重新入队,如果这里配置不入队,就可以被添加到死信队列当中
注意一个点即可,配置的是 普通队列 与 死信交换机之间的关系
/** * 死信队列 生产者 */ public class Producer { // 普通交换机的名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 死信消息 设置 ttl 单位是 ms AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .expiration("10000") .build(); for (int i = 0; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); } } } ** * 普通队列消费者 */ public class Consumer01 { // 普通交换机的名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; // 死信交换机的名称 public static final String DEAD_EXCHANGE = "dead_exchange"; // 普通队列的名称 public static final String NORMAL_QUEUE = "normal_queue"; // 死信队列的名称 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信和普通交换机, 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明普通队列和死信队列 Map<String, Object> arguments = new HashMap<>(); // 过期时间 不在这里设置 改为在生产者设置消息的 ttl // arguments.put("x-message-ttl", 10000); // 正常队列设置死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 设置死信 routing-key arguments.put("x-dead-letter-routing-key", "lisi"); // 设置正常队列长度的限制 // arguments.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); // 绑定普通交换机与队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); // 绑定死信交换机与队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { String s = new String(message.getBody(), "UTF-8"); // if (s.equals("info5")) { // System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8") + "此消息被拒绝"); // channel.basicReject(message.getEnvelope().getDeliveryTag(), false); // } else { System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // } }; // 开启手动应答 channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> { }); } } /** * 死信队列消费者 */ public class Consumer02 { // 死信队列的名称 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, consumerTag -> { }); } }
我们可以看到 普通队列与死信交换机之间的关系
情况一模拟:TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.expiration("10000")
.build();
设置发送的消息的 ttl
模拟方式很简单,先启动 c1 然后关闭,然后启动消费者
情况2 超出队列大小
我们运行一次c2 ,把死信队列里面的消息消费掉
重新开始测试,为避免干扰我们去掉消息的ttl
设置队列最大长度为6,所以按照推测,如果发送11条消息,会有5条(超出部分)进入到死信队列
注:我们这里需要删除原来的队列,因为队列的参数被修改了
管理面板中删除即可
我们再次启动 c1 然后关闭 c1再开启 p
结果符合预期
情况3:
我们首先还是排除干扰,先开启c2 消费掉死信中的消息,然后删除队列normal,再然后注释掉 队列长度的配置
模拟方式也很简单,我们把 info5 这条消息 ,basicReject 给拒绝掉,看这条消息会不会进入到我们的死信队列
延迟队列的应用场景是很多的,订单十分钟内未付款取消等等
延迟队列的实现很简单,其实利用前面我们说到的消息的 ttl 属性就可以实现了
这里说一下 队列设置 ttl 和消息设置 ttl 的区别
这里的整合我们用 SpringBoot
版本 2.3.8.RELEASE (大版本尽量一致)
<dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Swagger 配置类
@Configuration @EnableSwagger2 public class SwaggerConfig { public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo() { return new ApiInfoBuilder() .title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("enjoy6288", "http://atguigu.com", "1551388580@qq.com")) .build(); } }
配置类
@Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; // 声明 xExchange @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } // 声明 xExchange @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明队列 A ttl 为 10s 并绑定到对应的死信交换机 @Bean("queueA") public Queue queueA() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // 声明队列 A 绑定 X 交换机 @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //声明队列 B ttl 为 40s 并绑定到对应的死信交换机 @Bean("queueB") public Queue queueB() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } //声明队列 B 绑定 X 交换机 @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } //声明死信队列 QD @Bean("queueD") public Queue queueD() { return new Queue(DEAD_LETTER_QUEUE); } //声明死信队列 QD 绑定关系 @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
控制层 生产者
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
GET http://localhost:8080/ttl/sendMsg/aaa
创建一条新的队列QC,不在队列上配置 ttl, 在消息上配置 ttl
@Component public class MsgTtlQueueConfig { public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_C = "QC"; //声明队列 C 死信交换机 @Bean("queueC") public Queue queueB() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //没有声明 TTL 属性 return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } //声明队列 B 绑定 X 交换机 @Bean public Binding queueBindingC(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } }
生产者
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
###
GET http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
###
GET http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
存在问题,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
(Callback相关的不用管哈)
rabbitmq_delayed_message_exchange 解压存放到 plugins 目录
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean("delayedQueue") public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } //自定义交换机 我们在这里定义的是一个延迟交换机 @Bean("delayedExchange") public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
我们指定创建延迟交换机
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new
Date(), delayTime, message);
}
现在正常了
官网下载
下载完成后不要勾选启动
先执行安装插件
rabbitmq-plugins enable rabbitmq_management
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。