当前位置:   article > 正文

MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程_springboot2.x rabbitmq

springboot2.x rabbitmq

提示:以下是本篇文章正文内容,不足之处欢迎指出


一、MQ引言

1.1 什么是MQ

MQ (Message Quene):翻译为 消息队列 ,通过典型的 生产者消费者 模型生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松的实现系统间解耦。别名为消息中间件—通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些

当今市面上有很多主流的消息中间件,如老牌的 ActiveMQRabbitMQ,炙手可热的 Kafka,阿里巴巴自主开发 RocketMQ 等。

1.3 不同MQ特点

  1. ActiveMQ
    ActiveMQ是Apache出品,最流行的。能力强劲的开源消息总线。它是一个完全支持JMS规范的消息中间件。丰富的APT,多种集群架构模式让ActiveMQ流在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
  2. Kafka
    Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
  3. RocketMQ
    RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
  4. RabbitMQ
    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由〈包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

总结:RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性〈少量延迟),可靠性〔少量丢数据)要求稍低的场景使用,比如ELK日志收集。

二、RabbitMQ的引言

2.1 RabbitMQ

RabbitMQ,基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

2.2 RabbitMQ的安装

2.2.1 下载

官网下载地址:https://www.rabbitmq.com/download.html
在这里插入图片描述
我们直接选择Linux环境下进行安装,官网可能出现下载网速慢的情况,这里已经准备好了安装包和相关依赖

2.2.2 下载的安装包

在这里插入图片描述

2.2.3 安装

  1. 将rabbitMQ安装包以及依赖上传到Linux系统中
    在这里插入图片描述
  2. 安装Erlang依赖包
rpm -ivh erlang-22.0.7-1.e17.x86_64.rpm
(若安装过程中报错,显示libcrypto.so.10(OPENSSL_1.0.2)(64bit) is needed by erlang-22.0.7-1.el7.x86_64错误,则先安装openssl-libs-1.0.2k-19.el7.x86_64.rpm依赖,在进行erlang安装)
rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force
  • 1
  • 2
  • 3
  1. 安装RabbitMQ socket依赖和安装包
 rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
 rpm -ivh --nodeps rabbitmq-server-3.7.18-1.el7.noarch.rpm 
  • 1
  • 2

在这里插入图片描述

  1. 复制rabbitMQ配置文件
find / -name rabbitmq.config.example
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
  • 1
  • 2
  1. 修改配置文件
vim /etc/rabbitmq/rabbitmq.config
  • 1

在这里插入图片描述
修改为如下图所示
在这里插入图片描述

  1. 启动rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management

出现如下说明:
Enabling plugins on node rabbit@localhost:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

set 3 plugins.
Offline change; changes will take effect at broker restart.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  1. 启动RabbitMQ的服务
启动:systemctl start rabbitmq-server
重启:systemctl restart rabbitmq-server
停止:systemctl stop rabbitmq-server
  • 1
  • 2
  • 3
  1. 查看服务状态
systemctl status rabbitmq-server
  • 1
  1. 关闭防火墙服务
systemctl stop firewalld
  • 1
  1. 访问web管理页面:虚拟机 IP地址+端口号(首次登录时账号密码均为guest)
    在这里插入图片描述

三、RabbitMQ配置

3.1 RabbitMQ管理命令行

1. 服务启动相关
systemctl start|restart|stop status rabbitmq-server
2. 管理命令行 用来在不使用web管理界面情况下命令操作RabbitMQ
rabbitmqctl help  可以查看更多命令
3. 插件管理命令行
rabbitmqplugins enable|list|disable
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.2 web管理界面介绍

3.2.1 overview概览

在这里插入图片描述

connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才
可以完成消息的生产和消费,在这里可以查看连接情况`

channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

Exchanges:交换机,用来实现消息的路由

Queues:队列,即消息队列,消息存放在队列中,等待消费,
消费后被移除队列。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3.2.2 Admin用户和虚拟主机管理

  1. 添加用户
    在这里插入图片描述
上面的Tags选项,其实是指定用户的角色,可选的有以下几个:

超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,
策略(policy)进行操作。

监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息
(进程数,内存使用情况,磁盘使用情况等)

策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的
相关信息(上图红框标识的部分)。

普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

其他

无法登陆管理控制台,通常就是普通的生产者和消费者。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 创建虚拟主机
虚拟主机
为了让各个用户可以互不干扰的工作,RabbitMQ添加了
虚拟主机(Virtual Hosts)的概念。其实就是一个独立的
访问路径,不同用户使用不同路径,各自有自己的
队列、交换机,互相不会影响。相当于关系型中的数据库
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

  1. 虚拟主机绑定用户
    在这里插入图片描述
    在这里插入图片描述

四、RabbitMQ的第一个程序

4.1 AMQP协议的回顾

在这里插入图片描述

4.2 RabbitMQ支持的消息模型

可上官网查看具体详情
在这里插入图片描述
在这里插入图片描述

4.3 引入依赖

<!--引入rabbitmq的相关依赖-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4.4 第一种模型(直连)

在这里插入图片描述

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,
可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 开发生产者
public class Provider {

    // 生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置连接rabbitmq主机
        connectionFactory.setHost("192.168.88.100");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接哪个虚拟机
        connectionFactory.setVirtualHost("/emsVirtual");
        // 设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");

        // 获取连接对象
        Connection connection = connectionFactory.newConnection();
        // 获取连接中的通道
        Channel channel = connection.createChannel();

        // 通道绑定对应消息队列
        // 参数1:队列名称 如果队列不存在则自动创建
        // 参数2:用来定义队列特性是否需要持久化 true 持久化队列  false 不持久化
        // 参数3:exclusive:是否独占队列  true 独占队列 false 不独占
        // 参数4:autoDelete:是否在消费完成后自动删除队列  true 自动删除 false 不自动删除
        // 参数5:额外附加参数
        channel.queueDeclare("hello", false, false, false, null);

        // 发布消息
        // 参数1:交换机名称  参数2:队列名称  参数3:传递消息额外设置  参数4:消息的具体内容
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

        // 关闭通道
        channel.close();
        // 关闭连接
        connection.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

在这里插入图片描述

  1. 开发消费者
// 消费者
public class Customer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置连接rabbitmq主机
        connectionFactory.setHost("192.168.88.100");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接哪个虚拟机
        connectionFactory.setVirtualHost("/emsVirtual");
        // 设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");

        // 获取连接对象
        Connection connection = connectionFactory.newConnection();
        // 获取连接中的通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);

        // 消费消息
        // 参数1:消费哪个队列的消息 队列名称
        // 参数2:开始消息的自动确认机制
        // 参数3:消费时的回调接口
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            // 最后一个参数:消息队列中取出的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

在这里插入图片描述

  1. 封装工具类
public class RabbitMQUtils {

    private static ConnectionFactory connectionFactory;

    // 静态代码块,在类加载时执行,只执行一次
    static {
    	connectionFactory = new ConnectionFactory();
        // 设置连接rabbitmq主机
        connectionFactory.setHost("192.168.88.100");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接哪个虚拟机
        connectionFactory.setVirtualHost("/emsVirtual");
        // 设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");
    }

    // 定义提供连接对象的方法
    public static Connection getConnection() {
        try {
            // 获取连接对象
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // 关闭通道和连接
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

4.2 第二种模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消耗速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
在这里插入图片描述

角色:
 	P:生产者:任务的发布者
 	C1:消费者1,领取任务并且完成任务,假设完成速度较慢
 	C2:消费者2,领取任务并且完成任务,假设完成速度快
  • 1
  • 2
  • 3
  • 4
  1. 开发生产者
public class Provider {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();

        // 通过通道声明队列
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 0; i < 10; i++) {
            // 生产消息
            channel.basicPublish("", "work", null, (i + "hello work quene").getBytes());
        }

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 开发消费者1
public class Customer1 {

    public static void main(String[] args) throws IOException {

        // 获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", true, false, false, null);

        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1-" + new String(body));
            }
        });

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 开发消费者2
public class Customer2 {

    public static void main(String[] args) throws IOException {

        // 获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", true, false, false, null);

        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2-" + new String(body));
            }
        });

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 测试结果
    在这里插入图片描述
    在这里插入图片描述
    总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

  2. 消息确认机制:
    完成一项任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始了一项长期任务,但只完成了一部分就死了,会发生什么情况。在我们当前的代码中,一旦RabbitMQ将消息传递给使用者,它就会立即将其标记为删除。在这种情况下,如果您杀死一个worker,我们将丢失它刚刚处理的消息。我们还将丢失发送给该特定工作进程但尚未处理的所有消息。但我们不想失去任何任务。如果一个worker死了,我们希望把任务交给另一个工人。

// 开发消费者1
public class Customer1 {

    public static void main(String[] args) throws IOException {

        // 获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 保证消息队列中每次只能处理一个
        channel.basicQos(1);
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1-" + new String(body));
                // 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确实
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
// 开发消费者2
public class Customer2 {

    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 保证消息队列中每次只能处理一个
        channel.basicQos(1);
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2-" + new String(body));
                //手动确认  参数1:手动确认消息标识  参数2:false 每次确认一个
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
设置通道一次只能消费一个消息
关闭消息的自动确认,开启手动确认消息
  • 1
  • 2

在这里插入图片描述
在这里插入图片描述

4.3 第三种模型(fanout)

fanout 扇出 也称为广播
在这里插入图片描述

在广播模式下,消息发送流程是这样的:

	1.可以有多个消费者
	2.每个消费者有自己的queue(队列)
	3.每个队列都要绑定到Exchange(交换机)
	4.生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
	5.交换机把消息发送给绑定过的所有队列
	6.队列的消费者都能拿到消息。实现一条消息被多个消费者消费
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 开发生产者
public class Provider {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();

        // 将通道声明指定交换机  // 参数1:交换机名称  参数2:交换机类型  fanout 广播类型
        channel.exchangeDeclare("logs", "fanout");
        // 发送消息
        channel.basicPublish("logs", "", null, "fanout type message".getBytes());
        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  1. 开发消费者1
public class Customer1 {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare("logs", "fanout");

        // 临时队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queueName, "logs", "");

        // 消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 开发消费者2
public class Customer2 {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare("logs", "fanout");

        // 临时队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queueName, "logs", "");

        // 消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:" + new String(body));
            }
        });
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 测试结果
    在这里插入图片描述

4.4 第四种模型(Routing)

4.4.1 Routing 之订阅模型-Direct(直连)

Fanout模式中。一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到 Direct 类型的 Exchange
  • 1
  • 2
Direct模型下:
	队列与交换机的绑定,不能在是任意绑定了,而是要指定一个 RoutingKey (路由key);
	消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKeyExchange不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 与消息的RoutingKey也完全一致,才会接收到消息。
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

图解:

	P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
	XExchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
	C1:消费者,其所在队列指定了需要routing key 为 error 的消息
	C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 开发生产者
public class Provider {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();
        // 通过通道声明交换机  参数1:交换机名称   参数2:direct   路由模式
        channel.exchangeDeclare("logs_direct", "direct");
        // 发送消息
        String routingkey = "info";
        channel.basicPublish("logs_direct", routingkey, null, ("这是direct模型发布的基于route key:[" + routingkey + "]发送的消息").getBytes());

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 开发消费者1
public class Customer1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机名称
        String exchangeName = "logs_direct";

        // 通道声明交换机以及交换的类型
        channel.exchangeDeclare("logs_direct", "direct");

        // 创建一个临时队列
        String queue = channel.queueDeclare().getQueue();

        // 基于route key绑定队列和交换机
        channel.queueBind(queue, exchangeName, "error");

        // 获取消费的消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  1. 开发消费者2
public class Customer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机名称
        String exchangeName = "logs_direct";

        // 通道声明交换机以及交换的类型
        channel.exchangeDeclare("logs_direct", "direct");

        // 创建一个临时队列
        String queue = channel.queueDeclare().getQueue();

        // 基于route key绑定队列和交换机
        channel.queueBind(queue, exchangeName, "info");
        channel.queueBind(queue, exchangeName, "error");
        channel.queueBind(queue, exchangeName, "warning");

        // 获取消费的消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  1. 测试生产者发送Route key为error的消息时
    在这里插入图片描述

  2. 测试生产者发送Route key为info的消息时
    在这里插入图片描述

4.4.2 Routing 之订阅模型-Topic

Topic 类型的 ExchangeDirect 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 RoutingKey 的时候使用通配符!这种模型 RoutingKey 一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:‘item.insert’
在这里插入图片描述

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
		# (hash) can substitute for zero or more words.  匹配零个、一个或多个词
# 如:
		audit.#    匹配audit、audit.irs 、或者audit.irs.corporate等
    	audit.*   只能匹配 audit.irs
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 开发生产者
public class Provider {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();
        // 通过通道声明交换机以及交换机类型  参数1:交换机名称   参数2:topic
        channel.exchangeDeclare("topics", "topic");
        // 发送消息
        String routingkey = "user.save";
        channel.basicPublish("topics", routingkey, null, ("这是topic模型发布的基于route key:[" + routingkey + "]发送的消息").getBytes());

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  1. 开发消费者1 (Routing Key中使用*通配符方式)
// Routing Key中使用*通配符方式
public class Customer1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 通道声明交换机以及交换的类型
        channel.exchangeDeclare("topics", "topic");

        // 创建一个临时队列
        String queue = channel.queueDeclare().getQueue();

        // 基于route key绑定队列和交换机  动态通配符形式
        channel.queueBind(queue, "topics", "user.*");

        // 获取消费的消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  1. 开发消费者2 (Routing Key中使用#通配符方式)
// Routing Key中使用#通配符方式
public class Customer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 通道声明交换机以及交换的类型
        channel.exchangeDeclare("topics", "topic");

        // 创建一个临时队列
        String queue = channel.queueDeclare().getQueue();

        // 基于route key绑定队列和交换机  动态通配符形式
        channel.queueBind(queue, "topics", "user.#");

        // 获取消费的消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:" + new String(body));
            }
        });
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 测试结果
    在这里插入图片描述
    在这里插入图片描述
    总结:Routing Key中使用 user.# 通配符方式时,代表包含user(如:user.save.delete,save.user)的都可以匹配,且长度不限;Routing Key中使用 user.* 通配符方式时,代表长度为2(如:user.save)的routingkey,且第一位只能为user。

五、SpringBoot中使用RabbitMQ

5.1 搭建初试环节

  1. 引入依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  1. 配置配置文件
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 192.168.88.100
    port: 5672
    username: ems
    password: 123456
    virtual-host: /emsVirtual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5.2 hello world模型使用

  1. 开发生产者
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // hello world
    @Test
    public void test01() {
        rabbitTemplate.convertAndSend("hello","hello world");
    }
    // 生产端没有指定交换机只有routingKey和Object。
	//消费方产生hello队列,放在默认的交换机(AMQP default)上。
	//而默认的交换机有一个特点,只要你的routerKey的名字与这个
	//交换机的队列有相同的名字,他就会自动路由上。 
	//生产端routingKey 叫hello ,消费端生产hello队列。
	//他们就路由上了
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  1. 开发消费者
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 测试结果
    在这里插入图片描述

5.3 work模型使用

  1. 开发生产者
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // work
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", i + " work模型");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 开发消费者
@Component
public class WorkCustomer {

    // 消费者1
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("messge1 = " + message);
    }

    // 消费者1
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("messge2 = " + message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 测试结果
    在这里插入图片描述
    说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要外配置

5.4 Fanout 模型使用

  1. 开发生产者
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // fanout 广播
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的消息");
    }
}    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  1. 开发消费者
@Component
public class FanoutCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  // 创建临时队列
                    exchange = @Exchange(value = "logs" , type = "fanout")  // 绑定的交换机
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " +message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  // 创建临时队列
                    exchange = @Exchange(value = "logs" , type = "fanout")  // 绑定的交换机
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " +message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 测试结果
    在这里插入图片描述

5.5 Route模型使用

  1. 开发生产者
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //route 路由模式
    @Test
    public void testRoute() {
        rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  1. 开发消费者
@Component
public class RouteCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  // 创建临时队列
                    exchange = @Exchange(value = "directs", type = "direct"),  //自定义交换机名称和类型
                    key = {"info", "error", "warning"}
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  // 创建临时队列
                    exchange = @Exchange(value = "directs", type = "direct"),  //自定义交换机名称和类型
                    key = {"info"}
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 测试结果
    发送info的key的路由信息
    在这里插入图片描述

    发送error的key的路由信息
    在这里插入图片描述

5.6 Topic模型使用

  1. 开发生产者
@SpringBootTest(classes = RabbitMqDemo01Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //topic 动态路由 订阅模式
    public void testTopic() {
        rabbitTemplate.convertAndSend("topics", "suer.save", "user.save 路由消息");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 开发消费者
@Component
public class TopicCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(name = "topics", type = "topic"),
                    key = {"user.save", "user.*"}
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(name = "topics", type = "topic"),
                    key = {"order.#", "product.#", "user.*"}
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 测试结果
    user.save 路由消息
    在这里插入图片描述

    order.save 路由消息
    在这里插入图片描述

六、SpringBoot中使用RabbitMQ

6.1 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种。1、串行的方式 2、并行的方式

  • 串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件、短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
    在这里插入图片描述

  • 并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式可能提高处理的时间。
    在这里插入图片描述

  • 消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高了处理时间,但是,邮件和短信对我们正常使用没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回,消息队列:引入消息队列后,把发送邮件、短信不是必须的业务逻辑异步处理。
    在这里插入图片描述

6.2 应用解耦

场景:双11是购物狂阶,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
在这里插入图片描述
这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合,引入消息队列
在这里插入图片描述

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
  • 库存系统:订阅下单的消息,获取下单消息,进行库操作。就算库存系统出现故障,消息队列也能保证消息的可靠传递,不会导致消息丢失。

6.3 流量削峰

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:

  1. 可以控制活动人数,超过一定阈值的订单直接丢弃
  2. 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
    在这里插入图片描述
    用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。
    秒杀业务根据消息队列中的请求信息,在做后续处理。

至此,RabbitMQ的学习以及整合SpringBoot已经完成!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/659602
推荐阅读
相关标签
  

闽ICP备14008679号