当前位置:   article > 正文

RabbitMQ Tutorials_rabbitmq-tutorials

rabbitmq-tutorials

消息队列

1.MQ相关概念

1.1 什么是MQ

MQ(message queue),从字面意思上看,本质是个队列FIFO 先入先出,只不过队列中存放的内容是

message 而已,还是一种跨进程的通信机制,用于上下游传递消息

1.2 为什么使用MQ

流量削峰

  • 处理能力有限,如果只能处理1W请求,1W的之内都能即时响应,现在由于活动2W用户请求不能放弃掉1W用户,所以消息可以到消息队列,只不过处理用户下单的时间比之前长

应用解耦

  • 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而原先系统不需要做任何修改。

异步处理

  • 有些服务间调用是异步的
  • 例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,
  • 以前一般有两种方式:
    • A 过一段时间去调用 B 的查询 api 查询
    • 或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务
  • 这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题
    • A 调用 B 服务后,只需要监听 B 处理完成的消息
    • 当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务

1.3 常用MQ优缺点

ActiveMQ

  • 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
  • 缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用

Kafka

  • 优点
    • 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高
    • 时效性 ms 级可用性非常高
    • kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
    • 消费者采 用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次
    • 有优秀的第三方Kafka Web 管理界面 Kafka-Manager
    • 在日志领域比较成熟,被多家公司和多个开源项目使用
  • 缺点
    • 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
    • 社区更新较慢

RocketMQ

  • RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景
  • 优点
    • 单机吞吐量十万级,可用性非常高
    • 分布式架构,消息可以做到 0 丢失
    • MQ 功能较为完善,还是分布式的,扩展性好
    • 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码
  • 缺点
    • 支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟
    • 社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

RabbitMQ

  • 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一
  • 优点:
    • 由于 erlang 语言的高并发特性,性能较好;吞吐量到万级
    • MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:
      • Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等
    • 支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
  • 缺点
    • 商业版需要收费,学习成本较高

1.4 MQ的选择

大型项目:RocketMQ

日志收集(就是跟大数据量打交道):kafka

小/中型项目:RabbitMQ

社区活跃度:

RabbitMQ > RocketMQ > kafka

消息持久化:

RabbitMQ、RocketMQ、kafka都支持持久化

高并发:

kafka = RocketMQ > RabbitMQ

吞吐量:

kafka = RocketMQ > RabbitMQ

2.RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)

2.1 前置概念

2.1.1 AMQP核心概念

虚拟主机(virtual host)或(vhost
  • 一组交换机、队列和绑定器被称为 虚拟主机(vhost
  • RabbitMQ server 可以说就是一个消息队列服务器实体(Broker
    • Broker当中可以有多个用户,而用户只能在虚拟主机的粒度进行权限控制,所以RabbitMQ中需要多个虚拟主机
    • 每一个RabbitMQ服务器都有一个默认的虚拟主机
交换机(exchange
  • 指定消息按什么规则,路由到哪个队列。它可以被理解成具有路由表的路由程序。(发送消息的实体
  • 交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。
  • Exchange的类型
    • Direct:直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
    • Topic:这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
    • Fanout:交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
    • Headers:消息头交换机使用消息头的属性进行消息路由,相当于模糊匹配(like header%),一对多
队列(queue
  • 队列是消息载体,每个消息都会被投入到一个或多个队列
  • 试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求(接收消息的实体)。
绑定器(bind
  • 作用:把exchange和queue按照路由规则绑定起来
    • 将交换器和队列连接起来,并且封装消息的路由信息

2.1.2 程序中连接与消息使用的两个关键概念

连接(Connection)
  • 与RabbitMQ Server建立的一个连接
    • 由ConnectionFactory创建
    • 每个connection只与一个物理的Server进行连接,此连接是基于Socket进行连接的
  • AMQP一般使用TCP
通道 (Channel)
  • 消息通道(主要进行相关定义,发送消息,获取消息,事务处理等)
  • 在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

2.2 七种工作模式

  • 简单模式:一个生产者,一个消费者
  • work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一
  • 订阅模式:一个生产者发送的消息会被多个消费者获取
  • 路由模式: 发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
  • topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词
  • RPC模式:使用RabbitMQ构建RPC系统:客户端和可伸缩RPC服务器
  • 发布确认:与发布者进行可靠的发布确认

2.3 安装

# 1.拉取镜像
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker pull rabbitmq:3.8-management
# 2.查看镜像
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker images
# 3.启动
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
a5ca877169d4eda31a9c404f94565c118f1cc64de2cbb398ff17a66e72c682cd
# 4.查看启动情况
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker ps
CONTAINER ID   IMAGE                     COMMAND                  CREATED         STATUS         PORTS                                                                                                         NAMES
a5ca877169d4   rabbitmq:3.8-management   "docker-entrypoint.s…"   4 seconds ago   Up 3 seconds   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
问题
  • 防火墙问题
  • 启动web管理界面
# 进入rabbitmq
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker exec -it rabbitmq bash
# 启动web管理界面
root@a5ca877169d4:/# rabbitmq-plugins enable rabbitmq_management
  • 1
  • 2
  • 3
  • 4

2.3.1 添加新用户

# 创建账号
root@a5ca877169d4:/# rabbitmqctl add_user admin admin
Adding user "admin" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.

# 设置用户角色
root@a5ca877169d4:/# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

# 设置用户权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 用户 admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
root@a5ca877169d4:/# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...

# 查看当前用户和角色
root@a5ca877169d4:/# rabbitmqctl list_users
Listing users ...
user    tags
admin   [administrator]
guest   [administrator]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.3 Hello World

在这里插入图片描述

  • 在本教程的这一部分中,我们将用 Java 编写两个程序;
  • 发送单个消息的生产者和接收消息并将其打印出来的消费者
  • 在下图中,“P”是我们的生产者,“C”是我们的消费者
    • 中间的盒子是一个队列——RabbitMQ 代表消费者保留的消息缓冲区

2.3.1 依赖

<!--指定 jdk 编译版本--> 
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build> 
<dependencies>
    <!--rabbitmq 依赖客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!--操作文件流的一个依赖-->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

2.3.2 消息生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
   
    private final static String QUEUE_NAME = "hello";

    public static 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/425999
推荐阅读
相关标签
  

闽ICP备14008679号