当前位置:   article > 正文

RabbitMQ详解_rabbitmq控制台如何看

rabbitmq控制台如何看

RabbitMQ

学习目标

1.MQ相关概念
2.RabbitMQ的安装和配置
3.RabbitMQ入门程序
4.RabbitMQ的工作模式
5.Spring整合RabbitMQ
6.SpringBoot整合RabbitMQ
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

第一章 MQ相关概念

1.1 节 什么是MQ

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息;MQ多用于分布式系统之间进行通讯。

分布式系统有两种通讯方式:

  • 远程调用
  • 消息中间件

两种通讯方式过程如图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-S6yDBciX-1692091646666)(./assets/1.png)]

小结

  1. MQ消息队列,存储消息的中间件
  2. 分布式系统通讯方式
    1. 直接远程调用
    2. 消息中间件
  3. 发送消息的称为生产者,接收消息称为消费者

1.2 节 MQ的优势

MQ的优势主要有三点:

  1. 应用解耦
  2. 异步提速
  3. 削峰填谷
1.2.1 应用解耦

分布式系统在使用直接远程调用时通讯过程如下:
在这里插入图片描述

在订单系统中直接远程调用库存系统、支付系统、物流系统会使得订单系统的耦合度非常高,如果在加一个其他系统的话,必须在订单系统中修改代码,这也使得订单系统的可维护性很差。

另一方面如果库存系统挂掉的话,订单系统也会随着挂掉,这使得订单系统的容错性极差。

如果加入MQ消息中间件可以大大改善上面的问题,如下图所示:

在这里插入图片描述

一方面使得订单系统和其他系统得到解耦,另一方面如果库存系统服务器挂了并不影响订单系统,只要等到库存系统服务器恢复后在执行MQ消息就好了;再一方面如果加一个其他系统只要在读取MQ消息就可以了;总体来说让系统解耦,提供系统可维护性,提供系统容错性。

1.2.2 异步提速

接下来再从访问速度角度来看分布式系统直接远程调用过程,如下图所示:

在这里插入图片描述

总的下单时间=100MS+100MS+100MS+30MS=330MS,这时间也太长了吧,响应速度太慢。

如果加入MQ消息中间件,我们发现服务器的响应速度会得到大大提升,如下图所示:

在这里插入图片描述

总的下单时间=3ms+30ms=33ms,服务器的响应速度得到了大大提升,总的下单时间和其他系统得到消息时间以及其他系统运行时间无关,只要订单系统运行完毕并发送完消息就可以响应给客户了;大大增加了用户体验和系统吞吐量(单位时间内处理请求的数目)。

1.2.3 削峰填谷

最后我们来看削峰填谷情况,如下图所示:

在这里插入图片描述

使用MQ就能解决这个问题,过程如下:

在这里插入图片描述

MQ起到了"削峰填谷"的作用,如下所示

在这里插入图片描述

MQ 限制了消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被"削"掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压消息,这就叫做"填谷"。

1.2.4 小结
  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

1.3 节 MQ的劣势

接下来我们通过下图展示MQ的劣势:

在这里插入图片描述

  • 系统的可用性降低:系统引入的依赖越多,系统稳定性越差,一旦MQ宕机,就会对业务造成影响,如何保证MQ的高可用。
  • 系统复杂性提高:MQ的加入增加了系统的复杂度,以前系统是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
  • 一致性问题:A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,如何保证消息数据处理的一致性?

1.4 MQ应用条件

既然MQ有优势也有劣势,那么MQ的应用条件应满足什么?

  1. 生产者不需要从消费者获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完继续往后走,即所谓异步称为了可能。

  2. 允许短暂的不一致性。

  3. 确实是用了有效果。即解耦,提速,削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

1.5节 MQ常见产品

目前市面上MQ产品较多,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka等,也有直接使用redis充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义自定义协议,社区封装了http协议支持
客户端语言支持官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言Java,C,C++,Python,PHP,Perl,.net等Java,C++(不成熟)官方支持Java,社区产出多种API,如PHP,Python等
单机吞吐量万级(其次)万级(最差)十万级(最好)十万级(次之)
消息延迟微秒级毫秒级毫秒级毫秒以内
功能特性并发能力强,性能积极好,延时低,社区活跃,管理界面丰富老牌产品,成熟度高,文档较多MQ功能比较完备,扩展性佳只支持主要的MQ功能,毕竟是为大数据领域准备的。

1.6节 RabbitMQ简介

1.6.1 AMQP 协议

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件限制。2006年,AMQP规范发布。协议内容如下图所示:

在这里插入图片描述

  • Publisher: 消息生产者,负责将消息发送给MQ消息队列;
  • Exchange:交换机,负责颁发消息到Queue;
  • Queue:存储消息的队列;
  • Consumer:消息消费者,负责从队列中获取消息
1.6.2 RabbitMQ 介绍

2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0发布。RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

RabbitMQ 基础架构图如下图:

在这里插入图片描述

RabbitMQ相关概念:

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
  • Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟分组中,类似于网络中namespace概念。当多个不同的用户使用同一个RabbitMQ server提供服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
  • Connection:publisher/consumer和broker之间的TCP链接。
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑链接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了 channel id 帮助客户端和message broker 识别channel,所以channel之间是完全隔离的,Channel作为轻量级的Connection极大较少了操作系统建立TCP connection 的开销。
  • Exchange:message 到达broker的第一站,根据分发规则,匹配查询表中的routing key , 分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe)and fanout(multicast) 。
  • Queue:消息最终被送到这里等待consumer取走。
  • Binding :exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
1.6.3 RabbitMQ的工作模式

RabbitMQ 提供了6种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式,Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ;暂不介绍)。官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

在这里插入图片描述

在这里插入图片描述

1.6.4 JMS
  • JMS即 Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
  • JMS 是 JavaEE 规范的一种。
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS实现包,但是开源社区有。
1.6.5 小结
  1. RabbitMQ 是基于AMQP协议使用Erlang语言开发的一款消息队列产品。
  2. RabbitMQ提供了6种工作模式,我们学习5种。
  3. AMQP是协议,JMS是JavaEE规范接口。

第二章 RabbitMQ安装

RabbitMQ 官方地址:http://www.rabbitmq.com/,访问该网址可以查看到RabbitMQ安装教程。

2.1 节 RabbitMQ Linux 系统安装

【1】安装依赖环境:在线安装依赖环境

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

  • 1
  • 2

【2】安装Erlang:上传

erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm

# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

  • 1
  • 2
  • 3

如果出现如下错误

在这里插入图片描述

说明gblic 版本太低。我们可以查看当前机器的gblic 版本

strings /lib64/libc.so.6 | grep GLIBC
  • 1

在这里插入图片描述

当前最高版本2.12,需要2.15.所以需要升级glibc

使用yum更新安装依赖

sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
  • 1

下载rpm包

wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

安装rpm包

sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
  • 1

安装完毕后再查看glibc版本,发现glibc版本已经到2.17了

strings /lib64/libc.so.6 | grep GLIBC
  • 1

在这里插入图片描述

【3】安装RabbitMQ

# 安装
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm

# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

【4】开启管理界面及配置

# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

【5】启动

service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
  • 1
  • 2
  • 3

设置配置文件

cd /usr/share/doc/rabbitmq-server-3.6.5/

cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

  • 1
  • 2
  • 3
  • 4

2.2 节 基于Doker 安装RabbitMQ

我们在Centos7虚拟机中使用Docker来安装。

【1】下载镜像

在线拉取

docker pull rabbitmq:3-management
  • 1

使用命令加载镜像即可:

docker load -i mq.tar
  • 1

【2】安装MQ,执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=guest \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

第三章 RabbitMQ 配置

3.1 节 管理页面介绍

首先我们来认识一下RabbitMQ管理页面的整体布局,如下图所示:

3.2 节 添加用户

接下来点击Admin->Add a User 按如下图操作添加一个用户

在这里插入图片描述

3.3 节 添加虚拟机

接下来,我们添加一个虚拟机,虚拟机都是以/开头,我们创建/psjjmq虚拟机,如下所示:

在这里插入图片描述

最后,我们为虚拟机分配用户权限,将/psjjmq虚拟机分配给psjj用户,如下所示:

在这里插入图片描述

最终结果如下:

在这里插入图片描述

3.4 节 复制配置文件

打开概述页面,我们发现没有rabbitmq.config配置文件,如下图所示:

在这里插入图片描述

接下来我们复制配置文件,跳转rabbitmq默认安装目录

cd /usr/share/doc/rabbitmq-server-3.6.5/
  • 1

将目录下的rabbitmq.config.example文件复制到/etc/rabbitmq/rabbitmq.config中,如下所示:

cp ./rabbitmq.config.example  /etc/rabbitmq/rabbitmq.config

  • 1
  • 2

在这里插入图片描述

最后重启rabbitmq服务

service rabbitmq-server restart
  • 1

问题解决了,最终结果如下:
在这里插入图片描述

第四章 RabbitMQ 入门

入门案例需求:使用简单模式完成消息传递
在这里插入图片描述

实现思路:

  1. 创建工程
  2. 添加依赖
  3. 编写生产者发送消息
  4. 编写消费者接收消息

4.1节 创建工程

创建maven父工程rabbitmq-day01,然后在该工程下创建rabbitmq-producer(消息生产者)和rabbitmq-consumer(消息消费者)两个子模块,如下图所示:
在这里插入图片描述

然后在两个子模块中添加rabbitmq客户端依赖,如下所示:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

4.2 节 编写入门程序

首先我们在rabbitmq-producer模块编写生产者发送消息程序:

/**
 * 生产者发送消息
 */
public class ProducerHelloWorld {
    public static void main(String[] args) throws Exception {
        //1.创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置连接参数
        factory.setHost("192.168.15.142");//默认值localhost
        factory.setPort(5672);//默认值5672
        factory.setVirtualHost("/psjjmq");//虚拟机 默认值/
        factory.setUsername("psjj"); //用户名默认值guest
        factory.setPassword("123456"); //密码默认值guest
        //3.创建连接
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        String queueName = "hello_world";
        //如果没有一个名字叫hello_world的队列,则会创建,否则则不会创建
        channel.queueDeclare(queueName,true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            2. routingKey:路由名称
            3. props:配置信息
            4. body:发送消息数据

         */
        String message = "hello world";

        //6.发送消息
        channel.basicPublish("",queueName,null,message.getBytes());

        //7.释放资源
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

接下来我们在rabbitmq-consumer模块编写消费者接收消息程序

/**
 * 消息消费者
 */
public class ConsumerHelloWorld {
    public static void main(String[] args) throws Exception {
        //1.创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置连接参数
        factory.setHost("192.168.15.142");//默认值localhost
        factory.setPort(5672);//默认值5672
        factory.setVirtualHost("/psjjmq");//虚拟机 默认值/
        factory.setUsername("psjj"); //用户名默认值guest
        factory.setPassword("123456"); //密码默认值guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        channel.basicConsume("hello_world",true,consumer);
        //不要关闭资源
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

4.3 节 小结

生产者流程:

  1. 创建ConnectionFactory工厂
  2. 设置工厂参数:ip、端口号、虚拟机、用户名、密码
  3. 获得物理连接connection
  4. 获得逻辑连接channel
  5. 声明队列channel.queueDeclare
  6. 发送消息给队列channel.basicPublish
  7. 关闭资源

消费者流程:

  1. 创建ConnectionFactory工厂
  2. 设置工厂参数:ip、端口号、虚拟机、用户名、密码
  3. 获得物理连接connection
  4. 获得逻辑连接channel
  5. 声明队列channel.queueDeclare
  6. 创建消费者对象 Consumer consumer = new DefaultConsumer(channel){};
  7. 接收消息回调消费者对象channel.basicConsume(“队列名”,true,consumer);

第五章 RabbitMQ 的工作模式

RabbitMQ五种常用的工作模式有:简单模式、工作队列模式、 发布与订阅模式(广播模式),路由模式、主题模式(通配符模式)。详细讲解如下。

5.1 节 工作队列模式

5.1.1 工作队列模式说明

工作模式的执行过程如下图所示:

在这里插入图片描述

  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
5.1.2 工作队列模式实现

代码实现过程同入门程序类似,不同的消费者端有两个;生产者代码如下:

package top.psjj.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,工作队列模式
 */
public class ProducerWorkQueue {
    public static void main(String[] args) throws Exception{
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.15.142");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/psjjmq");
        connectionFactory.setUsername("psjj");
        connectionFactory.setPassword("123456");

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建通道
        Channel channel = connection.createChannel();
        //4.创建队列 队列名、是否持久化, 是否独占已经关闭连接是否还在,没有consumer是否自动删除,内部参数
        channel.queueDeclare("work_queues",true,false,false,null);
        //5.发送消息
        for (int i = 1; i <=10 ; i++) {
            String body = "~~~工作队列消息~~~"+i;
            //交换机,routingKey,配置信息,内容
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
        //6.关闭资源
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

消费者代码如下:

package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 工作队列模式消息消费者
 */
public class ConsumerWorkQueueA {
    public static <handleDelivery> void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.15.142");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/psjjmq");
        connectionFactory.setUsername("psjj");
        connectionFactory.setPassword("123456");

        //2.获取新连接
        Connection connection = connectionFactory.newConnection();
        //3.创建通道
        Channel channel = connection.createChannel();
        //4.声明队列:队列名,是否持久化,否独占已经关闭连接是否还在,没有consumer是否自动删除,内部参数
        channel.queueDeclare("work_queues",true,false,false,null);
        //5.获得消息消费者
        Consumer consumer = new DefaultConsumer(channel){
            //唯一标识,获得交换机routingKey信息,配置信息,发送内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        //6.接收消息,队列名,是否启动确认,消息对象回调
        channel.basicConsume("work_queues",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 工作队列模式消息消费者
 */
public class ConsumerWorkQueueB {
    public static <handleDelivery> void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.15.142");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/psjjmq");
        connectionFactory.setUsername("psjj");
        connectionFactory.setPassword("123456");

        //2.获取新连接
        Connection connection = connectionFactory.newConnection();
        //3.创建通道
        Channel channel = connection.createChannel();
        //4.声明队列:队列名,是否持久化,否独占已经关闭连接是否还在,没有consumer是否自动删除,内部参数
        channel.queueDeclare("work_queues",true,false,false,null);
        //5.获得消息消费者
        Consumer consumer = new DefaultConsumer(channel){
            //唯一标识,获得交换机routingKey信息,配置信息,发送内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        //6.接收消息,队列名,是否启动确认,消息对象回调
        channel.basicConsume("work_queues",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

运行两个消息消费者,在运行消息生产者,我们发现,两个消费者消费消息各是一部分,证明多个消费者之间是抢占关系。

5.1.3 小结
  1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个, 只需要有一个节点成功发送即可。

5.2 节 发布与订阅模式(广播模式)

5.2.1 发布与订阅模式说明

在这里插入图片描述

发布订阅模式也叫广播模式,在订阅模型中,多了一个Exchange角色,而且过程略有变化:

  1. P:生产者,不在将消息发给队列,而是将消息发送给X交换机。
  2. C:消费者,消息接收者,一直等待消息到来
  3. Queue:消息队列,接收消息,缓存消息
  4. Exchange(x) :交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

**注意:**Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

5.2.2 发布与订阅模式实现

生产者代码实现过程:

package top.psjj.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 发布订阅模式:消息生产者
 */
public class ProducerPubSub {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setUsername("psjj");
        factory.setPassword("123456");
        factory.setVirtualHost("/psjjmq");

        //2.获得连接
        Connection connection = factory.newConnection();
        //3.获得通道
        Channel channel = connection.createChannel();
        //4.声明交换机
        String exchangeName = "my_fanout";
        /**
         * 参数:
         *      1.交换机名称
         *      2.交换机类型
         *          direct:定向 路由模式
         *          fanout 扇形 订阅广播模式
         *          topic : 通配符 主题模式
         *      3.是否持久化
         *      4.是否自动删除
         *      5.internal:内部使用 一般false
         *      6.arguments:参数
         */
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6.创建队列
        String queueName1 = "queue1";
        String queueName2 = "queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //7.绑定队列和交换机:队列名称,交换机名称,routingKye fanout类型设置为""
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        //8.发送消息
        String body = "发布订阅消息";
        channel.basicPublish(exchangeName,"",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

消费者代码实现过程:

package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 发布订阅模式:消息消费者
 */
public class ConsumerPubSub1 {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");
        //2.创建连接
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.接收消息
        String queueName = "queue1";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 发布订阅模式:消息消费者
 */
public class ConsumerPubSub2 {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");
        //2.创建连接
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.接收消息
        String queueName = "queue2";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
5.2.3 小结
  1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
  2. 发布订阅模式与工作队列模式的区别:
  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

5.3 节 路由模式

5.3.1 路由模式说明

路由模式说明:

  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  2. 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  3. Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

整体过程如下图所示:

在这里插入图片描述

图解:

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
5.3.2 路由模式实现

案例需求:

生产者发送消息,routingKey为error消息由消费者1接收,routtingKey为info、error、warning消息由消费者2接收,具体代码如下:

生产者发送消息:

package top.psjj.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 路由模式消息生产者
 */
public class ProducerRouting {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");
        //2.创建连接
        Connection connection = factory.newConnection();
        //3.创建通道
        Channel channel = connection.createChannel();
        //4.创建交换机
        String exchangeName = "my_direct";
        //参数:交换机名称、交换机类型、是否持久化,是否自动删除,内部使用一般为false,参数
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //5.创建队列
        String queue1 = "directQueue1";
        String queue2 = "directQueue2";
        channel.queueDeclare(queue1,true,false,false,null);
        channel.queueDeclare(queue2,true,false,false,null);
        //6.绑定队列和交换机:队列名,交换机名,routingKey
        channel.queueBind(queue1,exchangeName,"error");
        channel.queueBind(queue2,exchangeName,"info");
        channel.queueBind(queue2,exchangeName,"error");
        channel.queueBind(queue2,exchangeName,"warning");

        String body = "~~~路由模式消息内容~~";

        //7.发送消息
        //channel.basicPublish(exchangeName,"error",null,body.getBytes());
	    channel.basicPublish(exchangeName,"warning",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

消费者接收消息:

package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 路由模式消息消费者
 */
public class ConsumerRouting1 {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");
        //2.新建连接
        Connection connection = factory.newConnection();
        //3.获得通道
        Channel channel = connection.createChannel();
        //4.创建消费者对象
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("唯一标识:"+consumerTag);
                System.out.println("交换机"+envelope.getExchange());
                System.out.println("routingkey:"+envelope.getRoutingKey());
                System.out.println(new String(body));
            }
        };
        //5.接收消息
        channel.basicConsume("directQueue1",true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 路由模式消息消费者
 */
public class ConsumerRouting2 {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");
        //2.新建连接
        Connection connection = factory.newConnection();
        //3.获得通道
        Channel channel = connection.createChannel();
        //4.创建消费者对象
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("唯一标识:"+consumerTag);
                System.out.println("交换机"+envelope.getExchange());
                System.out.println("routingkey:"+envelope.getRoutingKey());
                System.out.println(new String(body));
            }
        };
        //5.接收消息
        channel.basicConsume("directQueue2",true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

运行上述代码,当发送消息时设置routingKey为error时,消费者1,消费者2都能接收到消息,当发送消息时设置routingKey为warning时,只有消费者2能接收到消息;证明路由模式成功。测试修改代码如下:

//channel.basicPublish(exchangeName,"error",null,body.getBytes());
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
  • 1
  • 2
5.3.3 小结

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

5.4 节 主题模式(通配符模式)

5.4.1 主题模式说明

主题模式又叫通配符模式;Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

通配符具体运行流程如下图所示:

在这里插入图片描述

  • Q1:表示能接收routingKey满足*.orange.*的消息
  • Q2:表示能接收routingKey满足*.*.rabbit 或者lazy.#的消息
5.4.2 主题模式实现

通过通配符模式完成如下需求:

生产者发送消息的routingKey为top.psjj.java 或者www.top.psjj.java 消费者1接收满足routingKey为*.psjj.*条件的消息,消费者2接收满足routingKey为#.psjj.#条件的消息;当发送消息时routingKey为top.psjj.java时,消费者1、消费者2都能接收到消息;当发送消息时routingKey为www.top.psjj.java时,只有消费者2能接收到消息。

生产者代码:

package top.psjj.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 *  通配符模式 消息生产者
 */
public class ProducerTopic {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");

        //2.创建连接
        Connection connection = factory.newConnection();
        //3.获得通道
        Channel channel = connection.createChannel();
        //4.创建交换机
        String exchangeName = "my_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //5.创建队列
        String queue1 = "topic_queue1";
        String queue2 = "topic_queue2";
        channel.queueDeclare(queue1,true,false,false,null);
        channel.queueDeclare(queue2,true,false,false,null);
        //6.绑定队列和交换机
        channel.queueBind(queue1,exchangeName,"*.psjj.*");
        channel.queueBind(queue2,exchangeName,"#.psjj.#");
        //7.发送消息
        String body = "``topic消息``";
        //channel.basicPublish(exchangeName,"top.psjj.java",null,body.getBytes());
        channel.basicPublish(exchangeName,"www.top.psjj.java",null,body.getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

消费者代码:

package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 通配符模式:消息消费者
 */
public class ConsumerTopic1 {
    public static void main(String[] args) throws Exception {
        //1.创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");

        //2.创建连接
        Connection connection = factory.newConnection();
        //3.创建通道
        Channel channel = connection.createChannel();
        //4.创建消费者对象
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息内容:"+new String(body));
            }
        };
        //5.接收消息
        channel.basicConsume("topic_queue1",true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
package top.psjj.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 通配符模式:消息消费者
 */
public class ConsumerTopic2 {
    public static void main(String[] args) throws Exception {
        //1.创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.15.142");
        factory.setPort(5672);
        factory.setVirtualHost("/psjjmq");
        factory.setUsername("psjj");
        factory.setPassword("123456");

        //2.创建连接
        Connection connection = factory.newConnection();
        //3.创建通道
        Channel channel = connection.createChannel();
        //4.创建消费者对象
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息内容:"+new String(body));
            }
        };
        //5.接收消息
        channel.basicConsume("topic_queue2",true,consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
5.4.3 小结

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

5.5 小结

  1. 简单模式 HelloWorld:一个生产者、一个消费者,不需要设置交换机(使用默认的交换机direct类型)。
  2. 工作队列模式 Work Queue :一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
  3. 发布订阅模式 Publish/subscribe :需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消 息发送到绑定的队列。
  4. 路由模式 Routing :需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机 后,交换机会根据 routing key 将消息发送到对应的队列。
  5. 通配符模式 Topic :需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

第六章 Spring整合RabbitMQ(了解)

6.1 节 整合说明

使用Spring整合RabbitMQ可以大大简化开发步骤,生产者整合过程如下:

  1. 创建生产者工程
  2. 添加依赖
  3. 配置整合
  4. 编写代码发送消息

消费者整合

  1. 创建消费者工程
  2. 添加依赖
  3. 配置整合
  4. 编写消息监听器

6.2 节 整合实现

生产者整合过程如下:

【1】创建子模块spring-rabbitmq-producer,并添加如下依赖

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.2.15.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.2.15.RELEASE</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

【2】配置rabbitmq.properties的基本信息

rabbitmq.host=192.168.15.142
rabbitmq.port=5672
rabbitmq.username=psjj
rabbitmq.password=123456
rabbitmq.virtual-host=/psjjmq
  • 1
  • 2
  • 3
  • 4
  • 5

【3】整合Spring文件spring-rabbitmq-producer.xml,内容如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties" />

    <!--注入connectionFactory-->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.virtual-host}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
    />

    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定义简单类型队列,不绑定交换机则绑定默认交换机,默认交换机类型为direct,名字为"",路由建为队列名称
    id:bean的名称
    name:queue名称
    auto-declare:自动创建
    auto-delete:自动删除,最后一个消费者和该队列断开连接后,自动删除队列
    exclusive:是否独占
    durable:是否持久化
    -->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true" />

    <!--定义广播模式消息队列:所有队列都能收到消息-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true" />
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true" />
    <!--定义广播类型交换机:并绑定上述两个队列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1" />
            <rabbit:binding queue="spring_fanout_queue_2" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--定义路由模式消息队列-->
    <rabbit:queue id="spring_direct_queue_1" name="spring_direct_queue_1" auto-declare="true" />
    <rabbit:queue id="spring_direct_queue_2" name="spring_direct_queue_2" auto-declare="true" />
    <!--定义路由模式交换机:并绑定上述两个队列-->
    <rabbit:direct-exchange id="spring_direct_exchange" name="spring_direct_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_direct_queue_1" key="info" />
            <rabbit:binding queue="spring_direct_queue_2" key="info" />
            <rabbit:binding queue="spring_direct_queue_2" key="error" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义通配符模式消息队列-->
    <rabbit:queue id="spring_topic_queue_1" name="spring_topic_queue_1" auto-declare="true" />
    <rabbit:queue id="spring_topic_queue_2" name="spring_topic_queue_2" auto-declare="true" />
    
    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="*.psjj.*" queue="spring_topic_queue_1" />
            <rabbit:binding pattern="#.psjj.#" queue="spring_topic_queue_2" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--注入rabbitMQ对象-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

【4】编写测试类

package top.psjj.producer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * spring整合RabbitMQ生产者测试
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    //注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test //简单模式
    public void helloWorld(){
        rabbitTemplate.convertAndSend("spring_queue","hello world");
    }
    @Test//广播模式
    public void fanoutTest(){
        rabbitTemplate.convertAndSend("spring_fanout_exchange","","广播消息");
    }
    @Test//路由模式
    public void directTest(){
        rabbitTemplate.convertAndSend("spring_direct_exchange","info","路由消息");
    }
    @Test//通配符模式
    public void topicTest(){
        rabbitTemplate.convertAndSend("spring_topic_exchange","top.psjj.java","通配符消息");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

消费者整合过程如下:

【1】创建子模块spring-rabbitmq-consumer,并添加如下依赖

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.2.15.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.2.15.RELEASE</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

【2】配置rabbitmq.properties的基本信息

rabbitmq.host=192.168.15.142
rabbitmq.port=5672
rabbitmq.username=psjj
rabbitmq.password=123456
rabbitmq.virtual-host=/psjjmq
  • 1
  • 2
  • 3
  • 4
  • 5

【3】整合Spring文件spring-rabbitmq-consumer.xml,内容如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties" />

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--注入队列监听器-->
    <bean id="springQueueListener" class="top.psjj.consumer.SpringSimpleListener"/>
    <!--配置监听器容器以及队列相关信息-->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue" />
    </rabbit:listener-container>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

【4】编写监听器

package top.psjj.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * @Auther: 胖叔讲java
 * @Date: 2023/2/17 - 02 - 17 - 1:08
 * @Decsription: top.psjj.consumer
 * @version: 1.0
 */
public class SpringSimpleListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("简单模式:"+new String(message.getBody()));
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

【5】编写测试

package top.psjj.consumer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test1(){
        boolean flag = true;
        while (true){

        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

注意:由于消费端代码都一样这里这写一个。

6.3 节 小结

  • 使用 Spring 整合 RabbitMQ 将组件全部使用配置方式实现,简化编码
  • Spring 提供 RabbitTemplate 简化发送消息 API
  • 使用监听机制简化消费者编码

第七章 SpringBoot整合RabbitMQ

7.1 节 整合说明

使用SpringBoot整合RabbitMQ可以大大简化开发步骤,生产者整合过程如下:

  1. 创建生产者工程
  2. 添加场景启动器
  3. 配置连接信息
  4. 编写配置类
    1. 注入Exchange交换机对象
    2. 注入Queue队列对象
    3. 注入Binding对象确定交换机和队列绑定关系
  5. 编写代码发送消息
    1. 注入RabbitTemplate对象,发送消息

消费者整合

  1. 创建消费者工程
  2. 添加场景启动器
  3. 配置连接信息
  4. 编写监听器
    1. 通过@RabbitListener注解监听队列,@RabbitListener作用于方法上,该方法的参数时Message消息对象;该方法监听到队列有消息时会自动触发。

7.2 节 整合实现

生产者整合过程如下:

【1】创建SpringBoot父工程springboot-rabbitmq-day01,pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <modules>
        <module>springboot-rabbitmq-procuder</module>
        <module>springboot-rabbitmq-consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>top.psjj</groupId>
    <artifactId>springboot-rabbitmq-day01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-day01</name>
    <description>Demo project for Spring Boot</description>
    <packaging>pom</packaging>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

【2】创建子模块生产者模块springboot-rabbitmq-procuder,添加如下依赖

<dependencies>
    <!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

【3】配置连接信息

# 配置Rabbitmq的基本信息
spring:
  rabbitmq:
    host: 192.168.15.142
    port: 5672
    username: psjj
    password: 123456
    virtual-host: /psjjmq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

【4】编写配置类

/**
 * rabbitmq配置类
 */
@Configuration
public class RabbitMQConfig {
    //定义交换机名字
    public static final  String EXCHANGE_NAME = "boot_topic_exchange";
    //定义队列名字
    public static final String QUEUE_NAME = "boot_topic_queue1";

    //注入交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //注入队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    /**
     * 绑定队列与交换机关系
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

【5】编写测试

/**
 * 消息生产者测试
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    //注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.java","~~~springboot-topic-消息~~~");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

消费者整合过程如下:

【1】创建子模块生产者模块springboot-rabbitmq-consumer,添加如下依赖

<dependencies>
    <!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

【2】配置连接信息

# 配置Rabbitmq的基本信息
spring:
  rabbitmq:
    host: 192.168.15.142
    port: 5672
    username: psjj
    password: 123456
    virtual-host: /psjjmq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

【3】编写配置类

package top.psjj.consumer.mq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息消费者
 */
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "boot_topic_queue1")
    public void ListenerQueue(Message message){
        System.out.println(new String(message.getBody()));
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

【4】编写启动器

/**
 * springBoot启动器
 */
@SpringBootApplication
public class RabbitMQConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQConsumerApplication.class,args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

7.3 节 小结

使用SpringBoot整合RabbitMQ可以大大简化开发步骤,生产者整合过程如下:

  1. 创建生产者工程
  2. 添加场景启动器
  3. 配置连接信息
  4. 编写配置类
    1. 注入Exchange交换机对象
    2. 注入Queue队列对象
    3. 注入Binding对象确定交换机和队列绑定关系
  5. 编写代码发送消息
    1. 注入RabbitTemplate对象,发送消息

消费者整合

  1. 创建消费者工程

  2. 添加场景启动器

  3. 配置连接信息

  4. 编写监听器

    1. 通过@RabbitListener注解监听队列,@RabbitListener作用于方法上,该方法的参数时Message消息对象;该方法监听到队列有消息时会自动触发。

第八章 总结

  1. MQ概念
  2. MQ优缺点
  3. AMQP协议于JMS规范区别
  4. RabbitMQ底层架构
  5. RabbitMQ五种工作模式
  6. Spring整合RabbitMQ
  7. SpringBoot整合RabbitMQ

第九章 模拟面试题

  1. 消息中间件概念?

答:消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件。异步处理流量削峰、限流、缓冲、排队、最终一致性消息驱动等需求的场景都可以使用消息中间件。

  1. 简述一下RabbitMQ的工作流程?

答:生产者工作流程如下:

  1. Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。
  2. Producer声明一个交换器并设置好相关属性。
  3. Producer声明一个队列并设置好相关属性。
  4. Producer通过路由键将交换器和队列绑定起来。
  5. Producer发送消息到Broker,其中包含路由键、交换器等信息。
  6. 相应的交换器根据接收到的路由键查找匹配的队列。
  7. 如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
  8. 关闭信道。
  9. 管理连接。

消费者工作流程如下:

  1. consumer先连接到Broker,建立连接Connection,开启一个信道(Channel)。

  2. 向Broker请求消费响应的队列中消息,可能会设置响应的回调函数。

  3. 等待Broker回应并投递相应队列中的消息,接收消息。

  4. 消费者确认收到的消息,ack。

  5. RabbitMq从队列中删除已经确定的消息。

  6. 关闭信道。

  7. 关闭连接。

  8. 如何自定义消息中间件?

答:使用 BlockingQueue(阻塞队列)实现;当队列容器已满时,生产者线程被阻塞,直到队列未满后才可以继续put;当队列容器为空时,消费者线程被阻塞,直至队列非空时才可以继续take。

在这里插入图片描述

  1. 主流消息中间件及选型?

当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka这三款。

作为消息队列,要具备以下几个特性:

  1. 消息传输的可靠性:保证消息不会丢失。
  2. 支持集群,包括横向扩展,单点故障都可以解决。
  3. 性能要好,要能够满足业务的性能需求。

**RabbitMQ:**RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。

优点:

  1. 轻量级,快速,部署使用方便。
  2. 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
  3. RabbitMQ的客户端支持大多数的编程语言。

缺点:

  1. 如果有大量消息堆积在队列中,性能会急剧下降。
  2. RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
  3. RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。

RocketMQ

优点

  1. RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。
  2. RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。经过了历次的双11考验,性能,稳定性可靠性没的说。
  3. RocketMQ几乎具备了消息队列应该具备的所有特性和功能。
  4. java开发,阅读源代码、扩展、二次开发很方便
  5. 对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。
  6. 性能比RabbitMQ高一个数量级,每秒处理几十万的消息。

缺点:跟周边系统的整合和兼容不是很好。

Kafka

  1. Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。

  2. 跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafka。

  3. Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。

  4. Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息。

  5. 如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。

  6. 但是由于是异步的和批处理的,延迟也会高,不适合电商场景。

  7. 消息队列的优缺点?

答:优势有:

1. 应用解耦:提高系统容错性和可维护性
2. 异步提速:提升用户体验和系统吞吐量
3. 削峰填谷:提高系统稳定性
  • 1
  • 2
  • 3

劣势有:

  1. 系统的可用性降低:系统引入的依赖越多,系统稳定性越差,一旦MQ宕机,就会对业务造成影响,如何保证MQ的高可用。

  2. 系统复杂性提高:MQ的加入增加了系统的复杂度,以前系统是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?

  3. 一致性问题:A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,如何保证消息数据处理的一致性?

  4. JMS规范和AMQP协议

答:JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。

AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于JMS,兼容JMS协议。

  1. 请说明一下RabbitMQ的工作模式?

答:1.simple模式(即最简单的收发模式):消息产生消息,将消息放入队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IoNWDhgB-1692091646688)(./assets/25.png)]

消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

2.work工作模式(资源的竞争)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sKXpbuEg-1692091646689)(./assets/28.png)]

消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

3.publish/subscribe发布订阅(共享资源)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3agb1kbG-1692091646690)(./assets/29.png)]

每个消费者监听自己的队列;

生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

4.routing路由模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z7cVriv6-1692091646690)(./assets/30.png)]

消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

根据业务功能定义路由字符串

从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

5.topic 主题模式(路由模式的一种)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1tuwbnxn-1692091646691)(./assets/31.png)]

星号井号代表通配符

*匹配一个 #匹配0个或者多个

路由功能添加模糊匹配

消息产生者产生消息,把消息交给交换机

交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)

8.Connection 和Channel关系?

答:生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。

9.什么不直接使用TCP连接,而是使用信道?

答:RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。当信道本身的流量很大时,一个Connection就会产生性能瓶颈,流量被限制。需要建立多个Connection分摊信道。信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。

10.说说Broker服务节点、Queue队列、Exchange交换器?

答:Broker可以看做RabbitMQ的服务节点。一般请下一个Broker可以看做一个RabbitMQ服务器。Queue:RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。

  1. 什么是RoutingKey路由键?

答:生产者将消息发送给交换器的时候,会指定一个RoutingKey,用来指定这个消息的路由规则,这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

  1. Binding绑定?

答:通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,这样RabbitMq就知道如何正确路由消息到队列了。

  1. 交换器4种类型?

答:主要有以下4种。

  • fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
  • direct:把消息路由到BindingKey和RoutingKey完全匹配的队列中。
  • topic: 匹配规则:
    • RoutingKey 为一个 点号’.': 分隔的字符串。比如: java.xiaoka.show
    • BindingKey和RoutingKey一样也是点号“.“分隔的字符串。
    • BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个
  • headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。
  1. 消息怎么路由?

答:消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种:

  • fanout:如果交换器收到消息,将会广播到所有绑定的队列上
  • direct:如果路由键完全匹配,消息就被投递到相应的队列
  • topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符
  1. 消息如何分发?

答:若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。

  1. 了解Virtual Host吗?

答:每一个RabbitMQ服务器都能创建虚拟的消息服务器,也叫虚拟主机(virtual host),简称vhost。默认为“/”。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/976554?site
推荐阅读
相关标签
  

闽ICP备14008679号