赞
踩
概念
RabbitMQ 消息队列。消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
消息包含消息体和标签,消息题表示带有业务逻辑的数据,如json数据,标签表示这这个消息的key,比如交换机名称和路由键。
消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道
RabbitMQ中的消息都只能存储在队列中,RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 CRound-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
生产者将消息发送给交换器时, 需要一个RoutingKey。交换器和队列绑定时需要一个BindingKey。当BindingKey和RoutingKey相匹时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。
BindingKey并不是在所有的情况下都能生效,它依赖于交换器类型。如fanout 类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。
生产者java代码实现
- //1.0 建立连接
- ConnectionFactory factory = new ConnectionFactory();
-
- //设置mq服务器地址信息
- factory.setHost("172.1.1.183");
- factory.setPort(5672);
- factory.setVirtualHost("/fei");
- factory.setUsername("root");
- factory.setPassword("1234");
-
- // 2.0 建立连接
- Connection connection = factory.newConnection();
-
- // 3.0 建立通道
- Channel channel = connection.createChannel();
-
- //4.0 建立队列
- String queueName = "simple.queue";
- // 参数含义 队列名称,是否持久化,是否独占,没有消费是否丢弃,参数列表
- channel.queueDeclare(queueName, false, false, false, null);
-
- // 5.0 发送消息
- String message = "hello, word!";
- channel.basicPublish("", queueName, null, message.getBytes());
- System.out.println("发送消息成功:【" + message + "】");
-
- //5.0 关闭通道
- channel.close();
- connection.close();
消费者java代码的实现
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("172.1.1.183");
- factory.setPort(5672);
- factory.setVirtualHost("/fei");
- factory.setUsername("root");
- factory.setPassword("1234");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
-
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
-
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
-
- // 4.订阅消息
- channel.basicConsume(queueName, true, new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 5.处理消息
- String message = new String(body);
- System.out.println("接收到消息:【" + message + "】");
- }
- });
- System.out.println("等待接收消息。。。。");
-
- }
springboot集成rabbitmq代码实现
- @RestController
- @RequestMapping("/rabbitmq")
- public class RabbitMQController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @RequestMapping("/simpleMode")
- public ResponseResult simple(){
-
- // 参数1 队列名称 参数2 消息内容
- rabbitTemplate.convertAndSend("simpleMode","hello word");
- return new ResponseResult(200,"生产消息成功");
- }
- }
消费者
- @Component
- @RabbitListener(queuesToDeclare = @Queue("simpleMode"))
- public class SimpleConsumer {
- Logger logger =LoggerFactory.getLogger(SimpleConsumer.class);
- @RabbitHandler
- public void reception(String message){
- logger.info("简单模式接收到队列 simpleMode 消息为-->{}",message);
- }
- }
work queues 模型,创建一个工作队列,用于在多个工作者之间分配耗时的任务。也就是说,多个消费者可以消费同一个消息队列里面的消息,并且是通过轮询的形式,比如说有1,2,3,4这四个消息,按这个模型平均分配的原则,C1和C2分别可以消费两条消息,且都是按顺序的
生产者消息
- @RequestMapping("/workQueue")
- public ResponseResult workQueue(){
- for(int i =0;i<20;i++){
- rabbitTemplate.convertAndSend("workQueue","work->"+i);
- logger.info("工作队列模式 向队列workQueue发送消息为-->work->{}",i);
- }
- return new ResponseResult(200,"生产消息成功");
-
- }
消费者
- @Component
- public class workQueueConsumer {
- Logger logger = LoggerFactory.getLogger(workQueueConsumer.class);
- @RabbitListener(queuesToDeclare = @Queue("workQueue"))
- public void workQueue1(String message){
- logger.info("工作队列模式,消费者1号收到workQueue队列消息为 --> {}",message);
- }
- @RabbitListener(queuesToDeclare = @Queue("workQueue"))
- public void workQueue2(String message){
- logger.info("工作队列模式,消费者2号收到workQueue队列消息为 --> {}",message);
- }
- }
多个消费者可以同时接收到消息并进行消费
生产者代码
- @RequestMapping("/fanoutQueue")
- public ResponseResult fanoutQueue(){
- for(int i =0;i<5;i++){
- rabbitTemplate.convertAndSend("fanout_exchange","","fanout+"+i);
- logger.info("发布订阅模式 向队列workQueue发送消息为-->fanout->{}",i);
- }
- return new ResponseResult(200,"生产消息成功");
- }
交换机以及绑定代码
-
- @Configuration
- public class RabbitConfig {
- private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";
- private static final String FANOUT_QUEUE_A = "fanout_queue_a";
- private static final String FANOUT_QUEUE_B = "fanout_queue_b";
- private static final String FANOUT_QUEUE_C = "fanout_queue_c";
- // private final String FANOUT_QUEUE_NAME3 = "fanout_queue3";
-
- //创建交换机
- @Bean
- public FanoutExchange fanoutExchange()
- {
- return ExchangeBuilder.
- fanoutExchange(FANOUT_EXCHANGE_NAME) //交换机类型 ;参数为名字
- // .topicExchange(FANOUT_EXCHANGE_NAME)
- .durable(true)//是否持久化,true即存到磁盘,false只在内存上
- .build();
- }
-
- //创建队列
- @Bean
- public Queue fanoutA()
- {
- return new Queue(FANOUT_QUEUE_A);
- }
- @Bean
- public Queue fanoutB()
- {
- return new Queue(FANOUT_QUEUE_B);
- }
- @Bean
- public Queue queueC(){
- return new Queue(FANOUT_QUEUE_C);
- }
-
- // 将三个队列绑定到交换机上【队列订阅交换机】 ------------------------------------------------------------------
- @Bean
- Binding bindingExchangeA() {
- return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
- }
- @Bean
- Binding bindingExchangeB() {
- return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
- }
- @Bean
- Binding bindingExchangeC() {
- return BindingBuilder.bind(queueC()).to(fanoutExchange());
- }
消费者代码
-
- @Component
- public class FanoutConsumer {
- Logger logger = LoggerFactory.getLogger(FanoutConsumer.class);
- @RabbitListener(queues = "fanout_queue_a")
- public void fanoutMessage1(String message){
- logger.info("发布订阅模式,消费者1号收到fanout_queue队列消息为 --> {}",message);
- }
- @RabbitListener(queues = "fanout_queue_b")
- public void fanoutMessage2(String message){
- logger.info("发布订阅模式,消费者2号收到fanout_queue队列消息为 --> {}",message);
- }
- @RabbitListener(queues = "fanout_queue_c")
- public void fanoutMessage3(String message){
- logger.info("发布订阅模式,消费者3号收到fanout_queue队列消息为 --> {}",message);
- }
- }
Routing 模型,也称直连模型,可以对消息指明路由键名称,消息队列绑定路由键名称,这样交换机就能根据路由键名称将消息匹配到相应的消息队列上。
生产者代码
- /**
- * Routing 模型 路由模式
- * @return
- */
- @RequestMapping("/routing")
- public ResponseResult routing() {
- rabbitTemplate.convertAndSend("direct_exchange", "info","路由消息hello info!");
- rabbitTemplate.convertAndSend("direct_exchange", "dev","路由消息hello dev!");
- rabbitTemplate.convertAndSend("direct_exchange", "test","路由消息hello test!");
- logger.info("路由模式 向队列workQueue发送消息为-->route->{}","路由消息hello word!");
- return new ResponseResult(200,"路由生产者成功");
- }
交换机绑定代码
- @Configuration
- public class RabbitConfig {
- private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";
- private static final String DIRECT_EXCHANGE_NAME = "direct_exchange";
- private static final String FANOUT_QUEUE_A = "fanout_queue_a";
- private static final String FANOUT_QUEUE_B = "fanout_queue_b";
- private static final String FANOUT_QUEUE_C = "fanout_queue_c";
- // private final String FANOUT_QUEUE_NAME3 = "fanout_queue3";
-
- //创建交换机
- @Bean
- public FanoutExchange fanoutExchange()
- {
- return ExchangeBuilder.
- fanoutExchange(FANOUT_EXCHANGE_NAME) //交换机类型 ;参数为名字
- // .topicExchange(FANOUT_EXCHANGE_NAME)
- .durable(true)//是否持久化,true即存到磁盘,false只在内存上
- .build();
- }
- @Bean
- public DirectExchange directExchange() {
- /**
- * directExchange的参数说明:
- * 1. 交换机名称
- * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
- * 3. 是否自动删除 false:不自动删除 true:自动删除
- */
- return ExchangeBuilder
- .directExchange(DIRECT_EXCHANGE_NAME)
- .durable(true)
- .build();
- }
-
-
- //创建队列
- @Bean
- public Queue fanoutA()
- {
- return new Queue(FANOUT_QUEUE_A);
- }
- @Bean
- public Queue fanoutB()
- {
- // 参数1队列名和 参数2否持久化
- return new Queue(FANOUT_QUEUE_B,true);
- }
- @Bean
- public Queue queueC(){
- return new Queue(FANOUT_QUEUE_C,true);
- }
-
- // 将三个队列绑定到交换机上【队列订阅交换机】 ------------------------------------------------------------------
- @Bean
- Binding bindingExchangeA() {
- return BindingBuilder.bind(fanoutA()).to(directExchange()).with("info");
- }
- @Bean
- Binding bindingExchangeB() {
- return BindingBuilder.bind(fanoutB()).to(directExchange()).with("test");
- }
- @Bean
- Binding bindingExchangeC() {
- return BindingBuilder.bind(queueC()).to(directExchange()).with("dev");
- }
-
- }
消费端
- @Component
- public class RoutingConsumer {
- Logger logger = LoggerFactory.getLogger(RoutingConsumer.class);
- @RabbitListener(queues = "fanout_queue_a")
- public void fanoutMessage1(String message){
- logger.info("路由模式,消费者a号收到fanout_queue队列消息为 --> {}",message);
- }
- @RabbitListener(queues = "fanout_queue_b")
- public void fanoutMessage2(String message){
- logger.info("路由模式,消费者b号收到fanout_queue队列消息为 --> {}",message);
- }
- @RabbitListener(queues = "fanout_queue_c")
- public void fanoutMessage3(String message){
- logger.info("路由模式,消费者c号收到fanout_queue队列消息为 --> {}",message);
- }
- }
在Routing原有模型的基础上进行了增强,可以使用通配符的形式来匹配路由键。
和路由模式差不多,只不过改边了一下路由键的通配符。这里不进行阐述。
rabbitmq由 生产者→交换机→队列→消费者这些组成,消息丢失可以再这些部分传递或者使用中丢失,所以保证消息的可靠性可以从以下三点保证。
确认模式(confirm):可以监听消息是否从生产者成功传递到交换机。
退回模式(return):可以监听消息是否从交换机传递到队列。
消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。
三种模式刚好监听完RabbitMQ的一整套流程。即我们能够由这三种模式得到消息的传递及处理的结果。
在配置文件中配置
publisher-confirm-type: correlated
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- // 参数1 相关配置信息 参数2 交换机是否收到rabbitmq的消息 参数3 失败的原因
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- if(b){
- logger.info("生产者消息发送成功!!!");
- }else {
- logger.info("生产者消息发送到rabbitmq失败:原因为 -->{}",s);
- //失败后处理的业务逻辑
- }
- }
- });
- //消息进行发送
- rabbitTemplate.convertAndSend("fanout_exchange","","fanout+"+i);
配置文件配置
publisher-returns: true
代码实现
- //定义退回模式的回调方法
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- /**
- * @param returnedMessage 失败后将失败信息封装到参数中
- */
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- logger.info("消息对象:{}",returnedMessage.getMessage());
- logger.info("错误码:{}",returnedMessage.getReplyCode());
- logger.info("错误信息:{}",returnedMessage.getReplyText());
- logger.info("交换机:{}",returnedMessage.getExchange());
- logger.info("路由键:{}",returnedMessage.getRoutingKey());
- //处理消息
- }
- });
-
- rabbitTemplate.convertAndSend("fanout_exchange","","fanout+"+i);
- logger.info("发布订阅模式 向队列workQueue发送消息为-->fanout->{}",i);
默认情况下是开启自动确认,当消费者出现异常,消息会在消息队列中清空,可以选择手动确认。
配置文件中开启配置
listener: simple: acknowledge-mode: manual
代码中实现手动确认
- Logger logger = LoggerFactory.getLogger(FanoutConsumer.class);
- @RabbitListener(queues = "fanout_queue_a")
- public void fanoutMessage1(Message message, Channel channel) throws InterruptedException, IOException {
-
- //消息投递序号,消息每次投递该值都会+1
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try
- {
- //int i = 1 / 0;//模拟处理消息出现bug
- logger.info("发布订阅模式,消费者1号收到fanout_queue队列消息为 --> {}",message);
- //签收消息
- /**
- * 参数1:消息投递序号
- * 参数2:是否一次可以签收多条消息
- */
- channel.basicAck(deliveryTag,true);
- }catch (Exception e)
- {
- System.out.println("消息消费失败");
- Thread.sleep(2000);
- //拒签消息
- /**
- * 参数1:消息投递序号
- * 参数2:是否可以一次拒签多条消息
- * 参数3:拒签后消息是否重回队列
- */
- channel.basicNack(deliveryTag,true,true);
- }
-
- }
消费限流其实就是规定消费端获取消息的快慢限制。首先需要配置开启手动确认和消费端最多拉取消息的数量。
- listener:
- simple:
- #手动开启ack
- acknowledge-mode: manual
- #消费端最多拉取消息的数量
- prefetch: 10
由于消费者的处理消息速度不同,可能导致有些性能高的消费者处于等待状态,多个消息平均分发给消费者所以称为公平分发。
不公平分发则是消费者消费完可以继续消费消息。将prefetch设置为1即可。
当消息到达存活时间后还没有被消费,就会被移除队列。
消息存活时间又分为队列存活时间和单个消息存活时间。
队列存活时间,当QueueBuilder.ttl(时间)就可设置。
就是说在队列中指定可以设置的最大优先级,然后再对单条设置一个优先级数;可以在创建队列中设置,从new Queue(队列名)更改为QueueBuilder.maxpriority(10)设置返回队列。
死信队列和普通队列没有太大区别,只不过交换机更换为死信交换机deadLetterExchange。
死信队列用于:
1.队列消息长度到达限制。
2.消费者拒签消息,并且不把消息重新放回原队列。
3.消息到达存活时间未被消费。
消息进入延迟队列不会立马进行消费,只有达到规定时间才能被消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。