当前位置:   article > 正文

RabbitMQ 消息中间件

RabbitMQ 消息中间件

1、消息中间件
消息(message):
指在服务之间传送的数据。可以是简单的文本消息,也可以使复杂包含嵌入对象的消息

消息队列(message queue):
指用来存放消息的队列,一般采用先进先出的队列方式

消息中间件也可以称消息队列,是利用高效可靠的消息传递机制进行异步的数据传输并基于数据通信来进行分布式系统的集成。

当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。

2、作用
1、消息中间件主要作用

  • 冗余(存储)–有些情况下处理数据的过程会失败,造成数据丢失,可使用消息中间件进行数据持久化
  • 扩展性
  • 解耦
  • 异步通信
  • 削峰:消息队列中的常用场景,比如线上的双十一秒杀,或者春节火车票抢购,会有大量的请求同时进来。这时服务承受的压力是平时的几十甚至上百倍,一般会因为流量过大,应用系统配置承载不了这股瞬间流量,容易造成数据库的崩溃或者导致系统直接挂掉,即传说中的“宕机”现象。为解决这个问题,加入了消息中间件后我们会将那股巨大的力量将其统一转移至MQ中均速处理消息。而不直接涌入我们的后端服务接口,这样可以防止流量洪峰造成系统服务崩溃.
    2、消息中间件的两种模式

1、P2P模式

P2P模式(点对点模式)包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,消息可以在队列中进行异步传输,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。队列支持多个消费者,但是一个消息只能被一个消费者消费。

2、Pub/Sub模式–每个消息可以有多个订阅者

Pub/Sub模式(发布订阅模式)包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。发布者将消息发送到Topic,多个订阅者从 Topic 订阅消息。一个消息可以被多个订阅者消费。

消费者怎么得到消息队列的数据?-

消费者怎么从消息队列里边得到数据?有两种办法:

  • 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
  • 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)
    3、常用中间件介绍与对比
1、Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是追求高吞吐量(单机可达到10万级)时效性是毫秒级的,一开始的目的就是用于日志收集和传输。对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

2、RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、可靠性、安全。在企业系统内对数据一致性、稳定性和可靠性要求很高的场景(时效性是微秒级),对性能和吞吐量(单机万级)的要求还在其次。

Erlang是一种通用的面向并发的编程语言
AMQP协议:是一个进程间传递异步消息的网络协议
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

RabbiMQ集群

RabbiMQ是⽤erlang语言开发的,开源的,集群⾮常⽅便,支持多种语言客户端,包括java、Python、ruby、PHP、C/C++等,但其本身并不⽀持负载均衡,支持高并发。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、高可用性等方面表现不俗。

Ajax 即“Asynchronous Javascript And XML”(异步 JavaScript 和 XML),是指一种创建交互式网页应用的网页开发技术。Ajax = 异步 JavaScript 和 XML

2、RabbitMQ 特点
  • 可靠性—提供了多种技术可以在性能和可靠性之间进行权衡。这些技术包括 持久性机制、投递确认、和高可用性机制

  • 高可用性

  • 多语言客户端

  • 管理界面

  • 插件机制

mq如何实现持久化:

1 队列实现持久化
如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化 。
boolean durable = true;
channel.queueDeclare(ACK_QUEUE_NAME, durable, false, false, null);
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新 创建一个持久化的队列,不然就会出现错误。

当我们将durable参数位置传递true之后,即使重启 rabbitmq 队列也依然存在。

2 消息实现持久化
要想让消息实现持久化需要在消息生产者修改代码,需要像channel的basicPublish方法中props属性中传递MessageProperties.PERSISTENT_TEXT_PLAIN 参数

3、什么是消息队列
MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

(1)单⼀模式。

(2)普通模式(默认的集群模式)。

(3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的高可用⽅案,在对业务可靠性要求较⾼的场合中⽐较适⽤)。要实现镜像模式,需要先搭建⼀个普通集群模式,在这个模式的基础上再配置镜像模式以实现⾼可⽤。

了解集群中的基本概念:

RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。

一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制。

在这里插入图片描述
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立tcp连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;类似存放商品的仓库和商店。是生产商品的工厂和购买商品的用户之间的中转站。
Broker:简单来说就是消息队列服务器实体
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离.
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
user:用户
Binding: exchange 与 queue 之间的虚拟链接,binding 中可以包含 routing key。binding 信息被保存到 exchange 的查询表中,用于消息的分发。

注意:集群中有两种节点
1 内存节点:只保存状态到内存(持久的queue的持久内容将被保存到disk)

2 磁盘节点:保存状态到内存和磁盘。—推荐

内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了

如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。

普通集群准备环境(注意,这⾥三台服务器都关闭防火墙和selinux

192.168.1.38 rabbitmq-1
192.168.1.39 rabbitmq-2
192.168.1.40 rabbitmq-3

三台机器都操作:

  1. 配置hosts⽂件更改三台MQ节点的计算机名分别为rabbitmq-1、rabbitmq-2 和rabbitmq-3,然后修改hosts配置⽂件

root@rabbitmq-1 ~]# vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.1.38 rabbitmq-1
192.168.1.39 rabbitmq-2
192.168.1.40 rabbitmq-3

2.三个节点配置安装rabbitmq软件

安装依赖
[root@rabbitmq-1 ~]# yum install -y epel gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel
yum安装erlang
[root@rabbitmq-1 ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
[root@rabbitmq-1 ~]# yum install erlang-21.3.8.21-1.el7.x86_64
测试;
[root@rabbitmq-1 ~]# erl
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V9.3 (abort with ^G)
1>

安装rabbitmq
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.10 —选择版本
下载下来将其上传到服务器中,三台机器相同操作
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.10/rabbitmq-server-3.7.10-1.el7.noarch.rpm

[root@rabbitmq-1 ~]# yum install -y rabbitmq-server-3.7.10-1.el7.noarch.rpm

erlang 版本选择

https://packagecloud.io/rabbitmq/erlang

rabbitmq 和erlang兼容版本

https://www.rabbitmq.com/which-erlang.html

3.启动
[root@rabbitmq-1 ~]# systemctl daemon-reload
[root@rabbitmq-1 ~]# systemctl start rabbitmq-server
[root@rabbitmq-1 ~]# systemctl enable rabbitmq-server
启动方式二:
[root@rabbitmq-1 ~]# /sbin/service rabbitmq-server status —查看状态
[root@rabbitmq-1 ~]# /sbin/service rabbitmq-server start —启动
每台都操作开启rabbitmq的web访问界面:
[root@rabbitmq-1 ~]# rabbitmq-plugins enable rabbitmq_management

创建用户

注意:在一台机器操作
添加用户和密码
[root@rabbitmq-1 ~]# rabbitmqctl add_user soho soso
Creating user “soho” …
…done.
设置为管理员
[root@rabbitmq-1 ~]# rabbitmqctl set_user_tags soho administrator
Setting tags for user “soho” to [administrator] …
…done.
查看用户
[root@rabbitmq-1 ~]# rabbitmqctl list_users
Listing users …
guest [administrator]
soho [administrator]
…done.

此处设置权限时注意’.‘之间需要有空格 三个’.'分别代表了conf权限,read权限与write权限 例如:当没有给
soho设置这三个权限前是没有权限查询队列,在ui界面也看不见
[root@rabbitmq-1 ~]# rabbitmqctl set_permissions -p “/” soho “." ".” “.*”
Setting permissions for user “soho” in vhost “/” …
…done.

所有机器都操作:开启用户远程登录:

[root@rabbitmq-1 ~]# cd /etc/rabbitmq/
[root@rabbitmq-1 rabbitmq]# cp /usr/share/doc/rabbitmq-server-3.7.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
[root@rabbitmq-1 rabbitmq]# ls
enabled_plugins rabbitmq.config
[root@rabbitmq-1 rabbitmq]# vim rabbitmq.config
修改如下:
在这里插入图片描述
三台机器都操作重启服务服务:
[root@rabbitmq-1 ~]# systemctl restart rabbitmq-server
在这里插入图片描述
4369 – erlang端口
5672 --程序连接端口
15672 – 管理界面ui端口
25672 – server间内部通信口

!!!注意如果是云服务器,切记添加安全组端口放行。
访问:192.168.1.38:15672

在这里插入图片描述
这里需要注意:**

rabbitmq默认管理员用户:guest 密码:guest

新添加的用户为:soho 密码:soso

**开始部署集群三台机器

1.首先创建好数据存放目录和日志存放目录:

[root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/data
[root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/logs
[root@rabbitmq-1 ~]# chmod 777 -R /data/rabbitmq
[root@rabbitmq-1 ~]# chown rabbitmq.rabbitmq /data/ -R
创建配置文件:
[root@rabbitmq-1 ~]# vim /etc/rabbitmq/rabbitmq-env.conf
[root@rabbitmq-1 ~]# cat /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/data/rabbitmq/data
RABBITMQ_LOG_BASE=/data/rabbitmq/logs
重启服务
[root@rabbitmq-1 ~]# systemctl restart rabbitmq-server

2.拷⻉erlang.cookie
Rabbitmq的集群是依附于erlang的集群来⼯作的,所以必须先构建起erlang的集群景象。Erlang的集群中

各节点是经由⼀个cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,⽂件是400的权限。所以必须保证各节点cookie⼀致,不然节点之间就⽆法通信.

如果执行# rabbitmqctl stop_app 这条命令报错:需要执行
#如果执行# rabbitmqctl stop_app 这条命令报错:需要执行

#chmod 400 .erlang.cookie
#chown rabbitmq.rabbitmq .erlang.cookie

(官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。)

[root@rabbitmq-1 ~]# cat /var/lib/rabbitmq/.erlang.cookie
HOUCUGJDZYTFZDSWXTHJ
⽤scp的⽅式将rabbitmq-1节点的.erlang.cookie的值复制到其他两个节点中。
[root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.139:/var/lib/rabbitmq/
[root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.140:/var/lib/rabbitmq/

3.将mq-2、mq-3作为内存节点加⼊mq-1节点集群中

在mq-2、mq-3执⾏如下命令:
[root@rabbitmq-2 ~]# rabbitmqctl stop_app #停止节点,切记不是停止服务
[root@rabbitmq-2 ~]# rabbitmqctl reset #如果有数据需要重置,没有则不用
[root@rabbitmq-2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1 #添加到磁盘节点
Clustering node ‘rabbit@rabbitmq-2’ with ‘rabbit@rabbitmq-1’ …
[root@rabbitmq-2 ~]# rabbitmqctl start_app #启动节点
Starting node ‘rabbit@rabbitmq-2’ …

[root@rabbitmq-3 ~]# rabbitmqctl stop_app
Stopping node ‘rabbit@rabbitmq-3’ …
[root@rabbitmq-3 ~]# rabbitmqctl reset
Resetting node ‘rabbit@rabbitmq-3’ …
[root@rabbitmq-3 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
Clustering node ‘rabbit@rabbitmq-3’ with ‘rabbit@rabbitmq-1’ …
[root@rabbitmq-3 ~]# rabbitmqctl start_app
Starting node ‘rabbit@rabbitmq-3’ …

(1)默认rabbitmq启动后是磁盘节点,在这个cluster命令下,mq-2和mq-3是内存节点,
mq-1是磁盘节点。
(2)如果要使mq-2、mq-3都是磁盘节点,去掉–ram参数即可。
(3)如果想要更改节点类型,可以使⽤命令rabbitmqctl change_cluster_node_type
disc(ram),前提是必须停掉rabbit应⽤
注:
#如果有需要使用磁盘节点加入集群
[root@rabbitmq-2 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-1
[root@rabbitmq-3 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-1

4.查看集群状态

在这里插入图片描述
每台机器显示出三台节点,表示已经添加成功!

5.登录rabbitmq web管理控制台,创建新的队列

打开浏览器输⼊http://192.168.1.38:15672, 输⼊默认的Username:guest,输⼊默认的

Password:guest

登录后出现如图所示的界⾯。

在这里插入图片描述
根据界⾯提示创建⼀条队列

在这里插入图片描述
RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,队列内容不会复制。如果队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

镜像队列是基于普通的集群模式的。

创建镜像集群

rabbitmq set_policy :设置策略

[root@rabbitmq-1 ~]# rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’
Setting policy “ha-all” for pattern “^” to “{“ha-mode”:“all”}” with priority “0” for vhost “/” …

再次查看队列已经同步到其他两台节点:
在这里插入图片描述
"^"匹配所有的队列,策略名称为ha-all, ‘{“ha-mode”:“all”}’ 策略模式为 all 即复制到所有节点,包含新增节点。

设置策略介绍:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称,可以定义
Pattern: queue的匹配模式(正则表达式),也就是说会匹配一组。
Definition:镜像定义,包括ha-mode,ha-params,ha-sync-mode
ha-mode:指明镜像队列的模式
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
ha-params:ha-mode模式需要用到的参数
案例:
例如,对队列名称以hello开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy的设置命令为:
rabbitmqctl set_policy hello-ha “^hello” ‘{“ha-mode”:”exactly”,”ha-params”:2,”ha-sync-mode”:”automatic”}’

则此时镜像队列设置成功。

已经部署完成

将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致。

三、常见问题

常见错误:

1、使用 rabbitmq-server -detached命令启动rabbitmq时,出现以下提示Warning: PID file not written; -detached was passed,此时使用rabbitmqctl status提示服务已启动,可知此问题不用解决。

2、由于更改hostname文件,在每次rabbitmqctl stop或者rabbitmqctl cluster_status等,只要是rabbitmq的命令就报错,提示大概如下

Cluster status of node rabbit@web2 …
Error: unable to connect to node rabbit@web2: nodedown

DIAGNOSTICS

attempted to contact: [rabbit@web2]

rabbit@web2:

  • connected to epmd (port 4369) on web2

  • epmd reports node ‘rabbit’ running on port 25672

  • TCP connection succeeded but Erlang distribution failed

  • Hostname mismatch: node “rabbit@mq2” believes its host is different. Please ensure that hostnames resolve the same way locally and on “rabbit@mq2”

current node details:

  • node name: ‘rabbitmq-cli-11@web2’
  • home dir: /root
  • cookie hash: SGwxMdJ3PjEXG1asIEFpBg==
    此时先ps aux | grep mq,然后kill -9 该进程,然后再rabbitmq-server -detached即可解决。(即先强杀,再重新启动)

3、使用rabbitmqctl stoprabbitmq-server -detached重新启动后,原先添加的用户admin、虚拟主机coresystem等均丢失,还需要重新添加。

安装并配置负载均衡器HA

注意:如果使用阿里云,可以使用阿里云的内网slb来实现负载均衡,不用自己搭建HA。

安装并配置负载均衡器HA

1、在192.168.1.38安装HAProxy
yum -y install haproxy

2、修改 /etc/haproxy/haproxy.cfg

[root@rabbitmq-1 ~]# cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
[root@rabbitmq-1 ~]# vim /etc/haproxy/haproxy.cfg
global
log 127.0.0.1 local2

chroot      /var/lib/haproxy
pidfile     /var/run/haproxy.pid
maxconn     4000
user        haproxy
group       haproxy
nbproc      4
daemon
# turn on stats unix socket
stats socket /var/lib/haproxy/stats
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

#---------------------------------------------------------------------
defaults
mode http
log global
retries 3
timeout connect 10s
timeout client 1m
timeout server 1m
timeout check 10s
maxconn 2048
#---------------------------------------------------------------------
##监控查看本地状态#####
listen admin_stats
bind *:80
mode http
option httplog
option httpclose
log 127.0.0.1 local0 err
stats uri /haproxy
stats auth qianfeng:123
####################################
###反代监控
frontend server
bind *:5670
log global
mode tcp
default_backend rabbitmq
maxconn 3
backend rabbitmq
mode tcp
log global
balance roundrobin
server rabbitmq1 192.168.1.38:5672 check inter 2000s rise 2 fall 3
server rabbitmq2 192.168.1.39:5672 check inter 2000s rise 2 fall 3
server rabbitmq3 192.168.1.40:5672 check inter 2000s rise 2 fall 3

[root@rabbitmq-1 ~]# systemctl restart haproxy

浏览器输入http://192.168..38/haproxy查看rabbitmq的状态。

常见命令

插件管理

开启某个插件:rabbitmq-plugins enable xxx
关闭某个插件:rabbitmq-plugins disable xxx
注意:重启服务器后生效。
用户管理

新建用户:rabbitmqctl add_user xxx pwd
删除用户: rabbitmqctl delete_user xxx
查看用户:rabbitmqctl list_users
改密码: rabbimqctl change_password {username} {newpassword}
设置用户角色:rabbitmqctl set_user_tags {username} {tag …}
Tag可以为 administrator,monitoring, management
其他使用命令:

rabbitmq使用命令:
rabbitmq-plugins list ----查看安装的插件
rabbitmq-server -detached -----------启动RabbitMQ节点
rabbitmqctl start_app ----------启动RabbitMQ应用,而不是节点
rabbitmqctl stop_app ------停止
rabbitmqctl status ------查看状态
rabbitmqctl add_user mq 123456 -------设置用户和密码
rabbitmqctl set_user_tags mq administrator ------------------新增账户并设置为管理员
rabbitmq-plugins enable rabbitmq_management --------------------启用RabbitMQ_Management
rabbitmqctl cluster_status -------------------集群状态
rabbitmqctl forget_cluster_node rabbit@rabbit3 -------------------节点摘除
rabbitmqctl reset application----------------------重置
rabbitmqctl set_permissions -p “/” soso “." ".” “.*” --------------授权

查看Connection,Queue,Channel,User

rabbitmqctl list_connections #列出所有连接
rabbitmqctl list_queues #列出所有队列
rabbitmqctl list_channels #列出所有通道
rabbitmqctl list_users #列出所有用户

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/126805
推荐阅读
相关标签
  

闽ICP备14008679号