当前位置:   article > 正文

Rabbit MQ_rabbitmq 绑定多个队列

rabbitmq 绑定多个队列

Rabbit MQ

什么是中间件

中间件是分布式网络环境中,将不同技术架构的服务相互连接起来处理业务,实现了多种标准的协议接口,可以对接不同语言技术开发的服务,达到系统的解耦合

什么是分布式系统?

一个业务处理需要多个服务系统相互协同处理解决,这些服务不是在一个系统中,而是分开的多个相互独立的系统;

好处:

  1. 合理分配服务资源,针对不同的服务,提供相应的硬件服务器资源
  2. 各个系统服务相互独立,出现故障不会影响其他服务系统不可用,降低耦合性
  3. 系统的技术开发更灵活,可以使用多种语言技术

问题:

  1. 技术栈变多,学习成本维护高
  2. 项目系统的复杂性提高,可能会出现更多复杂的问题

基于消息中间件的分布式架构

消息中间件:

  1. 提供可靠的消息传递机制进行系统服务之间的通讯

优点:

  1. 跨系统平台进行消息传递通讯
  2. 处理高并发的流量削峰
  3. 数据的并发异步处理

AMQP协议

Advanced Message Queuing Protocol 高级消息队列协议

提供统一的一套标准的消息队列协议,是应用层的一个开放标准,面向消息中间件;

基于此协议可以对接不同技术语言的服务

  1. 支持分布式事务

  2. 支持消息的持久化

  3. 处理消息的高性能和高可靠

面试题
  1. 为什么消息中间件不用http协议而是使用AMQP协议?
    • http的请求响应报文比较复杂,携带的cookie和加密解密,状态码等功能有些鸡肋,只应用消息的传递,存储和分发功能不需要这么繁重复杂的协议,而是需要高性能,简洁快速,有效的协议
    • 对于消息传递需要有可靠的持久化支持,解决消息丢失等问题,而http连接不会对消息进行持久化处理;
模型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eXclIJXE-1662295572445)(\p\RabbitMQ工作模型.png)]

简单命令

docker 进入容器内部

docker exec -it b1da550086cc /bin/bash
  • 1

设置用户密码

rabbitmqctl add_user admin admin
  • 1

设置用户管理员权限

rabbitmqctl set_user_tags admin administrator
  • 1

退出容器不关闭容器

按住Ctrl+P+Q

概念
  1. Producer生产者:生产消息的一方

  2. Consumer消费者:消费消息的一方

  3. Exchange交换机

    消息并不是直接投递到消息队列中,而是有交换机分配消息到队列中,有四种类型direct(默认)fanout, topic, 和 headers

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IhSGUH6h-1662295572447)(\p\24007899.jpg)]

    生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效

    RabbitMQ 中通过 Binding(绑定)Exchange(交换器)Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YgPDl9v4-1662295572448)(p\70553134.jpg)]

    生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。

  4. Queue(消息队列)

    用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

    多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。

  5. Broker(消息中间件的服务节点)

    一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0XMCXu96-1662295572449)(\p\67952922.jpg)]

  6. Exchange Types(交换器类型)
    1. fanout

      把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。

    2. *direct

      把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中

      如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为"Info”或者"debug”,消息只会路由到Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。

      direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XUY71D3y-1662295572450)(p\37008021.jpg)]

    3. topic

      direct类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

      • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
      • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
      • BindingKey 中可以存在两种特殊字符串“”和“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xe9gU1yL-1662295572450)(\p\73843.jpg)]

      • 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2;
      • 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
      • 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
      • 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中;
      • 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。
    4. headers**

      headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在

工作模式

简单模式 Simple

一个生产者 -> 一个队列 -> 一消费者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zdAZ4nNp-1662295572451)(p\工作模式-simple.png)]

public class Producer {
    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立连接Connection
            connection = connectionFactory.newConnection("生产者");
            //获取通道Channel
            channel = connection.createChannel();
            //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息
            String queueName = "queue1";
            /**
             * 队列名字
             * 是否要持久化
             * 排他性,是否是个独占队列
             * 是否自动删除,最后一个消费者消费完后是否删除队列
             * 携带的附属参数
             */
            channel.queueDeclare(queueName, false, false, false, null);
            //准备消息内容
            String message = "hello";
            //发送消息给队列
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}





public class Customer {
    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立连接Connection
            connection = connectionFactory.newConnection("生产者");
            //获取通道Channel
            channel = connection.createChannel();
            //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息
            channel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收消息失败...");
                }
            });
            return;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
面试题
  1. 为什么RabbitMQ是基于通道Channel去处理而不是基于连接Connection?

    TCP的连接断开需要消耗很多资源开销,影响性能,

    于是在使用Connection长连接基础上,创建多个信道channel处理消息,性能更高,

    信道之间相互独立隔离;

  2. 可以存在没有交换机的队列吗?

    不可能的,虽然没有指定具体的交换机,但是会有个默认的交换机;

工作模式 Work
  1. 轮询模式 : 一个消费者一条,按均分配

    自动应答

    不会因为服务器处理速度慢而少分配发送消息,还是一样的均分

  2. 公平分发模式 : 根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配

    要手动应答

发布订阅模式

相关应用场景:邮件群发,群聊,广播

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1y5IzzU3-1662295572452)(\p\1913282-20220730231211734-1176611840.png)]

package com.ung.rabbit.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author ung
 * @Description: 生产者
 */
public class Producer {
    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立连接Connection
            connection = connectionFactory.newConnection("生产者");
            //获取通道Channel
            channel = connection.createChannel();
            //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息

            //创建交换机
            String exchangeName = "fanout_exchange";
            //类型
            String type ="fanout";
            // 3.声明交换机对象
            channel.exchangeDeclare(exchangeName, type);
            //路由键
            String routekey ="";

            String queueName1 = "queue1";
            String queueName2 = "queue2";
            String queueName3 = "queue3";
            /**
             * 队列名字
             * 是否要持久化
             * 排他性,是否是个独占队列
             * 是否自动删除,最后一个消费者消费完后是否删除队列
             * 携带的附属参数
             *
             *
             * 创建队列
             */
            channel.queueDeclare(queueName1, false, false, false, null);
            channel.queueDeclare(queueName2, false, false, false, null);
            channel.queueDeclare(queueName3, false, false, false, null);


            // 5.绑定到交互机
            // fanout_exchange 绑定了 3个队列
            channel.queueBind(queueName1,  exchangeName,routekey);//指定交换机
            channel.queueBind( queueName2,  exchangeName, routekey);
            channel.queueBind( queueName3,  exchangeName, routekey);

            //准备消息内容
            String message = "hello fanout";
            //发送消息给队列
            channel.basicPublish(exchangeName, routekey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
package com.ung.rabbit.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ung
 * @Description: 消费者
 */
public class Customer {

    public static Runnable runnable  = new Runnable() {
        @Override
        public void run() {
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.10.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setVirtualHost("/");
            Connection connection = null;
            Channel channel = null;
            try {
                //建立连接Connection
                connection = connectionFactory.newConnection("生产者");
                //获取通道Channel
                channel = connection.createChannel();
                //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息
                String queueName = Thread.currentThread().getName();
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println(queueName+"收到消息是: " + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("接收消息失败...");
                    }
                });
                return;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                //关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };


    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
路由模式
  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。
  2. 消息生产者将消息发送给交换机,交换机按照路由判断,将路由到的RouteKey的消息,推送与之绑定的队列,交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WXPGX5yZ-1662295572453)(p\1913282-20220730231303785-1294379445.png)]

package com.ung.rabbit.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author ung
 * @Description: 生产者
 */
public class Producer {
    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立连接Connection
            connection = connectionFactory.newConnection("生产者");
            //获取通道Channel
            channel = connection.createChannel();
            //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息

            //创建交换机
            String exchangeName = "direct_exchange";
            //类型
            String type ="direct";
            // 3.声明交换机对象
            channel.exchangeDeclare(exchangeName, type);
            //路由键
            String routekey1 ="k1";
            String routekey2 ="k2";
            String routekey3 ="k3";

            String queueName1 = "queue1";
            String queueName2 = "queue2";
            String queueName3 = "queue3";
            /**
             * 队列名字
             * 是否要持久化
             * 排他性,是否是个独占队列
             * 是否自动删除,最后一个消费者消费完后是否删除队列
             * 携带的附属参数
             *
             *
             * 创建队列
             */
            channel.queueDeclare(queueName1, false, false, false, null);
            channel.queueDeclare(queueName2, false, false, false, null);
            channel.queueDeclare(queueName3, false, false, false, null);


            // 5.绑定到交互机
            // fanout_exchange 绑定了 3个队列
            channel.queueBind(queueName1,  exchangeName,routekey1);//指定交换机
            channel.queueBind( queueName2,  exchangeName, routekey2);
            channel.queueBind( queueName3,  exchangeName, routekey3);

            //准备消息内容
            String message1 = "hello direct1";
            String message2 = "hello direct2";
            String message3 = "hello direct3";
            //发送消息给队列
            channel.basicPublish(exchangeName, routekey1, null, message1.getBytes());
            channel.basicPublish(exchangeName, routekey2, null, message2.getBytes());
            channel.basicPublish(exchangeName, routekey3, null, message3.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
package com.ung.rabbit.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ung
 * @Description: 消费者
 */
public class Customer {

    public static Runnable runnable  = new Runnable() {
        @Override
        public void run() {
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.10.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setVirtualHost("/");
            Connection connection = null;
            Channel channel = null;
            try {
                //建立连接Connection
                connection = connectionFactory.newConnection("生产者");
                //获取通道Channel
                channel = connection.createChannel();
                //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息
                String queueName = Thread.currentThread().getName();
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("收到消息是: "+queueName + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("接收消息失败...");
                    }
                });
                return;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                //关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };


    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
模糊匹配主题 Topic模式
  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

  2. 此时的自己唯一的Routekey,不是一个确定值,像我们熟悉的正则表达式对应的匹配规则。

  3. 生产者产生消息,把消息交给交换机,交换机根据RouteKey的模糊匹配到对应的队列,由队列监听消费者消费消息。

  4. 命名规则是多个单词用顿号(.)分隔开代表代表一个单词,*代表多个单词,#代表0个或多个

    image

package com.ung.rabbit.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author ung
 * @Description: 生产者
 */
public class Producer {
    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立连接Connection
            connection = connectionFactory.newConnection("生产者");
            //获取通道Channel
            channel = connection.createChannel();
            //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息

            //创建交换机
            String exchangeName = "topic_exchange";
            //类型
            String type ="topic";
            // 3.声明交换机对象
            channel.exchangeDeclare(exchangeName, type);

            String queueName1 = "queue1";
            String queueName2 = "queue2";
            String queueName3 = "queue3";
            /**
             * 队列名字
             * 是否要持久化
             * 排他性,是否是个独占队列
             * 是否自动删除,最后一个消费者消费完后是否删除队列
             * 携带的附属参数
             *
             *
             * 创建队列
             */
            channel.queueDeclare(queueName1, false, false, false, null);
            channel.queueDeclare(queueName2, false, false, false, null);
            channel.queueDeclare(queueName3, false, false, false, null);
            //路由键
            String routekey1 ="k.*";
            String routekey2 ="k.#";
            String routekey3 ="k.k";

            // 5.绑定到交互机
            // fanout_exchange 绑定了 3个队列
            channel.queueBind(queueName1,  exchangeName,routekey1);//指定交换机
            channel.queueBind( queueName2,  exchangeName, routekey2);
            channel.queueBind( queueName3,  exchangeName, routekey3);

            //准备消息内容
            String message1 = "hello topic1";
            String message2 = "hello topic2";
            String message3 = "hello topic3";
            //发送消息给队列
            channel.basicPublish(exchangeName, "k.k", null, message1.getBytes());
//            channel.basicPublish(exchangeName, routekey2, null, message2.getBytes());
//            channel.basicPublish(exchangeName, routekey3, null, message3.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
package com.ung.rabbit.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ung
 * @Description: 消费者
 */
public class Customer {

    public static Runnable runnable  = new Runnable() {
        @Override
        public void run() {
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.10.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setVirtualHost("/");
            Connection connection = null;
            Channel channel = null;
            try {
                //建立连接Connection
                connection = connectionFactory.newConnection("生产者");
                //获取通道Channel
                channel = connection.createChannel();
                //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息
                String queueName = Thread.currentThread().getName();
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("收到消息是: "+queueName + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("接收消息失败...");
                    }
                });
                return;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                //关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };


    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

SpringBoot使用RabbitMQ

新建一个springboot项目作为生产者

先导入依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

yml配置文件配置

# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.10.100
    port: 5672
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
fanout发布订阅模式

配置config

声明交换机,队列,绑定交换机和队列

package com.ung.rabbitmq_springboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author ung
 * @Description:
 */
@Configuration
public class RabbitMQConfig {
    //声明交换机 fanout模式
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //声明队列 sms.fanout.queue 、email.fanout.queue 、shortMessage.fanout.queue
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    @Bean
    public Queue shortMessageQueue() {
        return new Queue("shortMessage.fanout.queue", true);
    }

    //交换机和队列进行绑定
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding shortMessageBinding() {
        return BindingBuilder.bind(shortMessageQueue()).to(fanoutExchange());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

service发送消息

package com.ung.rabbitmq_springboot;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * @Author ung
 * @Description:
 */
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void makeOrder(String userId,String productId, int num){
        //根据商品id查库存
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:"+orderId);
        //保存订单
        //分发消息
        /**
         * 1、交换机
         * 2、路由键/队列名
         * 3、消息
         */
        String exchangeName ="fanout_order_exchange";
        String routingKey ="";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

在springboot测试类中启动

@SpringBootTest
class RabbitmqSpringbootApplicationTests {
    @Autowired
    private OrderService orderService;
    @Test
    void contextLoads() {
        orderService.makeOrder("1","1",1);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

消息发送成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AtlBFVrd-1662295572454)(\p\image-20220901202410228.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-toEvaZDc-1662295572455)(p\image-20220901202604981.png)]

新建另一个spring boot消费者

一样的依赖和配置文件

创建消费者类,监听队列,注意要注入到spring容器

package com.ung.springboot_rabbitmq_client.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class FanoutEmailConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到email:"+message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
package com.ung.springboot_rabbitmq_client.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
@RabbitListener(queues = {"shortMessage.fanout.queue"})
@Service
public class FanoutShortMessageConsumer {
    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到shortMessage:"+message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
package com.ung.springboot_rabbitmq_client.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
@RabbitListener(queues = {"sms.fanout.queue"})
@Service
public class FanoutSMSConsumer {
    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到sms:"+message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费消息成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KqHk08Du-1662295572455)(p\image-20220901202651330.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vdmd1f35-1662295572456)(p\image-20220901202711468.png)]

direct路由模式

config配置

package com.ung.rabbitmq_springboot.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author ung
 * @Description:
 */
@Configuration
public class RabbitMQDirectConfig {
    //声明交换机 direct模式
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }

    //声明队列 sms.direct.queue 、email.direct.queue 、shortMessage.direct.queue
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.direct.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.direct.queue", true);
    }

    @Bean
    public Queue shortMessageQueue() {
        return new Queue("shortMessage.direct.queue", true);
    }

    //交换机和队列进行绑定
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding shortMessageBinding() {
        return BindingBuilder.bind(shortMessageQueue()).to(directExchange()).with("shortMessage");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

service 里发送消息

package com.ung.rabbitmq_springboot;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * @Author ung
 * @Description:
 */
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrderDirect(String userId, String productId, int num) {
        //根据商品id查库存
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        //保存订单
        //分发消息
        /**
         * 1、交换机
         * 2、路由键/队列名
         * 3、消息
         */
        String exchangeName = "direct_order_exchange";
        String routingKey = "sms";
        rabbitTemplate.convertAndSend(exchangeName, "sms", "sms" + orderId);
        rabbitTemplate.convertAndSend(exchangeName, "email", "email" + orderId);
        rabbitTemplate.convertAndSend(exchangeName, "shortMessage", "shortMessage" + orderId);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

消费者这边

package com.ung.springboot_rabbitmq_client.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
@RabbitListener(queues = {"sms.direct.queue"})
@Service
public class DirectSMSConsumer {
    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到sms:"+message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
package com.ung.springboot_rabbitmq_client.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
@RabbitListener(queues = {"shortMessage.direct.queue"})
@Service
public class DirectShortMessageConsumer {
    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到shortMessage:"+message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
package com.ung.springboot_rabbitmq_client.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
@RabbitListener(queues = {"email.direct.queue"})
@Service
public class DirectEmailConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到email:"+message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
topic主题模式

使用注解方式

符号“#”匹配路由键的一个或多个词,符号“*”匹配路由键的一个词

消费者

package com.ung.springboot_rabbitmq_client.service.topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
//@RabbitListener(queues = {"email.topic.queue"})
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "*.email.*"
))
@Service
public class TopicEmailConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到email:"+message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
package com.ung.springboot_rabbitmq_client.service.topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
//@RabbitListener(queues = {"shortMessage.topic.queue"})
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "shortMessage.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "*.shortMessage.#"
))
@Service
public class TopicShortMessageConsumer {
    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("收到shortMessage:"+message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
package com.ung.springboot_rabbitmq_client.service.topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

/**
 * @Author ung
 * @Description:
 */
//@RabbitListener(queues = {"sms.topic.queue"})
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.sms.#"
))
@Service
public class TopicSMSConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("收到sms:" + message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

生产者

    public void makeOrderTopic(String userId, String productId, int num) {
        //根据商品id查库存
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        //保存订单
        //分发消息
        /**
         * 1、交换机
         * 2、路由键/队列名
         * 3、消息
         */
        String exchangeName = "direct_order_exchange";
        //#.sms.#
        //*.email.*
        //*.shortMessage.#
        String routingKey = "sms";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

高级

1、过期时间 TTL

对消息设置过期时间,在这个时间内都可以被消费者消费,一旦这个时间过去后,就会被自动删除

可以对消息和队列设置TTL

  1. 通过队列的属性设置,队列中的所有消息都是相同的过期时间
  2. 对消息进行单独设置,每条消息的TTL不同
给队列设置TTL

创建交换机和队列,并给队列设置TTL过期时间

package com.ung.rabbitmq_springboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author ung
 * @Description:
 */
@Configuration
public class RabbitMQTTLConfig {
    //声明交换机
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl_order_exchange", true, false);
    }

    //声明队列
    @Bean
    public Queue ttlDirectQueue() {
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//这里一定是int类型 注意是  x-message-ttl的参数
        return new Queue("ttl.direct.queue",true,false,false,args);}
    //绑定队列和交换机
    @Bean
    public Binding ttlDirectBinding() {
        return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

给交换机发送消息

  public void makeOrderTTLQueue(String userId, String productId, int num) {
        //根据商品id查库存
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttl";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建ttl队列,有ttl标志 可以看到有一个消息在里面,过5秒后自动删除

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5maFKzEg-1662295572457)(p\image-20220904153451005.png)]

给消息设置TTL

声明普通队列

   //声明交换机
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl_order_exchange", true, false);
    }

    //声明一个普通队列
    @Bean
    public Queue directMessageQueue() {
        return new Queue("ttl.message.direct.queue", true);
    }

    //绑定队列和交换机
    @Bean
    public Binding ttlMessageDirectBinding() {
        return BindingBuilder.bind(directMessageQueue()).to(ttlDirectExchange()).with("ttlMessage");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

发送消息时指定消息的TTL过期时间

 public void makeOrderTTLMessage(String userId, String productId, int num) {
        //根据商品id查库存
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttlMessage";
        //给消息设置过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置过期时间5s
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId,messagePostProcessor);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

可以看到创建了一个队列,并有一个消息,5秒后自动删除

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JY74xdZy-1662295572457)(p\image-20220904154500160.png)]

问题
  1. 如果队列设置了TTL,消息也设置了TTl,那会使用哪一个TTL过期时间?

    哪个TTL时间小,最先过期就使用哪一个

  2. 那TTL队列和TTL消息最大的区别,有什么不同?

    给队列设置TTL过期时间,相当于给这个队列一个TTL的标记,当队列的消息过期后,

    可以进入配置的死信队列中,做下一步处理,

    而消息设置过期时间,在普通队列中会自动删除,不会进入到死信队列中

2、死信队列 DLX

DLX,全称 Dead-Letter-Exchange 也叫死信交换机

当一个消息在队列中变成死信消息后,会被重新发送到死信交换机;绑定DLX死信交换机的队列就是死信队列

那消息如何才能变为死信消息?

  1. 消息被拒绝消费
  2. 消息过期
  3. 队列达到最大长度

队列在代码中之前已经配置好了,再修改的话会报错

创建死信交换机,死信队列,其实就是跟普通交换机,队列一样

package com.ung.rabbitmq_springboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author ung
 * @Description:
 */
@Configuration
public class RabbitMQDeadConfig {
    //声明交换机
    @Bean
    public DirectExchange deadDirectExchange() {
        return new DirectExchange("dead_order_exchange", true, false);
    }


    //声明一个普通队列
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }

    //绑定队列和交换机
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

最关键的设置

在普通队列中设置绑定死信交换机和死信队列路由键

package com.ung.rabbitmq_springboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author ung
 * @Description:
 */
@Configuration
public class RabbitMQTTLConfig {
    //声明交换机
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl_order_exchange", true, false);
    }

    //声明队列
    @Bean
    public Queue ttlDirectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是int类型 注意是  x-message-ttl
        //设置死信交换机
        args.put("x-dead-letter-exchange", "dead_order_exchange");//注意是  x-dead-letter-exchange
        //设置死信队列路由键
        args.put("x-dead-letter-routing-key", "dead");//注意是  x-dead-letter-routing-key
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    //声明一个普通队列
    @Bean
    public Queue directMessageQueue() {
        return new Queue("ttl.message.direct.queue", true);
    }

    //绑定队列和交换机
    @Bean
    public Binding ttlMessageDirectBinding() {
        return BindingBuilder.bind(directMessageQueue()).to(ttlDirectExchange()).with("ttlMessage");
    }

    //绑定队列和交换机
    @Bean
    public Binding ttlDirectBinding() {
        return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

跟过期时间队列一样的发送消息

可以看到死信队列创建,原来的TTL队列有DLX的标志

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f816rwvj-1662295572463)(p\image-20220904160934569.png)]

过5秒后,TTL队列里的消息过期,就会到死信队列里

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iBwJmRvn-1662295572464)(p\image-20220904161008336.png)]

设置队列的最大长度
    //声明队列
    @Bean
    public Queue ttlDirectQueue() {
        Map<String, Object> args = new HashMap<>();

        //设置队列最大长度
        args.put("x-max-length", 5);//x-max-length

        args.put("x-message-ttl", 5000);//这里一定是int类型 注意是  x-message-ttl
        //设置死信交换机
        args.put("x-dead-letter-exchange", "dead_order_exchange");//注意是  x-dead-letter-exchange
        //设置死信队列路由键
        args.put("x-dead-letter-routing-key", "dead");//注意是  x-dead-letter-routing-key
        return new Queue("ttl.direct.queue", true, false, false, args);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

发送11条消息

设置队列最大长度5 后,超出的消息会被发送到死信队列中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OHKAMrtU-1662295572465)(p\image-20220904161948827.png)]

3、磁盘监控

RabbitMQ的内存大小

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zhhPpvOK-1662295572466)(p\image-20220904163212972.png)]

当出现警告的时候,可以通过配置去修改和调整

参考帮助文档:http://www.rabbbitmq.com/configure.html

命令的方式

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
  • 1
  • 2
  • 3
  • 4

fraction/value 为内存阈值。默认情况是:0.4/2GB 代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。

通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效

使用配置文件的方式修改

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ju5Loj3A-1662295572466)(p\image-20220904163456880.png)]

内存换页

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p8xG6j8u-1662295572467)(p\image-20220904164600449.png)]

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/598264
推荐阅读
相关标签
  

闽ICP备14008679号