当前位置:   article > 正文

RabbitMQ exchange binding queue原理_

RabbitMQ根据exchange的类型规则将消息路由到相应的queue,其中exchange的类型包括direct,topic,fanout以及headers,我们这里主要介绍topic exchange类型的消息路由原理,其他exchange的路由原理相对简单。Topic exchange的示意图如下:

  • *(星号):可以(只能)匹配一个单词。
  • #(井号):可以匹配多个单词(或者0个)。

这里的使用原理不做介绍,主要介绍RabbitMQ源码内部的实现,即exchange和queue如何进行binding的。

1. exchange与queue的binding

RabbitMQ的exchange和queue的binding代码流程如下:

对于RabbitMQ的使用流程一般为:

  • RabbitMQ客户端与RabbitMQ服务端建立连接。
  • 建立channel。
  • 在建立的channel创建exchange和queue。
  • 将exchange和queue进行binding。
  • 对于生产者而已,向exchange发送消息,且需要指定相应的routing_key。
  • 对于消费者而已,消费处理监听的queue中的消息。

其中exchange与queue的binding便位于第4个步骤,入口代码如下:

  1. %% rabbit_channel.erl
  2. %% 处理将交换机exchange和队列进行绑定的消息
  3. handle_method(#'queue.bind'{queue = QueueNameBin,
  4. exchange = ExchangeNameBin,
  5. routing_key = RoutingKey,
  6. nowait = NoWait,
  7. arguments = Arguments}, _, State) ->
  8. binding_action(fun rabbit_binding:add/2,
  9. ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
  10. #'queue.bind_ok'{}, NoWait, State);

在binding_action函数的一些合法性校验之后,执行真正的binding函数rabbit_binding:add/2。

  1. %% rabbit_binding.erl
  2. %% 增加新的绑定
  3. add(Binding, InnerFun) ->
  4. binding_action(
  5. Binding,
  6. %% Src,Dst都是从mnesia数据库中读取到的数据
  7. fun (Src, Dst, B) ->
  8. %% 找到对应交换机的处理模块去验证绑定的合法性(Src必须是exchange交换机类型)
  9. case rabbit_exchange:validate_binding(Src, B) of
  10. ok ->
  11. %% this argument is used to check queue exclusivity(排他性);
  12. %% in general, we want to fail on that in preference to
  13. %% anything else
  14. %% 功效一:检查队列字段exclusive_owner字段的正确性,检查排他性队列(如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见)
  15. case InnerFun(Src, Dst) of
  16. ok ->
  17. %% 查看rabbit_route表中是否有B这个数据,如果没有则添加到mnesia数据库表中
  18. case mnesia:read({rabbit_route, B}) of
  19. %% 增加绑定的实际操作(实际操作mnesia数据库表的函数)
  20. [] -> add(Src, Dst, B);
  21. [_] -> fun () -> ok end
  22. end;
  23. {error, _} = Err ->
  24. rabbit_misc:const(Err)
  25. end;
  26. {error, _} = Err ->
  27. rabbit_misc:const(Err)
  28. end
  29. end, fun not_found_or_absent_errs/1).

在对应的exchange类型中校验其合法性,校验正常后,需要读取mnesia数据库中rabbit_route表中数据,查看是否有相应的binding信息,如果有则不再进行binding,没有则进行binding操作,判断rabbit_route表数据的原因是:不管exchange和queue是否需要持久化,最后都会将其binding信息写入到rabbit_route表中。

这里提一下,RabbitMQ所保存的一些metadata信息依赖于mnesia分布式数据库,其中RabbitMQ所使用的表可以通过以下命令进行显示:

  1. [root@master scripts]# ./rabbitmqctl eval 'mnesia:info().'
  2. ……
  3. ===> System info in version "4.16", debug level = none <===
  4. opt_disc. Directory "/var/lib/rabbitmq/mnesia/rabbit@master" is used.
  5. use fallback at restart = false
  6. running db nodes = [rabbit@master]
  7. stopped db nodes = []
  8. master node tables = []
  9. remote = []
  10. ram_copies = [gm_group,mirrored_sup_childspec,rabbit_exchange,
  11. rabbit_exchange_serial,
  12. rabbit_exchange_type_consistent_hash,
  13. rabbit_exchange_type_consistent_hash_ring_state,
  14. rabbit_listener,rabbit_queue,rabbit_reverse_route,
  15. rabbit_route,rabbit_semi_durable_route,
  16. rabbit_topic_trie_binding,rabbit_topic_trie_edge,
  17. rabbit_topic_trie_node,rh_exchange_table,
  18. tracked_connection_on_node_rabbit@master,
  19. tracked_connection_per_vhost_on_node_rabbit@master,
  20. x_jms_topic_table]
  21. disc_copies = [rabbit_durable_exchange,rabbit_durable_queue,
  22. rabbit_durable_route,rabbit_runtime_parameters,
  23. rabbit_topic_permission,rabbit_user,
  24. rabbit_user_permission,rabbit_vhost,schema]
  25. disc_only_copies = []
  26. [{rabbit@master,disc_copies}] = [rabbit_runtime_parameters,
  27. rabbit_durable_exchange,rabbit_durable_queue,
  28. rabbit_user,rabbit_durable_route,
  29. rabbit_topic_permission,rabbit_vhost,schema,
  30. rabbit_user_permission]
  31. [{rabbit@master,ram_copies}] = [rabbit_topic_trie_node,
  32. tracked_connection_per_vhost_on_node_rabbit@master,
  33. x_jms_topic_table,
  34. rabbit_exchange_type_consistent_hash_ring_state,
  35. rabbit_reverse_route,
  36. rabbit_topic_trie_binding,
  37. rabbit_exchange_type_consistent_hash,gm_group,
  38. rabbit_listener,mirrored_sup_childspec,
  39. rabbit_exchange,rabbit_route,
  40. rabbit_exchange_serial,
  41. tracked_connection_on_node_rabbit@master,
  42. rabbit_semi_durable_route,rh_exchange_table,
  43. rabbit_queue,rabbit_topic_trie_edge]
  44. 21 transactions committed, 7 aborted, 0 restarted, 0 logged to disc
  45. 0 held locks, 0 in queue; 0 local transactions, 0 remote
  46. 0 transactions waits for other nodes: []
  47. ok

其中disc_copies表示会将数据存入磁盘和内存,ram_copies表示将数据存入内存。disc_only_copies表示将数据只存入磁盘。由于mnesia底层是基于ets和dets实现,所以可以通过ets或者dets查看表中数据,注意ets和dets是只能查询本节点的表数据信息。如查看rabbit_exchang表数据信息:

  1. [root@master scripts]# ./rabbitmqctl eval 'ets:tab2list(rabbit_exchange).'
  2. [{exchange,{resource,<<"/">>,exchange,<<"amq.direct">>},
  3. direct,true,false,false,[],undefined,undefined,undefined,
  4. {[],[rabbit_event_exchange_decorator]},
  5. #{}},
  6. ……
  7. {exchange,{resource,<<"/">>,exchange,<<"amq.headers">>},
  8. headers,true,false,false,[],undefined,undefined,undefined,
  9. {[],[rabbit_event_exchange_decorator]},
  10. #{}}]

具体参看erlang官方文档的mnesia,ets和dets的使用说明。继续回到binding操作,假设第一次进行binding操作,则将执行下列操作:

  1. %% rabbit_binding.erl
  2. %% 增加绑定的实际操作(实际操作mnesia数据库表的函数)
  3. add(Src, Dst, B) ->
  4. [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
  5. case (SrcDurable andalso DstDurable andalso
  6. mnesia:read({rabbit_durable_route, B}) =/= []) of
  7. false -> %% 根据交换机exchange和队列的持久化状态,将路由信息写入mnesia数据库表
  8. ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
  9. fun mnesia:write/3),
  10. %% 交换机exchange创建后的回调
  11. x_callback(transaction, Src, add_binding, B),
  12. %% 让Src对应的exchange交换机类型对应的模块和所有的修饰模块回调serialise_events函数,如果执行成功,则将XName在rabbit_exchange_serial表中的next值加一
  13. Serial = rabbit_exchange:serial(Src),
  14. fun () ->
  15. x_callback(Serial, Src, add_binding, B),
  16. %% 向rabbit_event事件中心发布绑定信息被创建的事件
  17. ok = rabbit_event:notify(binding_created, info(B))
  18. end;
  19. true -> rabbit_misc:const({error, binding_not_found})
  20. end.

sync_route函数将exchange和queue的binding信息根据exchange和queue是否持久化来判断是否写入rabbit_durable_route和rabbit_semi_durable_route表中,但无论是否持久化都会写入rabbit_route和rabbit_reverse_route表中。即

  • rabbit_durable_route表:exchange和queue都持久化则写入该表。
  • rabbit_semi_durable_route表:exchange不需要持久化,queue需要持久化则写入该表。
  • rabbit_route和rabbit_reverse_route表:exchange和queue是否持久化都会写入该表。

除了binding信息写入上述表以外,对于类型为topic的exchange,RabbitMQ还会将binding信息写入rabbit_topic_trie_node,rabbit_topic_trie_edge和rabbit_topic_trie_binding表中。

  1. %% rabbit_exchange_type_topic.erl
  2. %% 路由绑定信息的添加回调该模块进行相关的处理(transaction:事务)
  3. add_binding(transaction, _Exchange, Binding) ->
  4. internal_add_binding(Binding);
  5. %% rabbit_exchange_type_topic.erl
  6. %% exchange交换机内部添加绑定信息的接口
  7. internal_add_binding(#binding{source = X, key = K, destination = D,
  8. args = Args}) ->
  9. %% 创建新的节点和边
  10. FinalNode = follow_down_create(X, split_topic_key(K)),
  11. %% 创建绑定信息
  12. trie_add_binding(X, FinalNode, D, Args),
  13. ok.

其内部原理可参考以下链接:https://www.erlang-solutions.com/blog/rabbit-s-anatomy-understanding-topic-exchanges.html

再提一下,我们执行rabbitmqctl list_bindings命令时,查看到的exchange和queue的bindings信息是从rabbit_route表中进行获取的。即

  1. %% rabbit_binding.erl
  2. %% 列出VHostPath下的所有路由绑定信息
  3. list(VHostPath) ->
  4. VHostResource = rabbit_misc:r(VHostPath, '_'),
  5. Route = #route{binding = #binding{source = VHostResource,
  6. destination = VHostResource,
  7. _ = '_'},
  8. _ = '_'},
  9. [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
  10. Route)].

2. 总结

1 RabbitMQ的exchange和queue的binding的建立在客户端与RabbitMQ之间建立连接且exchange和queue创建完成后。

2 binding信息会被保存到mnesia数据库中,其保存的表为rabbit_route表,根据exchange和queue是否持久化判断是否保存到rabbit_durable_route和rabbit_semi_durable_route表中。

3 对于topic类型的exchange,其binding信息还会被保存到rabbit_topic_trie_node,rabbit_topic_trie_edge和rabbit_topic_trie_binding表中。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/513239
推荐阅读
相关标签
  

闽ICP备14008679号