当前位置:   article > 正文

RabbitMQ之快速入门_rabbitmq unacked怎么设置多少

rabbitmq unacked怎么设置多少

1.项目结构

在这里插入图片描述

application.yml

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
#    listener:
#      simple:
#        prefetch: 1    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.基本消息模型

在这里插入图片描述

生产者生产消息发送给队列,消费者监听到队列有消息就进行消费(底层实现是生产者发送消息给默认交换机,再由交换机转发到队列)

package com.yzm.rabbitmq_01.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String HELLO_QUEUE = "hello_queue";

    /**
     * 消息队列
     * durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
     * exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
     * autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
     */
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_QUEUE, true, false, false);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

消息发布

package com.yzm.rabbitmq_01.sender;

import com.yzm.rabbitmq_01.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息发布
 */
@RestController
public class HelloSender {

    private final AmqpTemplate template;

    public HelloSender(AmqpTemplate template) {
        this.template = template;
    }

    @GetMapping("/send")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            template.convertAndSend(RabbitConfig.HELLO_QUEUE, msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

消息监听

package com.yzm.rabbitmq_01.receiver;

import com.yzm.rabbitmq_01.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听
 */
@Component
@RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
public class HelloReceiver {

    @RabbitHandler
    public void receive(String message) {
        System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

启动项目
RabbitMQ服务器上能看到hello_queue队列
在这里插入图片描述
其中:
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量。
Unacked:表示待确认数量;队列分配消息给消费者时,给该条消息一个待确认状态,当消费者确认消息之后,队列才会移除该条消息。
Total:表示待消费数和待确认数的总和

访问:http://localhost:8080/send
在这里插入图片描述

新增一个消费者
注意:这里的监听方式跟之前的不一样,接收的参数是Message消息主体,这种比较推荐

package com.yzm.rabbitmq_01.receiver;

import com.yzm.rabbitmq_01.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听
 */
@Component
public class HelloReceiver2 {

    /**
     * 使用 Message 接收消息,不能使用HelloReceiver那种监听队列方式
     * 而是直接在方法上监听队列,不然会报错 No method found for class java.lang.String
     */
    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
    public void receive2(Message message) {
        System.out.println(" [ 消费者@2 ] Received ==> '" + new String(message.getBody()) + "'");
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

重新启动,访问:localhost:8080/send
在这里插入图片描述
10条消息被2个消费者平均分配了

3 竞争消费者模式

在这里插入图片描述

队列的消息分配方式默认是平均分配,即第一条消息分配给一个消息者,第二条消息就分配给另一个消息者,以此类推…

上面示例有2个消费者监听,由于只是简单的打印语句,所以看不出有什么问题。
我进行修改一下,通过设置线程休眠时间来表示消费者处理消费的任务时间

/**
 * 消息监听
 */
//@Component 注释掉,不用这种监听方式了
@RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
public class HelloReceiver {

    @RabbitHandler
    public void receive(String message) {
        System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
    }

}

/**
 * 消息监听
 */
@Component
public class HelloReceiver2 {

    private int count1=1;
    private int count2=1;

    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
    public void receive1(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

运行结果:
在这里插入图片描述
现在就能很明显的看出,消费者1号很快地处理完消息后就处于空闲状态;而消费者2号却一直很忙碌。当消息数量成千上万的时候,由消费者2号处理的消息会堆积很多,达不到时效性。

针对这种问题,rabbitmq提供了一种解决方案。
设置prefetch参数=1,实现原理是:队列只会分配一条消息给对应的监听消费者,收到消费者的确认回复之后才会重新分配另一条消息。

启动prefetch功能(方式一)

@Configuration
public class RabbitConfig {

    public static final String HELLO_QUEUE = "hello_queue";
    public static final String PREFETCH_ONE = "prefetchOne";
    
    /**
     * 消息队列
     * durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
     * exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
     * autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
     */
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_QUEUE, true, false, false);
    }

    @Bean(name = PREFETCH_ONE)
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOne(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 手动确认
        // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        return factory;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

同时修改消费者,containerFactory = RabbitConfig.PREFETCH_ONE

@Component
public class HelloReceiver2 {

    private int count1 = 1;
    private int count2 = 1;

    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
    public void receive1(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

运行结果:
在这里插入图片描述
1号处理了8条消息,2号2条,工作效率提高了不少

启动prefetch功能(方式二,全局)

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        prefetch: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
@Component
public class HelloReceiver2 {

    private int count1 = 1;
    private int count2 = 1;

//    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
    public void receive1(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

//    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }

    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
    public void receive3(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@3号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(queues = RabbitConfig.HELLO_QUEUE)
    public void receive4(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@4号 ] 处理消息数:" + count2++);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

运行结果:
在这里插入图片描述

第二种方式是全局配置,应用于所有监听消费者

相关链接

首页
上一篇:Linux安装
下一篇:消息确认机制ACK

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/391305
推荐阅读
相关标签
  

闽ICP备14008679号