当前位置:   article > 正文

RabbitMQ消息模型之Work消息模型

RabbitMQ消息模型之Work消息模型

Work消息模型

* work模型:
*      多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
*      作用:提高消息的消费速度,避免消息的堆积
*      默认采用轮询的方式分发消息
*      如果某个消费者处理消息慢,会导致消息堆积
  • 1
  • 2
  • 3
  • 4
  • 5
生产者
package com.example.demo02.mq.work;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @author Allen
 * 4/10/2024 9:37 PM
 * @version 1.0
 * @description: work模式发送者
 *
 * work模型:
 *      多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
 *      作用:提高消息的消费速度,避免消息的堆积
 *      默认采用轮询的方式分发消息
 *      如果某个消费者处理消息慢,会导致消息堆积
 */
public class WorkSender {
    public static void main(String[] args) throws Exception {
//        1:获取连接
        Connection connection = ConnectionUtils.getConnection();
//        2:创建通道
        Channel channel = connection.createChannel();
//        3:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
        channel.queueDeclare("work.queue", false, false, false, null);
//        4:发送100条消息
for (int i = 0; i < 100; i++) {
            String msg = "work模式消息" + i;
            //休眠i*5毫秒
            TimeUnit.MILLISECONDS.sleep(i * 5);
            // 参数1:交换机名称 参数2:队列名称 参数3:消息的其他属性 参数4:消息的内容
            channel.basicPublish("", "work.queue", null, msg.getBytes());
            System.out.println("work模式发送消息:" + msg);
        }
//        5:关闭通道
        channel.close();
//        6:关闭连接
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
消费者1

能者多劳角色)

package com.example.demo02.mq.work;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/10/2024 9:37 PM
 * @version 1.0
 * @description: work模式消费者1号
 */
public class WorkReciver1 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
        channel.queueDeclare("work.queue", false, false, false, null);
        // 4:定义消费者,消费消息
        // 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象
        Consumer consumer = new DefaultConsumer(channel) {
            // 消费者接收消息调用此方法
            // 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取消息
                String msg = new String(body);
                System.out.println("work模式消费者1号接收消息:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("work.queue", false, consumer);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
消费者2

消费能力差

package com.example.demo02.mq.work;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @author Allen
 * 4/10/2024 9:37 PM
 * @version 1.0
 * @description: work模式消费者1号
 */
public class WorkReciver2 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
        channel.queueDeclare("work.queue", false, false, false, null);
            //如果此消费者性能较差,配置能者多劳:指定一次获取几条信息,消息消费成功后 ack之后 mq才会发送下一条消息
            channel.basicQos(1);
        // 4:定义消费者,消费消息
        // 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象
        Consumer consumer = new DefaultConsumer(channel) {
            // 消费者接收消息调用此方法
            // 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //模拟二号消费者处理消息慢
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 获取消息:执行业务
                    String msg = new String(body);
                    System.out.println("work模式消费者2号接收消息:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数1:队列名称 参数2:ACK是否自动确认 参数3:消费者对象
        //必须手动确认消息,否则会报406错误
        channel.basicConsume("work.queue", false, consumer);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
结果:

能者多劳

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/424736
推荐阅读
相关标签
  

闽ICP备14008679号