当前位置:   article > 正文

3、RabbitMQ_工作模式

3、RabbitMQ_工作模式

一、简单模式

简介

简单模式 HelloWorld。一个生产者、一个消费者,不需要设置交换机使用默认的交换机。

代码示例

  • 生产者
    public class Producer {
        //队列名称
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] args) {
    
            //建立连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置目标主机ip
            factory.setHost("192.168.47.128");
            //设置账号名密码
            factory.setUsername("yf");
            factory.setPassword("123456");
    //        修改端口的设置
    //        factory.setPort();
    
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //通道和队列的连接
                /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
                  参数说明:
                     queue:队列名称
                     durable:是否持久化
                     exclusive:是否独占,是否一个消费者监听一个队列
                     autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列
                     arguments:参数
    
                 */
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                for (int i = 0; i < 10; i++) {
                    //需要发送的消息
                    String message = "Hello RabbitMQ!"+i;
                    //通过最基础的发布
                    /*
                    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                    参数说明:
                        exchange:指定交换机,如果使用默认模式,就使用“”
                        routingKey:路由名称
                        props:配置信息
                        body:发送的消息(要求字节数组)
                     */
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                    System.out.println(" [x] Sent '" + message + "'");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 消费者
    /**
     * 服务端,接收信息
     */
    public class Consumer {
    
        //指定接收队列名称
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            //设置目标主机ip
            factory.setHost("192.168.47.128");
            //设置用户密码
            factory.setUsername("yf");
            factory.setPassword("123456");
            //建立连接
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    String msg = new String(message.getBody(), "UTF-8");
                    System.out.println(" [x] Received '" + msg + "'");
                }
            };
    
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

二、工作队列模式

简介

  • 工作队列与简单模式相比,一个生产者、多个消费者(排它关系),多个消费端共同消费同一个队列中的消息
  • 使用场景:对于消息生产速度大于消费速度场景,可以增加消费者减少单个消费者压力

代码示例

  • 生产者
    public class Producer {
        //队列名称
        private final static String QUEUE_NAME = "work_queues";
    
        public static void main(String[] args) {
    
            //建立连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置目标主机ip
            factory.setHost("192.168.47.128");
            //设置账号名密码
            factory.setUsername("yf");
            factory.setPassword("123456");
    //        修改端口的设置
    //        factory.setPort();
    
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //通道和队列的连接
                /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
                  参数说明:
                     queue:队列名称
                     durable:是否持久化
                     exclusive:是否独占,是否一个消费者监听一个队列
                     autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列
                     arguments:参数
    
                 */
    //            channel.basicQos(1);// 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                for (int i = 0; i < 10; i++) {
                    //需要发送的消息
                    String message = "Hello RabbitMQ!"+i;
                    //通过最基础的发布
                    /*
                    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                    参数说明:
                        exchange:指定交换机,如果使用默认模式,就使用“”
                        routingKey:路由名称
                        props:配置信息
                        body:发送的消息(要求字节数组)
                     */
                    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                    System.out.println(" [x] Sent '" + message + "'");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
  • 消费者
    /**
     * 服务端,接收信息
     */
    public class Consumer1 {
    
        //指定接收队列名称
        private final static String QUEUE_NAME = "work_queues";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            //设置目标主机ip
            factory.setHost("192.168.47.128");
            //设置用户密码
            factory.setUsername("yf");
            factory.setPassword("123456");
            //建立连接
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.basicQos(1); // 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    String msg = new String(message.getBody(), "UTF-8");
                    System.out.println(" [x] Received '" + msg + "'");
                }
            };
            /* 这种写法和上面是一样的 使用的是lambda表达式
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String msg = new String(message.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + msg + "'");
            };
             */
    
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

小结

  • 分发机制:轮询分发机制
    • 也就是说当生产者生产了10条消息,2个消费者分别消费5条消息。
  • 应用场景:同一条消息,在多个消费者之间只能有一个消费,应用于只需要单节点消费的场景
    • 发送验证码
    • 发送生日提醒

三、发布订阅模式(Publish/Subscribe)

简介

  • 在订阅模型中,多了一个Exchange 角色:
  • Exchange:交换机(X)。接收生产者发送的消息; 处理投递消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。具体操作根据交换机类型来定义:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

代码示例

  • 生产者

  • 1
  • 消费者

  • 1

该文章还没写完,先发布出来,后面会持续更新!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/347063
推荐阅读
相关标签
  

闽ICP备14008679号