当前位置:   article > 正文

消息中间件--RabbitMQ学习(十四)---高级特性之消费端限流_消息中间件的限流

消息中间件的限流

消费端限流

什么是消费端的限流?

  • 假设一个场景,首先,我们 Rabbitry服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况
  • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据
  • Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume或者 channe设置Qos的值)未被确认前,不进行消费新的消息。
  • void Basicqos(uint prefetchsize, ushort prefetchcount, bool global)
  • prefetch size:0
  • prefetchcount:会告诉 Rabbitmq不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该 consumer将 block掉,直到有消息ack
  • global: true\false是否将上面设置应用于 channel简单点说,就是上面限制是 channel级别的还是 consumer级别
  • prefetchsize和 globali这两项, rabbitmq没有实现,暂且不研究prefetch_ count在no_ask= false的情况下生效,即在自动应答的情况下这两个值是不生效的。

消费端代码实现

public class Consumer {

    public static void main(String[] args) throws Exception{
        //1 创建一个connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.159");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2通过连接工场创建连接
        Connection connection = connectionFactory.newConnection();
        //3通过connection创建channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.#";
        String queueName = "test_qos_queue";
        channel.exchangeDeclare(exchangeName,"topic",true,false,null);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);

        //第一个参数:限制消息的大小  第二个:限制一次接收的消息的个数  第三次:是否进行批量
        channel.basicQos(0,1,false);
        //如果要使用限流方式  必须关闭自动签收下面第二行的false
        channel.basicConsume(queueName,false,new MyConsumer(channel));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

生产端代码

public class Producter {
    public static void main(String[] args) throws Exception{
        //1 创建一个connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.159");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2通过连接工场创建连接
        Connection connection = connectionFactory.newConnection();
        //3通过connection创建channel
        Channel channel = connection.createChannel();
        //开启消息的确认模式
        channel.confirmSelect();
        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.save";

        //发送消息
        String msg = "hello";
//        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes());
        for(int i=0;i<5;i++){
            channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

自定义消费者

public class MyConsumer extends DefaultConsumer {

    private Channel channel;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("consumerTag:  "+consumerTag);
        System.err.println("envelope  "+envelope);
        System.err.println("properties  "+properties);
        System.err.println("body  "+new String(body));

        //确认签收消息,并返回确认接收到消息,会返回broker一个应答
//        channel.basicAck(envelope.getDeliveryTag(),false);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号