当前位置:   article > 正文

RabbitMQ-topic exchange使用方法

RabbitMQ-topic exchange使用方法

RabbitMQ-默认读、写方式介绍

RabbitMQ-发布/订阅模式

RabbitMQ-直连交换机(direct)使用方法

目录

1、概述

2、topic交换机使用方法

2.1 适用场景

2.2 解决方案

3、代码实现

3.1 源代码实现

3.2 运行记录

4、小结


1、概述

topic 交换机是比直连交换机功能更加强大的交换方式,通过不同的路由规则,可以实现fanout、direct两种交换机的功能。

2、topic交换机使用方法

2.1 适用场景

假设我们要对动物做一个描述,根据速度、颜色、种类等特征对其进行分别入到不同的mq队列中,routing key的格式为:"<speed>.<colour>.<species>",比如说,所有黄色动物入队列1,跑的速度慢的,还有小兔子入队列2,哪该如何实现该需求呢?

2.2 解决方案

结合2.1描述的需求,我们可以画出如下框图:

知识点解释:

* (star) :和正则的功能类似,可以代表一整个单词。

# (hash) :代表0个或者多个单词。

如果一条消息的routing key为「quick.orange.rabbit」,将会被同时路由到Q1和Q2,「lazy.orange.elephant」的routing key同样也将会被同时路由到Q1和Q2,「quick.orange.fox」的消息只会被路由Q1,【lazy.brown.fox】只会被路由到Q2,【lazy.pink.rabbit】只会被路由到Q2一次,虽然匹配了两个binding,【quick.brown.fox】没有匹配到任何的绑定,那么消息将会被丢弃。

如果一个队列绑定的是【#】,那么他将会接收到所有的消息,会忽略调binding key,实现类似扇形交换机的功能。

如果一个routing key中没有设计【#】和【*】,那么他会实现类似直连交换机的功能。

3、代码实现

3.1 源代码实现

生产者:

  1. package main
  2. import (
  3. "fmt"
  4. amqp "github.com/rabbitmq/amqp091-go"
  5. )
  6. func main() {
  7. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  8. if err != nil {
  9. fmt.Println("Failed to connect to RabbitMQ")
  10. return
  11. }
  12. defer conn.Close()
  13. ch, err := conn.Channel()
  14. if err != nil {
  15. fmt.Println("Failed to open a channel")
  16. return
  17. }
  18. err = ch.ExchangeDeclare(
  19. "logs_topic", // name
  20. "topic", // type
  21. true, // durable
  22. false, // auto-deleted
  23. false, // internal
  24. false, // no-wait
  25. nil, // arguments
  26. )
  27. if err != nil {
  28. fmt.Println("Failed to declare an exchange,err:", err)
  29. return
  30. }
  31. body := "Hello World by topic exchange"
  32. err = ch.Publish(
  33. "logs_topic", // exchange
  34. "quick.orange.fox", // routing key
  35. false,
  36. false,
  37. amqp.Publishing{
  38. ContentType: "text/plain",
  39. Body: []byte(body),
  40. })
  41. if err != nil {
  42. fmt.Println("Failed to publish a message")
  43. return
  44. }
  45. }

代码示例中routing key为【quick.orange.fox】,这条消息将会被路由到2.2中的Q1队列中。

消费侧代码:

  1. package main
  2. import (
  3. "fmt"
  4. amqp "github.com/rabbitmq/amqp091-go"
  5. )
  6. func main() {
  7. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  8. if err != nil {
  9. fmt.Println("Failed to connect to RabbitMQ")
  10. return
  11. }
  12. defer conn.Close()
  13. ch, err := conn.Channel()
  14. if err != nil {
  15. fmt.Println("Failed to open a channel")
  16. return
  17. }
  18. err = ch.ExchangeDeclare("logs", "direct", true, false, false, false, nil)
  19. if err != nil {
  20. fmt.Println("Failed to declare an exchange")
  21. return
  22. }
  23. q, err := ch.QueueDeclare(
  24. "logs_topic", // name
  25. true, // durable
  26. false, // delete when unused
  27. false, // exclusive
  28. false, // no-wait
  29. nil, // arguments
  30. )
  31. err = ch.QueueBind(
  32. q.Name, // queue name
  33. "*.orange.*", // routing key(binding key)
  34. "logs_topic", // exchange
  35. false,
  36. nil,
  37. )
  38. msgs, err := ch.Consume(
  39. q.Name, // queue
  40. "", // consumer
  41. true, // auto-ack
  42. true, // exclusive
  43. false, // no-local
  44. false, // no-wait
  45. nil, // args
  46. )
  47. var forever chan struct{}
  48. go func() {
  49. for d := range msgs {
  50. fmt.Printf(" [x] %s\n", d.Body)
  51. }
  52. }()
  53. fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  54. <-forever
  55. }

3.2 运行记录

发送消息:

接收消息:

4、小结

学到这里发现,topic交换机完全具备fanout、direct两种交换机的全部功能,日常开发完全可以使用topic交换机,根据不同routing key即可以实现扇形和直连交换机的功能。

比如第3章节中消费者,routing key设置为【#】,则这个队列可以接收所有消息,类似扇形交换机功能。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/729738
推荐阅读
相关标签
  

闽ICP备14008679号