赞
踩
包括:
基础使用(一个生产者,一个消费者)
WORK 模式 (一个生产者,多个消费者)
FANOUT 模式 队列绑定到交换机
DIRECT 模式 队列绑定到交换机 ,再绑定路由键
TOPIC 模式 队列绑定到交换机 再定义匹配路由键
发送方确认,消费方手动应答
我这里选择的是docker 安装 ,非常快捷
docker pull rabbitmq:management
1.使用默认guest账户 /密码登录
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=用户名 -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 rabbitmq:management
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置:application.yml
对,你没有看错 ,我这里暂时只配了 host 主机 因为我,rabbitmq 安装不在本地 …
为什么我不用配置其他信息?
答:springboot自定装配 rabbitmq有其默认属性,例如host 为 127.0.0.1 账户密码为guest 端口为5672等等,如果是本地demo 可以实现零配置简单运行使用rabbitmq,但是生产环境,还是老老实实改rabbitmq服务器配置(端口,账户密码等 为了安全)
spring:
rabbitmq:
host: xxxx
main:
allow-bean-definition-overriding: true
我这里演示一个生产者,一个消费者
注意 Queue 在 org.springframework.amqp.core 包下即可
队列
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @version 1.0 * @date 2020/7/14 21:20 * @desc 配置一个队列 */ @Configuration public class EasyRabbitConfig { @Bean public Queue easyQueue() { return new Queue("rabbit_easy_queue"); } }
生产者
注入RabbitTemplate 即可快乐的使用我们的小白兔了(RabbitMQ)
import com.leilei.common.Vehicle; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author lei * @version 1.0 * @date 2020/7/14 21:21 */ @Service public class EasyProviderServer { @Autowired private RabbitTemplate rabbitTemplate; public void sendEasyMessage() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("rabbit_easy_queue",new Vehicle(i,i+"车车")); } } }
消费者
import com.leilei.common.Vehicle; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author lei * @version 1.0 * @date 2020/7/14 21:24 */ @Component public class EasyConsumer { @RabbitListener(queues = "rabbit_easy_queue") @RabbitHandler public void process(Vehicle vehicle) { System.out.println("简单消费者接收到车车消息: " + vehicle); } }
测试
测试时,咱进入我们的rabbirmqWEB 管理页面查看一下
通过第一个例子,我们来说明一下基本问题
测试完成后,队列依然存在
发现的问题:队列未被删除
原因:
回到最开始,定义队列开始
点进去看 ,原来其默认配置了队列持久化
我这里演示,一个生产者 多个消费者
队列
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:30
* @desc 工作模式 一个生产者 多个消费者
*/
@Configuration
public class WorkRabbitConfig {
@Bean
public Queue easyQueue() {
return new Queue("rabbit_work_queue");
}
}
生产者发送消息
/** * @author lei * @version 1.0 * @date 2020/7/14 21:21 */ @Service public class WorkProviderServer { @Autowired private RabbitTemplate rabbitTemplate; public void sendWorkMessage() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("rabbit_work_queue",new Vehicle(i,i+"work车车")); } } }
消费者接收消息并消费
/** * @author lei * @version 1.0 * @date 2020/7/14 21:24 */ @Component public class WorkConsumer { @RabbitListener(queues = "rabbit_work_queue") public void work1(Vehicle vehicle) { System.out.println("消费者1--work--接收到车车消息: " + vehicle); } @RabbitListener(queues = "rabbit_work_queue") public void work2(Vehicle vehicle) { System.out.println("消费者2--work--接收到车车消息: " + vehicle); } }
测试
一个生产者 多个消费者 生产者生产的消息被平分到消费者 例如 十条消息 有两个消费者 则每个消费者会消费五次
又叫无路由键交换机模式
交换机基础使用 ,队列绑定到交换机,当发送消息到交换机时,绑定到该交换机的队列都会监听到
配置 队列 交换机 绑定关系等
import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @version 1.0 * @date 2020/7/14 21:58 * @desc 发布订阅模式 配置两个队列一个交换机 */ @Configuration public class FanoutExchangeConfig { /** * 队列一 * @return */ @Bean public Queue FanoutQueueOne() { return new Queue("rabbit_fanout_queue_one"); } /** * 队列二 * @return */ @Bean public Queue FanoutQueueTwo() { return new Queue("rabbit_fanout_queue_two"); } /** * 交换机 声明为FanoutExchange类型 */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_exchange"); } /** * 绑定队列一到交换机 * @param FanoutQueueOne 上方定义的队列一方法名 根据此方法名参数 器会自动注入对应bean * @param fanoutExchange 上方定义的交换机方法名 * @return */ @Bean public Binding bindingFanoutExchangeA(Queue FanoutQueueOne, FanoutExchange fanoutExchange) { return BindingBuilder.bind(FanoutQueueOne).to(fanoutExchange); } /** * 绑定队列二到交换机 * @param FanoutQueueTwo 上方定义的队列二方法名 根据此方法名参数 器会自动注入对应bean 当 * 然也可以省略参数 直接在bind中指定队列构建方法名 例如 FanoutQueueTwo() * * @param fanoutExchange 上方定义的交换机方法名 * @return */ @Bean public Binding bindingFanoutExchangeB(Queue FanoutQueueTwo, FanoutExchange fanoutExchange) { return BindingBuilder.bind(FanoutQueueTwo).to(fanoutExchange); } }
生产者生产消息
@Service
public class FanoutExchangeProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendFanoutExchangeMessage() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertSendAndReceive("fanout_exchange","",new Vehicle(i,i+"发布订阅车车"));
}
}
}
消费者
import com.leilei.common.Vehicle; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author lei * @version 1.0 * @date 2020/7/14 22:14 * @desc 发布订阅消费者 一个对了绑定两个消费者 */ @Component public class FantoutExchangeConsumer { @RabbitListener(queues = "rabbit_fanout_queue_one") public void consumerOne(Vehicle vehicle) { System.out.println("rabbit_fanout_queue_one队列 消费者1:收到消息---" + vehicle); } @RabbitListener(queues = "rabbit_fanout_queue_one") public void consumerOne2(Vehicle vehicle) { System.out.println("rabbit_fanout_queue_one队列 消费者2:收到消息---" + vehicle); } //-------------一个队列绑定两个消费者 -------------------------------- @RabbitListener(queues = "rabbit_fanout_queue_two") public void consumerTwo(Vehicle vehicle) { System.out.println("rabbit_fanout_queue_two队列 消费者1:收到消息---" + vehicle); } @RabbitListener(queues = "rabbit_fanout_queue_two") public void consumerTwo2(Vehicle vehicle) { System.out.println("rabbit_fanout_queue_two队列 消费者2:收到消息---" + vehicle); } }
测试:
发布订阅模式 订阅到交换机的队列 都会获取发布的消息,例如我生产者生产消息循环20次 则 队列一 队列二均会被消费10次
注意,我这里使用了convertSendAndReceive ,所以消息是有序的
又叫直连路由键交换机模式,其会直连指定一个路由键与队列 与交换机进行绑定
配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @version 1.0 * @date 2020/7/14 22:32 * @desc 路由模式 在发布订阅基础上 添加路由键 吧消息交给符合指定路由键的队列 我这里定义两个队列绑到一个交换机上 对应两个不同的路由键 */ @Configuration public class DirectExchangeConfig { /** * 队列一 * @return */ @Bean public Queue directQueueOne() { return new Queue("rabbit_direct_queue_one"); } /** * 队列二 * @return */ @Bean public Queue directQueueTwo() { return new Queue("rabbit_direct_queue_two"); } /** * 定义交换机 direct类型 * @return */ @Bean public DirectExchange myDirectExchange() { return new DirectExchange("direct_exchange"); } /** * 队列 绑定到交换机 再指定一个路由键 * directQueueOne() 会找到上方定义的队列bean * @return */ @Bean public Binding DirectExchangeOne() { return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with("lei_routingKey_one"); } /** * 队列 绑定到交换机 再指定一个路由键 * @return */ @Bean public Binding DirectExchangeTwo() { return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with("lei_routingKey_two"); } }
生产者生产消息
@Service public class DirectExchangeProvider { @Autowired private RabbitTemplate rabbitTemplate; /** * 三个参数 交换机 路由键 消息 */ public void sendDirectMessageOne() { for (int i = 0; i < 5; i++) { if (i % 2 == 0) { rabbitTemplate.convertAndSend("direct_exchange", "lei_routingKey_one", new Vehicle(i, i + "路由键lei_routingKey_one车车")); } else { rabbitTemplate.convertAndSend("direct_exchange", "lei_routingKey_two", new Vehicle(i, i + "路由键lei_routingKey_two车车")); } } } }
消费者
/** * @author lei * @version 1.0 * @date 2020/7/14 22:49 * @desc 路由模式消费者 我这里rabbit_direct_queue_one 使用两个消费者接受消息 * rabbit_direct_queue_two 使用一个消费者接收消息 */ @Component public class DirectExchangeCousumer { @RabbitListener(queues = "rabbit_direct_queue_one") public void consumerOne(Vehicle vehicle) { System.out.println("rabbit_direct_queue_one队列 消费者1:收到消息---" + vehicle); } @RabbitListener(queues = "rabbit_direct_queue_one") public void consumerTwo(Vehicle vehicle) { System.out.println("rabbit_direct_queue_one队列 消费者2:收到消息---" + vehicle); } @RabbitListener(queues = "rabbit_direct_queue_two") public void consumerDirect(Vehicle vehicle) { System.out.println("rabbit_direct_queue_two队列 :收到消息---" + vehicle); } }
测试:
此效果不明显,我们还可以单独测试两个路由键,使其逻辑清晰点
测试结果贴图
主题模式,其也是交换机模式的一种,与直连路由键交换机的区别在于其可以对交换机做层级匹配,直说可能有点抽象,咱们结合代码
配置
就设定了两个队列 一个 topicExchange交换机 并手动指定了两个路由键 topic.# topic.*
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author PengLei * @date 2020/7/15 0015 9:51 * @desc topic 主题模式 两个队列 一个topicExchange交换机 */ @Configuration public class TopicRabbitConfig { /** * 队列定义 * @return */ @Bean public Queue topicQueueOne() { return new Queue("rabbit_topic_queue_1"); } /** * 队列定义 * @return */ @Bean public Queue topicQueueOTwo() { return new Queue("rabbit_topic_queue_2"); } /** * 定义 TopicExchange 类型交换机 * @return */ @Bean public TopicExchange exchangeTopic() { return new TopicExchange("topic_exchange"); } /** * 队列一绑定到交换机 且设置路由键为 topic.# * @return */ @Bean public Binding bindingTopic1() { return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#"); } /** * 队列一绑定到交换机 且设置路由键为 topic.* * @return */ @Bean public Binding bindingTopic2() { return BindingBuilder.bind(topicQueueOTwo()).to(exchangeTopic()).with("topic.*"); } }
生产者生产消息
@Service
public class TopicRabbitProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendTopMessage() {
for (int i = 0; i < 10; i++) {
if (i%2==0) {
rabbitTemplate.convertSendAndReceive("topic_exchange","topic.lei",new Vehicle(i,i+"一个词路由键车车"));
}else {
rabbitTemplate.convertSendAndReceive("topic_exchange","topic.lei.xxl",new Vehicle(i,i+"多个词路由键车车"));
}
}
}
}
消费者
@Component
public class TopRabbitConsumer {
@RabbitListener(queues = "rabbit_topic_queue_1")
public void listenOne(Vehicle vehicle) {
System.out.println("监听到队列一消息" + vehicle);
}
@RabbitListener(queues = "rabbit_topic_queue_2")
public void listenOTwo(Vehicle vehicle) {
System.out.println("监听到队列二消息" + vehicle);
}
}
测试:
队列一绑定的是 topic.# ,其结果 发送到 路由键 topic.lei.xxl 与 topic.lei 的消息都被接受了 一个词多个词消息都被队列一监听到消费者消费了
队列二 绑定路由键是 topic.* 其结果只有%2==0的消息被接受了,即 topic.lei 仅仅监听到了一个词路由键的消息
* 仅仅会匹配路由键的一个词 # 则可以匹配路由键的多个词
以上为RabbitMQ 中几种模式的使用
但是,我们再开发中或实际生产中,或多或少会出现异常,不会仅仅像我们demo所演示的那般简单以及流畅,那么当出现问题时,例如消息发送成功与否未知,消息未被消息等等
下边,咱们进行简单地操作一下,保证一个生产者消息成功发送,以及消费者消息确认机制
改造咱们的配置 application.yml
spring:
rabbitmq:
#我这里仅写了ip 其余端口账号密码由于是演示 采用默认即可,不必要写
host: xxxx
# 开启消息确认机制 confirm 异步
publisher-confirm-type: correlated
listener:
direct:
# 消息开启手动确认
acknowledge-mode: manual
publisher-returns: true
main:
allow-bean-definition-overriding: true
咱们额外定义一个交换机 以及队列消息 试用下 fanout模式
/** * @author lei * @version 1.0 * @date 2020/7/15 20:15 * @desc 测试confirm 机制,专门创建了一个队列 */ @Configuration public class ConfirmRabbitConfig { @Bean public Queue confirmQueue() { return new Queue("rabbit_confirm_queue"); } @Bean public FanoutExchange confirmExchange() { return new FanoutExchange("confirm_fanout_exchange"); } @Bean public Binding confirmFanoutExchangeBing() { return BindingBuilder.bind(confirmQueue()).to(confirmExchange()); } }
生产者生产消息,配置 confirm消息发送确认机制
package com.leilei.confirm; import com.leilei.common.Vehicle; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author lei * @version 1.0 * @date 2020/7/15 20:16 * @desc */ @Service public class ConfirmServer { @Autowired private RabbitTemplate rabbitTemplate; /** * 配置 confirm 机制 */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 消息相关的数据,一般用于获取 唯一标识 id * @param b 是否发送成功 * @param error 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String error) { if (b) { System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId()); } else { System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error); } } }; /** * 发送消息 参数有:交换机 ,空路由键,消息,并设置一个唯一消息ID */ public void sendConfirm() { rabbitTemplate.convertAndSend("confirm_fanout_exchange", "", new Vehicle(1,"confirm功能的车车"), new CorrelationData("" + System.currentTimeMillis())); //使用咱们上方配置的发送回调方法 rabbitTemplate.setConfirmCallback(confirmCallback); } }
消费者
设置消费者手动应答 先模拟一个正常接收场景
byte数组转对象工具
public static <T> Optional<T> bytesToObject(byte[] bytes) {
T t = null;
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn;
try {
sIn = new ObjectInputStream(in);
t = (T) sIn.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return Optional.ofNullable(t);
}
@Component
public class ConfirmConsumer {
@RabbitListener(queues = "rabbit_confirm_queue")
public void aa(Message message, Channel channel) throws IOException, InterruptedException {
try {
System.out.println("正常收到消息:" + bytesToObject(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 两个布尔值 第二个设为 false 则丢弃该消息 设为true 则返回给队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("消费失败 我此次将返回给队列");
}
}
}
测试:
此为消息成功发送,且消息被成功消费的场景
咱们再模拟一个消费失败的场景
代码修改
测试:
查看RabbitMQ web 管理界面
发现对应队列中 一条消息未被消息,无未应答消息
我们再删除掉异常代码,再次使用生产者发送消息,查看是否会消费两次(之前有一条消息未被消息,正常来说,该消息没有被丢弃则下次会继续投递)
那么,可能有时候,我们消息消费失败了,不需要保存到队列,下次从新投递新的消息,这该怎么处理呢?例如我们发送短信,可能短信没发过来,于是再此尝试 那么如果成功了,也该只啊一条短信过来
我们再消费者中设置消息应答 消息丢弃即可
测试:
查看web管理页面 发现也没有未被消息消息,则说明消息真的被丢弃了
总结
### springboot 整合rabbitmq 整合多种模式 common 为多个模式公共所需 我这里为 一个对象实体 easy 包下 为简单模式 一个生产者 一个消费者 work 包下 为工作模式 一个生产者 多个消费者 多个消费者会轮流获取到队列消息 例如 两个消费者 生产者发送十条消息 则 每个消费者会消费五次 confirm 包下 为rabbitmq 消费发送者确认模式 配合消费者端的手动应答,确保消息被成功发送以及消费 directechange 包下 路由模式 队列绑定到direct交换机再指定路由键 ,生产者发送消息时指定交换机路由键 ,则会被对应队列监听到 fanoutexchange包下 发布订阅模式 队列绑定到fanout交换机 未指定路由键名, 生产者发送消息时指定交换机 路由键指定为空或指定为 "" 则会被所有订阅到交换机的队列监听到 topic 包下 为主题模式 队列绑定到tipic模式交换机 在指定路由键 例如(top.#)或(top.*) # * 区别 * 仅仅会匹配路由键的一个词 如果生产者发送消息到路由键 例如 leilei.one / leilei.two 则会被对应绑定的队列监听到 # 则可以匹配路由键的多个词 如果生产者发送消息到路由键 例如 leilei.one / leilei.one.xxl / leilei.one.xxl.eq 只要是以leilei 路由键为前缀的,无论多少个次都会被监听到 queue 队列默认为持久化状态 发送消息 convertSendAndReceive convertAndSend 区别 使用 convertAndSend 方法时的结果:输出时没有顺序,不需要等待,直接运行 使用 convertSendAndReceive 方法时的结果:输出有顺序 只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有等待间隔时间 确保消息成功发送 confirm 模式 如果失败则可以按照自己逻辑处理保存到数据库失败发送表 可后续做补偿 确保消息成功消费 ack 手动应答 失败时按业务 选择从新投递或者丢弃
rabbitmq初步的学习就到这里了,随着学习的不断深入,再继续更新
附上项目源码:springboot2.3.1整合RabbitMQ
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。