当前位置:   article > 正文

RabbitMQ实战教程

rabbitmq实战

RabbitMQ实战教程

1.MQ引言

1.1 什么是MQ

MQ (Message Queue): 翻译为消息队列,通过典型的生产者消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为消息中间件通过利用 高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2MQ有哪些

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ, 炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

1.3 不同MQ的特点

1. ActiveMQ
    ActiveMQ是Apache出品, 最流行的,能力强劲的开源消息总线。它是一-个完全 支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎! ! 吞吐量不高。
2. Kafka
	Kafka是L inkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特 点是基于Pu11的模式来处理消息消费,
追求高吞吐量,一开始的目的就是用于日志收集和传输。θ .8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
优点:拥有强大的性能及吞吐量,兼容性很好
缺点:由于“攒一波再处理”导致延迟比较高,有可能消息重复消费
3. RocketMQ
	RocketMQ是阿里开源的消息中间件,它是纯Java开发, 具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起
源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、bing1og分发等场景。收费。
优点:性能好,稳定可靠,有活跃的中文社区,特点响应快
缺点:兼容性较差,但随意影响力的扩大,该问题会有改善
4. RabbitMQ
	Rabbi tMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和
发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其
次。 一致 稳定 可靠性 

优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置
缺点:性能和吞吐量较差,不易进行二次开发
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
	RabbitMQ比Kafka可靠,Kafka更适 合10高吞吐的处理,一-般应用在大数据 日志处理或对实时性(少量延迟),可靠性 (少量丢数据)要求稍低的场景使用,比如ELK日志收集。
  • 1

2.RabbitMQ的引言

2.1 RabbitMQ

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
  • 1

官网地址: http://www.rabbitmq.com/

image-20200703172326906

AMQP协议
	AMQP (advanced message queuing protocol)、 在2003年时被提出, 最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一 种协议,更准确的说是一种binary wire-level protocol (链接协议)。这是其和JMS的本质差别,AMQP不认API层进行限定, 而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
  • 1
  • 2

image-20200703172628141

2.2RabbitMQ的安装

2.2.1 下载

官网地址: http://www.rabbitmq.com/

安装借鉴

2.2.2 下载的安装包

image-20200703173518971

2.2.3 安装的步骤

https://blog.csdn.net/summation/article/details/100055809

# 1.将rabbitmq安装包 上传到1inux系统中
erlang-22.0.7-1.e17 .x86. _64. rpm
rabbitmq-server-3.7. 18-1. el7.noarch. rpm 
# 2.安装Erlang依赖包
rpm -ivh erlang-22.0.7-1.el7.x86. 64. rpm
# 3.安装Rabbi tMQ安装包(需要联网)
yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm 
注意:默认安装完成后配置文件模板在:/usr /share/doc/ rabbi tmq-server-3.7.18/ rabbi tmq . config. example目录中,需要将配置文件复制到/etc/rabbi tmq/目录中,并修改名称为r abbi tmq . config
# 4.复制配置文件
cCp /usr/share/doc/ rabbi tmq-seFver-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 5.查看配置文件位置
1s /etc/ rabbitmq/rabbitmq.config
# 6.修改配置文件(参见下图:)
vim /etc/rabbitmq/ rabbitmq.config
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

image-20200703173917346

将上图中配置文件中的红色部分去掉%%,以及最后的,逗号,修改为下图:

image-20200703174153652

7.执行如下命令,启动rabbitmq中的插件管理

rabbitmq-plugins enable rabbitmq. management

出现如下说明:

Enabling plugins on node rabbit@localhost:

rabbitmq_ management

The following plugins have been configured:

rabbitmq. management

rabbitmq. management_ agent

rabbitmq_ web_ dispatch

Applying plugin configuration to rabbit@localhost...

The following plugins have been enabled:

rabbitmq_ management

rabbitmq. management _ agent

rabbitmq_ _web_ dispatch

set 3 plugins. 

offline change; changes will take effect at broker restart .

8.启动RabbitMQ的服务

systemctl start rabbitmq-server

出现如下说明:
Enabling plugins on node rabbit@localhost:
r abbitmq_ management
The following plugins have been conf igured:
rabbi tmq. management
rabbi tmq. management_ agent
rabbitmq_ web_ dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
r abbitmq_ management
rabbi tmq. management _ agent
rabbitmq_ _web_ dispatch
set 3 plugins. 
offline change; changes will take effect at broker restart .

8.启动RabbitMQ的服务

systemctl start rabbitmq-server

systemctl restart rabbitmq- server

systemctl stop rabbit- server

9.查看服务状态(见下图:)

systemctl status rabbi tmq-server
●rabbitmq-server . service - RabbitMQ broker
Loaded: loaded ( /usr /1ib/systemd/ sys tem/ r abbitmq-server . service; disabled; vendor preset: disabled)
Active: active (running) since 三2019-09-25 22:26:35 CST; 7s ago
Main PID: 2904 (beam. smp)
Status:” Initialized"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

image-20200703175132485

10.关闭防火墙服务

systemctl disable firewalld,

Removed symlink /etc/ systemd/ system/multi-user . target . wants/firewalld. service. 

Removed symlink /etc/systemd/system/ dbus-org. fedoraproject . FirewallD1. service .

systemctl stop firewalld

11.访问web管理界面

http://自己的IP:15672/

用户名 密码 都是 guest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3.RabbitMQ配置

3.1 RabbitMQ管理命令行

# 1.服务启动相关
systemctl startlrestart|stop Istatus rabbitmq-server
# 2.管理命令行用来在不使用web管理 界面情况下命令操作RabbitHQ
rabbitmqctl help 可以查看更多命令
# 3.插件管理命令行
rabbitmqplugins enable llist Idisable
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

image-20200704095240139

4.RibbitMQ的第一个程序

4.1 RabbitMQ支持的消息模型:

image-20200704100229979

image-20200704100243607

image-20200704100252952

3.7.x 支持1 -6 3.5.x 支持1-5 3.8.x 支持1-7

4.2 引入依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.2</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
public class RabbitMQUtils {

    private static ConnectionFactory connectionFactory;

    static {
        //重量资源 类加载只执行一次
        connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq的主机
        connectionFactory.setHost("39.96.42.195");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户名
        connectionFactory.setUsername("ems");
        //设置访问虚拟主机的密码
        connectionFactory.setPassword("123");
    }
    //定义提供连接对象的方法
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接工具的方法
    public static void closeConnectionAndChannel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
第一种模型(直连)

image-20200704100631368

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

1.开发生产者
        //1. 创建一个连接mq的connectionFactory连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置信息
        //设置连接rabbitmq的主机
        connectionFactory.setHost("39.96.42.195");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户名
        connectionFactory.setUsername("ems");
        //设置访问虚拟主机的密码
        connectionFactory.setPassword("123");
        //2.通过连接工厂创建连接 获取连接对象
        Connection connection = connectionFactory.newConnection();
        //3 获取连接中的通道
        Channel channel = connection.createChannel();
        //4 通过channel 发送数据
        String message = "hello RabbitMQ";
        /**
         * 通道绑定对应的消息队列
         * @see com.rabbitmq.client.AMQP.Queue.Declare
         * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
         * @param queue 队列的名称,如果不存在要自动创建
         * @param durable 用来定义队列的特性是否要持久化 true 持久化队列  false 不持久化队列
         * @param exclusive 是否独占队列 true 独占队列 false 不独占
         * @param autoDelete 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
         * @param arguments 额外的附加参数
         * @return a declaration-confirm method to indicate the queue was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        channel.queueDeclare("hello", false, false, false, null);
        /**
         *发布消息
         * @param exchange 交换机名称
         * @param routingKey 路由键
         * @param props 传递消息的额外设置
         * @param body 消息的具体内容
         * @throws java.io.IOException if an error is encountered
         */
        channel.basicPublish("", "hello", null, message.getBytes());
        //关闭连接
        channel.close();
        connection.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
2.消费者
        //1. 创建一个connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置信息
        connectionFactory.setHost("39.96.42.195");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");
        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3 通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String queueName = "hello";
        channel.queueDeclare(queueName, false, false, false, null);
        /**
         * 消费消息
         * @param queue 消费哪个队列的消息 队列名称
         * @param autoAck 开启消息的队列确认机制
         * @param callback 消费时的回调接口
         * @return the consumerTag generated by the server
         */
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            //最后一个参数: 消息队列中取出的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body) = " + new String(body));
            }
        });
        //关闭(因为一直监听,不建议关闭)
//        channel.close();
//        connection.close();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
第二种模型:

Work queues, 也被称为(Task queues) ,任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:**让多个消费者绑定到一一个队列,共同消费队列中的消息。**队列中的消息一旦消费, 就会消失,因此任务是不会被重复执行的。

image-20200704124506431

角色:
●P:生产者:任务的发布者
●C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
●C2:消费者-2:领取任务并完成任务,假设完成速度快
  • 1
  • 2
  • 3
  • 4
生产者:
    public static void main(String[] args) throws IOException {
        //获取链接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取通道对象
        Channel channel = connection.createChannel();
        //通过通道声明队列
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i <= 20; i++) {
            //生产消息
            channel.basicPublish("", "work", null, (i + "hello work queue").getBytes());
        }

        //关闭资源
        RabbitMQUtils.closeConnectionAndChannel(channel, connection);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
消费者-1:
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String queueName = "work";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1: " + new String(body));
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
消费者-2:
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String queueName = "work";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-2: " + new String(body));
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
测试结果:

image-20200704130822474

image-20200704130851957

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
  • 1
消息确认机制

image-20200704134427067

变动代码
        //每次只能消费一个消息
        channel.basicQos(1);
       // 消息自动确认true 消费者自动向rabbitmq确认消息消费 false 不会自动确认消息
     	channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
            	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 											byte[] body) throws IOException {
                    System.out.println("消费者-1: " + new String(body));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //手动确认 参数1: 队列中的那个具体消息 手动确认消息标识 参数2 false 每次确认一个 是否开启多个消息同时确认
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
       	
       	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

image-20200704140200515

第三种模型(fanout)

fanout 扇出 也称为广播

image-20200704142741626

在广播模式下,消息发送流程是这样的:

●可以有多个消费者

●每个消费者有自己的queue (队列)

●每个队列都要绑定到Exchange (交换机)

●生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

●交换机把消息发送给绑定过的所有队列

●队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者
   public static void main(String[] args) throws IOException {
        //获取链接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取通道对象
        Channel channel = connection.createChannel();
        /**
         * 通过通道声明交换机
         * @param exchange 交换机名称
         * @param type 交换机类型    fanout 广播类型
         * @return a declaration-confirm method to indicate the exchange was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        String exchangeName = "logs";
        channel.exchangeDeclare(exchangeName, "fanout");
        //发送消息
        channel.basicPublish("logs","",null,"fanout type message".getBytes());
        //关闭资源
        RabbitMQUtils.closeConnectionAndChannel(channel, connection);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
消费者-1:
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String exchangeName = "logs";
        channel.exchangeDeclare(exchangeName,"fanout");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,exchangeName,"");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
消费者-2
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String exchangeName = "logs";
        channel.exchangeDeclare(exchangeName,"fanout");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,exchangeName,"");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-2: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
消费者-3
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String exchangeName = "logs";
        channel.exchangeDeclare(exchangeName,"fanout");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,exchangeName,"");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-3: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
运行结果

image-20200704143014343

image-20200704143025655

image-20200704143036907

第四种模型(Routing)

Routing之订阅模型-Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange.

在Direct模型下:

●队列与交换机的绑定,不能是任意绑定了,而是要指定-个RoutingKey (路由key)

●消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey 。

●Exchange不再把消息交给每一 个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全- 致,才会接收到消息

流程:

image-20200704143541764

图解:

●P:生产者,向Exchange发送消息, 发送消息时,会指定一个routing key。

●X: Exchange (交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列

●C1:消费者,其所在队列指定了需要routing key为error的消息

●C2:消费者,其所在队列指定了需要routing key为info、 error, warning的消息

生产者:
    public static void main(String[] args) throws IOException {
        //获取链接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取通道对象
        Channel channel = connection.createChannel();
        /**
         * 通过通道声明交换机
         * @param exchange 交换机名称
         * @param type 交换机类型    direct 路由模式
         * @return a declaration-confirm method to indicate the exchange was successfully declared
         * @throws IOException if an error is encountered
         */
        String exchangeName = "logs_direct";
        channel.exchangeDeclare(exchangeName, "direct");
        //发送消息
        String routingKey = "error";
        channel.basicPublish(exchangeName,routingKey,null,("这是direct模型发布的基于route key: [" + routingKey + "] 发送的消息").getBytes());
        //关闭资源
        RabbitMQUtils.closeConnectionAndChannel(channel, connection);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
消费者-1:
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 声明交换机
        String exchangeName = "logs_direct";
        channel.exchangeDeclare(exchangeName,"direct");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,exchangeName,"error");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
消费者-2:
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String exchangeName = "logs_direct";
        channel.exchangeDeclare(exchangeName,"direct");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,exchangeName,"info");
        channel.queueBind(queueName,exchangeName,"error");
        channel.queueBind(queueName,exchangeName,"warning");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-2: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
运行结果:

image-20200704151118660

image-20200704151132837

第五种模型(Topic)

Routing之订阅模型Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey -般都是由一个或多个单词组成,多个单词之间以 ”.” 分割,例如: item . insert

image-20200704151529856

#通配符
* (star) can substitute for exactly one word.	匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配一 个或多个词
#如
audit.# 	匹配audit. irs . corporate或者audit.irs 等
audit.*		只能匹配audit.irs
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生产者:

    public static void main(String[] args) throws IOException {
        //获取链接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取通道对象
        Channel channel = connection.createChannel();
        /**
         * 通过通道声明交换机
         * @param exchange 交换机名称
         * @param type 交换机类型    topic
         * @return a declaration-confirm method to indicate the exchange was successfully declared
         * @throws IOException if an error is encountered
         */
        String exchangeName = "topics";
        channel.exchangeDeclare(exchangeName, "topic");
        //发送消息
        String routingKey = "user.save.delete";
        channel.basicPublish(exchangeName,routingKey,null,("这是topic动态路由模型发布的基于route key: [" + routingKey + "] 发送的消息").getBytes());
        //关闭资源
        RabbitMQUtils.closeConnectionAndChannel(channel, connection);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

消费者-1:

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 声明交换机
        String exchangeName = "topics";
        channel.exchangeDeclare(exchangeName,"topic");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列 动态通配符形式 route key
        channel.queueBind(queueName,exchangeName,"user.*");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

消费者-2:

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //通过 connection 创建一个 channel
        Channel channel = connection.createChannel();
        // 通道绑定对象
        String exchangeName = "topics";
        channel.exchangeDeclare(exchangeName,"topic");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,exchangeName,"user.#");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-2: " + new String(body));
            }
        });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

运行结果:

image-20200704153704303

image-20200704153716330

5. SpringBoot整合RabbitMQ

5.0 搭建初始环境

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4

2.配置配置文件

spring:
  application:
    name: springboot-rabbitmq
  rabbitmq:
    host: 39.96.42.195
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

RabbitTemplate 用来简化操作 使用时候直接在项目中注入即可使用

注意 : 队列只会在消费端创建

第一种模型
生产者
    @Test
    public void testHello() {
        rabbitTemplate.convertAndSend("hello", "hello world");
    }
  • 1
  • 2
  • 3
  • 4
消费者
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloConsumer {
    @RabbitHandler
    public void receive(String message){
        System.out.println("message = " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
第二种模型:
生产者
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work模型");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
消费者
@Component
public class WorkConsumer {
    //消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    //消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
第三种模型
生产者
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("logs","","fanout的模型发送的消息");
    }
  • 1
  • 2
  • 3
  • 4
消费者
@Component
public class FanoutConsumer {
    //消费者
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout")     //绑定交换机
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    //消费者
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout")     //绑定交换机
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
第四种模型
生产者
    @Test
    public void testRoute(){
        rabbitTemplate.convertAndSend("directs","error","发送info的路由信息");
    }
  • 1
  • 2
  • 3
  • 4
消费者
@Component
public class RouteConsumer {
    //消费者
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),     //绑定交换机
                    key = {"info","error","warn"} //路由信息
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    //消费者
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),     //绑定交换机
                    key = {"error"} //路由信息
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
第五种模型
生产者
    @Test
    public void testTopic(){
        rabbitTemplate.convertAndSend("topics","user.save","user.save 路由消息");
    }
  • 1
  • 2
  • 3
  • 4
消费者
@Component
public class TopicConsumer {
    //消费者
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(name = "topics",type = "topic"),     //绑定交换机
                    key = {"user.save","user.*"} //路由信息
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    //消费者
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(name = "topics",type = "topic"),     //绑定交换机
                    key = {"order.#","product.#","user.*"} //路由信息
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

6.MQ的应用场景

6.1异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1 .串行的方式 2 .并行的方式

  • 串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有-一个问题是,邮件,短信并不是必须的,它只是-一个通知,而这种做法让客户端等待没有必要等待的东西.

    image-20200704164819430

  • 并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

    image-20200704164859010

  • 消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.消息队列:引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

image-20200704165014013

  • 由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

6.2 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

image-20200704165158967

这种做法有一个缺点:

当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合.引入消息队列

image-20200704165241998

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统:订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.

6.3流量削峰

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:

1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀-次都没有 成功过呢^^)

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

image-20200704170000713

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,,则直接抛弃用户请求或跳转到错误页面.

2.秒杀业务根据消息队列中的请求信息,再做后续处理.

7. RabbitMQ的集群

7.1集群架构

7.1.1普通集群(副本集群)
All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, thoughthey are visible and reachable from all nodes. To replicate queues across nodes in a cluster --摘自 官网
  • 1

默认情况下: RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问

image-20200704172512603

核心解决问题:当集群中的某一时刻master节点宕机,可以对Queue中的信息,进行备份

7.1.2镜像集群
This guide covers mirroring (queue contents replication) of classic queues -摘自官网
By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings,which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. -摘自官网
  • 1
  • 2

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性。

image-20200704175457891

为什么使用MQ?MQ的优点

简答

  • 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
  • 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
  • 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
  • 日志处理 - 解决大量日志传输。
  • 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

详答

主要是:解耦、异步、削峰。

解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。

削峰:减少高峰时期对服务器压力。

消息队列有什么优缺点?RabbitMQ有什么优缺点?

优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

缺点有以下几个:

系统可用性降低

本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;

系统复杂度提高

加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

一致性问题

A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

你们公司生产环境用的是什么消息中间件?

这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。

举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。

但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。

然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。

另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。

而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。

除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。

但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。

而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。

另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。

但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。

因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

image-20200810092430835

image-20200810092447249

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

MQ 有哪些常见问题?如何解决这些问题?

MQ 的常见问题有:

  1. 消息的顺序问题
  2. 消息的重复问题

消息的顺序问题

消息有序指的是可以按照消息的发送顺序来消费。

假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?

image-20200810092724416

解决方案:

(1)保证生产者 - MQServer - 消费者是一对一对一的关系

image-20200810092802302

缺陷:

  • 并行度就会成为消息系统的瓶颈(吞吐量不够)
  • 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。 (2)通过合理的设计或者将问题分解来规避。
  • 不关注乱序的应用实际大量存在
  • 队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。

消息的重复问题

造成消息重复的根本原因是:网络不可达。

所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

什么是RabbitMQ?

RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件

rabbitmq 的使用场景

(1)服务间异步通信

(2)顺序消费

(3)定时任务

(4)请求削峰

RabbitMQ基本概念

  • Broker: 简单来说就是消息队列服务器实体
  • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
  • Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
  • Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
  • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
  • VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
  • Producer: 消息生产者,就是投递消息的程序
  • Consumer: 消息消费者,就是接受消息的程序
  • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

RabbitMQ的工作模式

一.simple模式(即最简单的收发模式)

image-20200810093721286

1.消息产生消息,将消息放入队列

2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

二.work工作模式(资源的竞争)

image-20200810093504830

1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

三.publish/subscribe发布订阅(共享资源)

image-20200810093649110

1、每个消费者监听自己的队列;

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

四.routing路由模式

image-20200810093813171

1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

2.根据业务功能定义路由字符串

3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

五.topic 主题模式(路由模式的一种)

image-20200810093850401

1.星号井号代表通配符

2.#号代表多个单词,* 号代表一个单词

3.路由功能添加模糊匹配

4.消息产生者产生消息,把消息交给交换机

5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)

如何保证RabbitMQ消息的顺序性?

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

消息如何分发?

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能

消息怎么路由?

消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种:

fanout:如果交换器收到消息,将会广播到所有绑定的队列上

direct:如果路由键完全匹配,消息就被投递到相应的队列

topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

消息基于什么传输?

由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;

但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;

假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

发送方确认模式

将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。

一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

这里并没有用到超时机制,RabbitMQ **仅通过 Consumer 的连接中断来确认是否需要重新发送消息。**也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

如何保证RabbitMQ消息的可靠传输?

消息不可靠的情况可能是消息丢失,劫持等原因;

丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;

生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;

rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;

如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

消息队列丢数据消息持久化。

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。

这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。

这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢?

这里顺便说一下吧,其实也很容易,就下面两步

  1. 将queue的持久化标识durable设置为true,则代表是一个持久的队列
  2. 发送消息的时候将deliveryMode=2

这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

为什么不应该对所有的 message 都使用持久化机制?

首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。

其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。

所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

如何保证高可用的?RabbitMQ 的集群

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

消息积压处理办法:临时紧急扩容:

先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

设计MQ思路

比如说这个消息队列系统,我们从以下几个角度来考虑一下:

首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。

其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。

能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。

保证消息的幂等性

https://www.huaweicloud.com/articles/42c04053358a8179df34eaa40987e3e1.html

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号