当前位置:   article > 正文

MQ——RabbitMQ集群原理

rabbitmq集群原理

摘要

主要围绕运维层面展开论述,主要包括集群搭建、日志查看、故障恢复、集群迁移、集群监控这几个方面。

RabbitMQ集群搭建

如果RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台RabbitMQ 服务器可以满足每秒1 000 条消息的吞吐量,那么如果应用需要RabbitMQ 服务满足每秒10万条消息的吞吐量呢?购买昂贵的服务器来增强单机RabbitMQ 服务的性能显得捉襟见肘, 搭建一个RabbitMQ 集群才是解决实际问题的关键。

RabbitMQ 集群允许消费者和生产者在RabbitMQ 单个节点崩惯的情况下继续运行, 它可以通过添加更多的节点来线性地扩展消息通信的吞吐量。当失去一个RabbitMQ 节点时,客户端能够重新连接到集群中的任何其他节点并继续生产或者消费。

不过RabbitMQ集群不能保证消息的万无一失,即将消息、队列、交换器等都设置为可持久化,生产端和消费端都正确地使用了确认方式。当集群中一个RabbitMQ 节点崩溃时,该节点上的所有队列中的消息也会丢失。RabbitMQ 集群中的所有节点都会备份所有的元数据信息,包括以下内容。

  • 队列元数据: 队列的名称及属性;
  • 交换器:交换器的名称及属性:
  • 绑定关系元数据: 交换器与队列或者交换器与交换器之间的绑定关系;
  • vhost 元数据:为vhost 内的队列、交换器和绑定提供命名空间及安全属性。

但是不会备份消息(当然通过特殊的配置比如镜像队列可以解决这个问题,基于存储空间和性能的考虑,在RabbitMQ 集群中创建队列,集群只会在单个节点而不是在所有节点上创建队列的进程包含完整的队列信息(元数据、状态、内容)。这样只有队列的宿主节点, 即所有者节点知道队列的所有信息, 所有其他非所有者节点只知道队列的元数据和指向该队列存在的那个节点的指针。因此当集群节点崩溃时,该节点的队列进程和关联的绑定都会消失。附加在那些队列上的消费者也会丢失其所订阅的信息, 井且任何匹配该队列绑定信息的新消息也都会消失

不同于队列那样拥有自己的进程,交换器其实只是一个名称和绑定列表。当消息发布到交换器时,实际上是由所连接的信道将消息上的路由键同交换器的绑定列表进行比较,然后再路由消息。当创建一个新的交换器时, RabbitMQ 所要做的就是将绑定列表添加到集群中的所有节点上。这样,每个节点上的每条信道都可以访问到新的交换器了。

集群的构建

假设这里一共有三台物理主机, 均己正确地安装了RabbitMQ ,且主机名分别为node1 , node2
和node3 RabbitMQ 集群对延迟非常敏感,应当只在本地局域网内使用。在广域网中不应该使
用集群,而应该使用Federation 或者Shove1 来代替。

192 . 168.0 .2node1
192 . 168.0 .3node2
192 . 168.0 .4node3

第一步,配置各个节点的hosts 文件,让各个节点都能互相识别对方的存在。在LÏnux 系统中可以编辑/etc/hosts 文件,在其上添加IP地址与节点名称的映射信息:

  1. 192.168.0.2 node1
  2. 192.168.0.3 node2
  3. 192.168.0.4 node3

第二步,编辑RabbitMQ 的cookie文件,以确保各个节点的cookie 文件使用的是同一个值。可以读取node1 节点的cookie 值, 然后将其复制到node2 和node3 节点中。cookie 文件默认路径为/var/lib/rabbitmq/.erlang . cookie 或者$HOME/ . erlang.cookieocookie 相当于密钥令牌,集群中的RabbitMQ 节点需要通过交换密钥令牌以获得相互认证。如果节点的密钥令牌不一致,那么在配置节点时就会有如下的报错,注意字体加粗部分。

第三步,配置集群。配置集群有三种方式:通过rabbitmq ctl工具配置:通过rabbitmq.config 配置文件配置; 通过rabbitmq-auto cluster1 插件配置。这里主要讲的是通过rabbitmqctl 工具的方式配置集群,这种方式也是最常用的方式。其余两种方式在实际应用中用之甚少, 所以不多做介绍。

  1. 首先启动nodel , node2 和node3 这3 个节点的RabbitMQ 服务。
  2. [root@nodel -]# rabbitmq-server -detached
  3. [root@node2 -]# rabbitmq-server -detached
  4. [root@node3 -]# rabbitmq-server -detached

接下来为了将3 个节点组成一个集群,需要以nodel 节点为基准,将node2 和node3 节点加入nodel 节点的集群中。这3 个节点是平等的,如果想调换彼此的加入顺序也未尝不可。首先将node2 节点加入nodel 节点的集群中,需要执行如下4 个命令步骤。

  1. [root@node2 -]# rabbitmqctl stop app
  2. Stopping rabbit application on node rabbit@node2
  3. [root@node2 -]# rabbitmqctl reset
  4. Resetting node rabb 工t@node2
  5. [root@node2 -]# rabbitmqctl join cluster rabbit@nodel
  6. Clustering node rabbit@node2 with rabbit@nodel
  7. [root@node2 -]# rabbitmqctl start_app
  8. Starting node rabbit@node2

如此, nodel 节点和node2 节点便处于同一个集群之中,我们在这两个节点上都执行rabbitmqctl cluster status 命令可以看到同样的输出。

  1. [{nodes , [{d 工sc , [rabbit@nodel , rabb 工t@node2] }] },
  2. {runn 工ng nodes , [rabb 工t@nodel , rabbit@ 口ode2] },
  3. {cluster name , << " rabb 工t@nodel " >>} ,
  4. {partit 工ons , [] },
  5. {alarms , [{ rabbi t@nodel , [] } , {rabbi t@node2 , [] } ] }]

最后将node3 节点也加入nodel 节点所在的集群中,这3 个节点组成了一个完整的集群。在任意一个节点中都可以看到如下的集群状态。

  1. [{nodes , [{disc, [rabbit@nodel , rabbit@node2 , rabbit@node3]}]} ,
  2. {running nodes , [rabbit@node l , rabbit@node2 , rabbit @node3]} ,
  3. {cluster name , << " rabbit@nodel " >>l ,
  4. {partitions , []},
  5. {alarms , [{ rabbit@nodel , [] } , {rabbi t@node2 , []} , {rabbi t@node3 , [] } ] 1 ]

节点的删除

创建集群的过程可以看作向集群中添加节点的过程。那么如何将一个节点从集群中剔除呢?这样可以让集群规模变小以节省硬件资源,或者替换一个机器性能更好的节点。同样以nodel 、node2 和node3 组成的集群为例,这里有两种方式将node2 剥离出当前集群。

  1. 第二种方式是在node2 上执行rabbitmqctl reset 命令。如果不是像上面由于启动顺序的缘故而不得不删除一个集群节点,建议采用这种方式。
  2. [root@node2 -] # rabbitmqctl stop app
  3. Stopping rabbit application on node rabbit@node2
  4. [root@node2 -]# rabbitmqctl reset
  5. Resetting node rabbit@node2
  6. [root@node2 -] # rabbitmqctl start app
  7. Starting node rabbit@node2
  8. 如果从node2 节点上检查集群的状态, 会发现它现在是独立的节点。同样在集群中剩余的节点nodel 和node3 上看到node2 已不再是集群中的一部分了。
  9. 正如之前所说的, rabbitmqctl reset 命令将清空节点的状态, 并将其恢复到空白状态。当重设的节点是集群中的一部分时, 该命令也会和集群中的磁盘节点进行通信, 告诉它们该节点正在离开集群。不然集群会认为该节点出了故障, 并期望其最终能够恢复过来。
  1. 第一种,首先在node2 节点上执行rabbitmqctl stop_app或者rabbitmqctl stop命令来关闭RabbitMQ 服务。之后再在nodel节点或者node3 节点上执行rabbitmqctlforget_cluster_node rabbit@node2 命令将nodel节点剔除出去。这种方式适合node2节点不再运行RabbitMQ 的情况。
  2. [root@nodel -]# rabbitmqctl forget cluster_node rabbit@node2
  3. Removing node rabbit@node2 from cluster
  4. 在关闭集群中的每个节点之后,如果最后一个关闭的节点最终由于,某些异常而无法启动,则可以通过rabbi tmqctl forget_cluster_node 命令来将此节点剔除出当前集群。
  5. 举例,集群中节点按照node3 、node2 、nodel 的顺序关闭,此时如果要启动集群, 就要先启动nodel 节点。
  6. [root@node3 -]# rabbitmqctl stop
  7. Stopping and halting node rabbit@node3
  8. [root@node2 -j# rabbitmqctl stop
  9. Stopping and halting node rabbit@node2
  10. [root@nodel -]# rabbitmqctl stop
  11. Stopping and halting node rabbit@nodel
  12. 这里可以在node2 节点中执行命令将nodel 节点剔除出当前集群。
  13. [root@node2 -]# rabbitmqctl forget_cluster_node rabbit@nodel -offline
  14. Removing node rabbit@nodel from cluster
  15. * Impersonating node : rabbit@node2 ... done
  16. * Mnesia directory : /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node2
  17. [root@node2 -] # rabbitmq-server -detached
  18. Warning: PID file not written ; -detached was passed.
  19. [root@node2 -]# rabbitmqctl cluster status
  20. Cluster status of node rabbit@node2
  21. [{nodes , [{d 工sc , [rabbit@node2 , rabbit@node3]}]} ,
  22. {run 口工ng nodes , [rabbit@node2]} ,
  23. {cluster name , << " rabbit@nodel " >>} ,
  24. {partitions , []} ,
  25. {alarms , [{rabbit@node2 , []}]}]

集群节点的升级

如果RabbitMQ 集群由单独的一个节点组成,那么升级版本很容易,只需关闭原来的服务,然后解压新的版本再运行即可。不过要确保原节点的Mnesia 中的数据不被变更,且新节点中的Mnesia 路径的指向要与原节点中的相同。或者说保留原节点Mnesia 数据, 然后解压新版本到相应的目录,再将新版本的Mnesia 路径指向保留的Mnesia 数据的路径(也可以直接复制保留的Mnesia 数据到新版本中相应的目录) ,最后启动新版本的服务即可。

  1. (1)关闭所有节点的服务, 注意采用rabbitmqctl stop 命令关闭。
  2. (2) 保存各个节点的Mnesia 数据。
  3. (3)解压新版本的RabbitMQ 到指定的目录。
  4. (4) 指定新版本的Mnesia 路径为步骤2 中保存的Mnesia 数据路径。
  5. (5) 启动新版本的服务,注意先重启原版本中最后关闭的那个节点。

其中步骤4 和步骤5 可以一起操作,比如执行RABBITMQ MNESIA BASE=/opt/mnesiarabbitmq-server-detached 命令,其中/opt/mnesia 为原版本保存Mnesia 数据的路径。

RabbitMQ 的版本有很多, 难免会有数据格式不兼容的现象, 这个缺陷在越旧的版本中越发凸显,所以在对不同版本升级的过程中,最好先测试两个版本互通的可能性,然后再在线上环境中实地操作。

如果原集群上的配置和数据都可以舍弃,则可以删除原版本的RabbitMQ ,然后再重新安装配置即可:如果配置和数据不可丢弃,则按照之前所述保存元数据,之后再关闭所有生产者并等待消费者消费完队列中的所有数据,紧接着关闭所有消费者,然后重新安装RabbitMQ 并重建元数据等。

RabbitMQ日志查看

如果在使用RabbitMQ 的过程中出现了异常情况,通过翻阅RabbitMQ 的服务日志可以让你在处理异常的过程中事半功倍。RabbitMQ 日志中包含各种类型的事件,比如连接尝试、服务启动、插件安装及解析请求时的错误等。本节首先举几个例子来展示一下RabbitMQ 服务日志的内容和日志的等级,接着再来阐述如何通过程序化的方式来获得日志及对服务日志的监控。

RabbitMQ 的日志默认存放在$RABBITMQ HOME/var/log/rabbitmq 文件夹内。在这个文件夹内Rabbi tMQ 会创建两个日志文件: RABBITMQ_NODENAME-sasl.log 和RABBITMQ NODENAME . log 。

SASL ( System Application Support Libraries ,系统应用程序支持库)是库的集合,作为Erlang-OTP 发行版的一部分。它们帮助开发者在开发Erlang 应用程序时提供一系列标准,其中之一是日志记录格式。所以当RabbitMQ 记录Erlang 相关信息时,它会将日志写入文件RABBITMQ_NODENAME-sasl . log 中。举例来说,可以在这个文件中找到Erlang 的崩横报告,有助于调试无法启动的RabbitMQ 节点。

RabbitMQ故障恢复

在RabbitMQ 使用过程中,或多或少都会遇到一些故障。对于集群层面来说,更多的是单点故障。所谓的单点故障是指集群中单个节点发生了故障,有可能会引起集群服务不可用、数据丢失等异常。配置数据节点冗余(镜像队列)可以有效地防止由于单点故障而降低整个集群的可用性、可靠性,本节主要讨论的是单节点故障有哪些,以及怎么恢复或者处理相应类型的单节点故障。

单节点故障包括:机器硬件故障、机器掉电、网络异常、服务进程异常。

  • 单节点机器硬件故障包括机器硬盘、内存、主板等故障造成的死机,无法从软件角度来恢复。此时需要在集群中的其他节点中执行rabbitmqctl forget cluster node{nodename} 命令来将故障节点剔除,其中nodename 表示故障机器节点名称。如果之前有客户端连接到此故障节点上,在故障发生时会有异常报出,此时需要将故障节点的IP地址从连接列表里删除,并让客户端重新与集群中的节点建立连接,以恢复整个应用。如果此故障机器修复或者原本有备用机器,那么也可以选择性的添加到集群中。
  • 当遇到机器掉电故障,需要等待电源接通之后重启机器。此时这个机器节点上的RabbitMQ处于stop 状态,但是此时不要盲目重启服务,否则可能会引起网络分区。此时同样需要在其他节点上执行rabbitmqctl forget_cluster_node{nodename} 命令将此节点从集群中剔除,然后删除当前故障机器的RabbitMQ 中的Mnesia数据(相当于重置),然后再重启RabbitMQ 服务,最后再将此节点作为一个新的节点加入到当前集群中。
  • 网线松动或者网卡损坏都会引起网络故障的发生。对于网线松动,无论是彻底断开,还是"藕断丝连",只要它不降速, RabbitMQ 集群就没有任何影响。但是为了保险起见,建议先关闭故障机器的RabbitMQ 进程,然后对网线进行更换或者修复操作,之后再考虑是否重新开启RabbitMQ 进程。而网卡故障极易引起网络分区的发生,如果监控到网卡故障而网络分区尚未发生时,理应第一时间关闭此机器节点上的RabbitMQ 进程,在网卡修复之前不建议再次开启。如果己经发生了网络分区。
  • 对于服务进程异常,如RabbitMQ 进程非预期终止,需要预先思考相关风险是否在可控范围之内。如果风险不可控,可以选择抛弃这个节点。一般情况下,重新启动RabbitMQ 服务进程即可。

RabbitMQ集群迁移

对于RabbitMQ 运维层面来说,扩容和迁移是必不可少的。扩容比较简单,一般向集群中加入新的集群节点即可,不过新的机器节点中是没有队列创建的,只有后面新创建的队列才有可能进入这个新的节点中。或者如果集群配置了镜像队列,可以通过一点"小手术"将原先队列"漂移"到这个新的节点中。

迁移同样可以解决扩容的问题,将旧的集群中的数据(包括元数据信息和消息)迁移到新的且容量更大的集群中即可。RabbitMQ 中的集群迁移更多的是用来解决集群故障不可短时间内修复而将所有的数据、客户端连接等迁移到新的集群中,以确保服务的可用性。相比于单点故障而言,集群故障的危害性就大得多,比如IDC 整体停电、网线被挖断等。这时候就需要通过集群迁移重新建立起一个新的集群。

RabbitMQ 集群迁移包括元数据重建、数据迁移,以及与客户端连接的切换。

RabbitMQ集群监控

任何应用功能再强大、性能再优越,如果没有与之匹配的监控那么一切都是虚无缭绕妙的。监控不仅可以提供运行时的数据为应用提供依据参考, 还可以迅速定位问题、提供预防及告警等功能,很大程度上增强了整体服务的鲁棒性。RabbitMQ 扩展的RabbitMQ Management 插件就能提供一定的监控功能。Web 管理界面提供了很多的统计值信息: 如发送速度、确认速度、消费速度、消息总数、磁盘读写速度、句柄数、Socket 连接数、Connection 数、Channel数、内存信息等。总体上来说, RabbitMQ Management 插件提供的监控页面是相对完善的,在实际应用中具有很高的使用价值。但是有一个遗憾就是其难以和公司内部系统平台关联, 对于业务资源的使用情况、相应的预防及告警的联动无法顺利贯通。如果在人力、物力等条件允许的情况下,自定义一套监控系统非常有必要。

通过HTTPAPI 接口提供监控数据

那么监控数据从哪里来呢? RabbitMQ Management 插件不仅提供了一个优秀的Web 管理界面,还提供了盯TP API 接口以供调用。下面以集群、交换器和队列这3 个角度来阐述如何通过HTTPAPI 获取监控数据。

假设集群中一共有4 个节点node 1 、node2 、node3 和node4 , 有一个交换器exchange 通过
同一个路由键" rk" 绑定了3 个队列queue 1 、queue2 和queue3 。

下面首先收集集群节点的信息, 集群节点的信息可以通过/api/nodes 接口来获取。有关从/ api / nodes 接口中获取到数据的结构可以参考附录B ,其中包含了很多的数据统计项,可以挑选感兴趣的内容进行数据收集。

数据来集完之后并没有结束,图7-10 中简单囊括了从数据采集到用户使用的过程。首先采集程序通过定时调用HTTPAPI 接口获取JSON 数据,然后进行JSON 解析之后再进行持久化处理。对于这种基于时间序列的数据非常适合使用OpenTSDB6来进行存储。监控管理系统可以根据用户的检索条件来从OpenTSDB(基于Hbase 的分布式的,可伸缩的时间序列数据库。主要用途就是做监控系统,比如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储、查询。) 中获取相应的数据并展示到页面之中。监控管理系统本身还可以具备报表、权限管理等功能,同时也可以实时读取所采集的数据,对其进行分析处理,对于异常的数据需要及时报告给相应的人员。

对于交换器而言的数据采集可以调用/api/exchanges/vhost/name 接口, 比如需要调用虚拟主机为默认的"/气交换器名称为exchange 的数据, 只需要使用HTTP GET 方法获取http ://xxx. xxx.xxx . xxx : 15672/apνexchanges/%2F / exchange 的数据即可。注意, 这里需要将" /"进行HTML 转义成" %2F ",否则会出错。

对于队列而言的数据来集相关的接口为/api/queues/vhost/name ,对应的数据结构可以参考下方内容,同时参考前面的内容进行相应的编码逻辑。

通过客户端提供监控数据

Java 版客户端C3 . 6 .x 版本开始〉中Channel 接口中也提供了两个方法来获取数据。定义如下:

  1. /**
  2. * Returns the number of messages i 口a queue ready to be delivered
  3. * to consumers. This method assumes the queue exists . If it doesn ' t ,
  4. * an exception will be closed with an exception .
  5. * @param queue the 口ame of the queue
  6. * @return the number of messages in ready state
  7. * @throws IOException Problem transmitting method.
  8. */
  9. long messageCount(String queue) throws IOException;
  10. /**
  11. * Returns the number of consumers on a queue .
  12. * This method assumes the queue ex 工sts. If it doesn't ,
  13. * an except 工on wil1 be closed with an exception.
  14. * @param queue the name of the queue
  15. * @return the number of consumers
  16. * @throws IOException Problem transmitting method .
  17. */
  18. long consumerCount(String queue) throws IOException;

messageCount (String queue) 用来查询队列中的消息个数,可以为监控消息堆积的情况提供数据。consumerCount(String queue) 用来查询队列中的消费者个数, 可以为监控消费者的情况提供数据。除了这两个方法,也可以通过连接的状态进行监控。Java 客户端中Connection 接口提供了addBlockedListenerCBlockedListenerlistener) 方法(用来监昕连接阻塞信息)和addShutdownListener CShutdownListener listener) 方法(用来监昕连接关闭信息)

  1. try {
  2. Connection connection = connectionFactory.newConnection();
  3. connection.addShutdownListener(new ShutdownListener() {
  4. public void shutdownCompleted(ShutdownSignalException cause) {
  5. //处理并记录连接关闭事项
  6. }
  7. });
  8. connection.addBlockedListener(new BlockedListener() {
  9. public void handleBlocked(String reason) throws IOException {
  10. //处理并记录连接阻塞事项
  11. }
  12. public void handleUnblocked() throws IOException {
  13. //处理并记录连接阻塞取消事项
  14. }
  15. });
  16. Channel channel = connection.createChannel();
  17. long msgCount = channel.messageCount(" queuel ");
  18. long consumerCount = channel.consumerCount("queuel");
  19. //记录msgCount 和consumerCount
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. } catch (TimeoutException e){
  23. e.printStackTrace();
  24. }

用户客户端还可以自行定义一些数据进行埋点,比如客户端成功发送的消息个数和发送失败的消息个数, 进一步可以计算发送消息的成功率等。

检测RabbitMQ 服务是否健康

不管是通过HTTP API 接口还是客户端,获取的数据都是以作监控视图之用,不过这一切都基于RabbitMQ 服务运行完好的情况下。虽然可以通过某些其他工具或方法来检测RabbitMQ进程是否在运行(如ps aux I grep rabbitmq) ,或者5672 端口是否开启(如telnetxxx.xxx . xxx . xxx 5672) ,但是这样依旧不能真正地评判RabbitMQ 是否还具备服务外部请求的能力。这里就需要使用AMQP 协议来构建一个类似于TCP 协议中的Ping 的检测程序。当这个测试程序与RabbitMQ 服务无法建立TCP 协议层面的连接,或者无法构建AMQP 协议层面的连接,再或者构建连接超时时,则可判定RabbitMQ 服务处于异常状态而无法正常为外部应用提供相应的服务。

元数据管理与监控

确保RabbitMQ 能够健康运行还不足以让人放松警惕。考虑这样一种情况:小明为小张创建了一个队列井绑定了一个交换器,之后某人由于疏忽而阴差阳错地删除了这个队列而无人得知,最后小张在使用这个队列的时候就会报出"NOT FOUND" 的错误。如果这些在测试环境中发生,那么还可以弥补。在实际生产环境中,如果误删了一个队列,必然会造成不可估计的影响。此时业务方如果正在使用这个队列,正常情况下会立刻报出异常,相关人员可以迅速做出动作以尽可能地降低影响。试想如果是一个定时任务调用此队列,并在深夜3 点执行相应的逻辑,此时报出异常想必也会对相关人员造成不小的精神骚扰。

不止删除队列这一个方面,还有删除了一个交换器,或者修改了绑定信息,再或者是胡乱建立了一个队列绑定到现有的一个交换器中,同时又没有消费者订阅消费此队列,从而留下消息堆积的隐患等都会对使用RabbitMQ 服务的业务应用造成影响。所以对于RabbitMQ 元数据的管理与监控也尤为重要。

许多应用场景是在业务逻辑代码中创建相应的元数据资源(交换器、队列及绑定关系)并使用。对于排他的、自动删除的这类非高可靠性要求的元数据资源可以在一定程度上忽略元数据变更的影响。但是对于两个非常重要的且通过消息中间件交互的业务应用,在使用相应的元数据资源时最好进行相应的管控,如果一方或者其他方肆意变更所使用的元数据,必然对另一方造成不小的损失。管控的介入自然会降低消息中间件的灵活度,但是可以增强系统的可靠性。比如通过专用的"元数据审核系统"来配置相应的元数据资源,提供给业务方使用的用户只有可读和可写的权限,这样可以进一步降低风险。

非管控的元数据可以天马行空,业务方可以在这一时刻创建,下一时刻就删除,对其监控也无太大的意义。对于管控的元数据来说,监控的介入就会有意义也会有必要很多。虽然对于只有可读写权限的用户不能够变更元数据信息,也难免会被其他具有可配置权限的超级用户篡改。RabbitMQ 中在创建元数据资源的时候是以一种声明的形式完成的:无则创建、有则不变,不过在对应的元数据存在的情况下,对其再次声明时使用不同的属性会报出相应的错误信息。我们可以利用这一特性来监控元数据的变更,通过定时程序来将记录中的元数据信息重新声明一次,查看是否有异常报出。不过这种方法非常具有局限性,只能增加元数据的信息而不能减少。比如有一个队列没有消费者且以后也不会被使用,我们对其进行了解绑操作,这样就没有更多的消息流入而造成消息堆积,不过这一变更由于某些局限性没有及时将记录变更以通知到那个定时程序,此时又重新将此队列绑定到原交换器中。

这里列举一个简单的元数据管控和监控的示例来应对此种情况,此系统并非最优,但可以给读者在实际应用时提供一种解决对应问题的思路。

如图7-15 所示, 所有的业务应用都需要通过元数据审核系统来申请创建(当然也可以包含查询、修改及删除〉相应的元数据信息。在申请动作完成之后,由专门的人员进行审批,之后在数据库中存储和在RabbitMQ 集群中创建相应的元数据,这两个步骤可以同时进行,而且也无须为这两个动作添加强一致性的事务逻辑。在数据库和RabbitMQ 集群之间会有一个元数据一致性校验程序来检测元数据不一致的地方,然后将不一致的数据上送到监控管理系统。监控管理系统中可以显示元数据不一致的记录信息, 也可以以告警的形式推送出来,然后相应的管理人员可以选择手动或者自动地进行元数据修正。这里的不一致有可能是由于数据库的记录未被正确及时地更新, 也有可能是RabbitMQ 集群中元数据被异常篡改。元数据修正需慎之又慎,在整个系统修正逻辑完备之前, 建议优先采用人工的方式,毕竟不一致的元数据仅占少数, 人工修正的工作量并不太大。

RabbitMQ 的元数据可以很顺利地以表的形式记录在数据库中, 参考附录A , 主要的元数据是queues 、exchanges 和bindings ,可以分别建立三张表。

  1. Table 1: 队列信息表,名称为rmq queues 。列名有name 、vhost 、durable 、auto delete 、arguments 、cluster name 、description ,其中name 、durable 、auto delete 、arguments 可以参考3.2.2 节中的内容。vhost 表示虚拟主机。cluster name 表示队列所在的集群名称,毕竟一般一个公司所用的RabbitMQ 集群井非只有一个。description 是相应的描述信息,相当于备注,通常可置为空。
  2. Table 2: 交换器u信息表,名称为rmq_exchanges 。列名有name 、vhost 、type 、durable 、auto delete , internal , arguments 、cluster name 、description 。其中name 、type 、durable 、auto delete 、internal 、arguments 可参考3 .2 .1 节中的内容。vhost 、cluster 口ame 和description 可参考Table 10
  3. Table 3: 绑定信息表,名称为rmq bindings 0 列名有source 、vhost 、destination 、estination_type 、routing_key 、argume 口ts 、cluster name 、descriptio 口。其中source 、vhost 、destinationdestination type 、routi呵key 、argume口ts 可以参考3 .2.3 节和3 .2.4节中的内容。vhost 、cluster name 和description 可参考Table1

元数据一致性检测程序可以通过/api/definitions 的HTTP API 接口获取集群的元数据信息,通过解析之后与数据库中的记录一一比对,查看是否有不一致的地方。

博文参考

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/457220
推荐阅读
相关标签
  

闽ICP备14008679号