当前位置:   article > 正文

go消息队列RabbitMQ - 订阅模式-fanout_订阅rabbitmq消息

订阅rabbitmq消息

1、发布订阅 

订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

  • 相关场景:邮件群发,群聊天,广播(广告)

2、Exchanges(交换器)

消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

有几种交换器类型可用:directtopicheaders 和 fanout。我们将集中讨论最后一个——fanout

2.1 创建交换器

创建一个这种类型的交换器,并给它起个名字叫logs

  1. err = ch.ExchangeDeclare(
  2. "logs", // name
  3. "fanout", // type
  4. true, // durable
  5. false, // auto-deleted
  6. false, // internal
  7. false, // no-wait
  8. nil, // arguments
  9. )

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。 

2.2 临时队列

也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。

自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。

  1. q, err := ch.QueueDeclare(
  2. "", // 空字符串作为队列名称
  3. false, // 非持久队列
  4. false, // delete when unused
  5. true, // 独占队列(当前声明队列的连接关闭后即被删除)
  6. false, // no-wait
  7. nil, // arguments
  8. )

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

2.3 交换器与队列绑定

 

交换器和队列之间的关系称为绑定

  1. err = ch.QueueBind(
  2. q.Name, // queue name
  3. "", // routing key
  4. "logs", // exchange
  5. false,
  6. nil,
  7. )

从现在开始,logs交换器将会把消息添加到我们的队列中。

2.4 发布消息到交换机

例如,发布到fanout交换器:

  1. body := bodyFrom(os.Args)
  2. err = ch.Publish(
  3. "logs", // exchange
  4. "", // routing key
  5. false, // mandatory
  6. false, // immediate
  7. amqp.Publishing{
  8. ContentType: "text/plain",
  9. Body: []byte(body),
  10. })

 3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "strings"
  6. "github.com/streadway/amqp"
  7. )
  8. func failOnError(err error, msg string) {
  9. if err != nil {
  10. log.Fatalf("%s: %s", msg, err)
  11. }
  12. }
  13. func main() {
  14. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  15. failOnError(err, "Failed to connect to RabbitMQ")
  16. defer conn.Close()
  17. ch, err := conn.Channel()
  18. failOnError(err, "Failed to open a channel")
  19. defer ch.Close()
  20. err = ch.ExchangeDeclare(
  21. "logs", // name
  22. "fanout", // type
  23. true, // durable
  24. false, // auto-deleted
  25. false, // internal
  26. false, // no-wait
  27. nil, // arguments
  28. )
  29. failOnError(err, "Failed to declare an exchange")
  30. body := bodyFrom(os.Args)
  31. err = ch.Publish(
  32. "logs", // exchange
  33. "", // routing key
  34. false, // mandatory
  35. false, // immediate
  36. amqp.Publishing{
  37. ContentType: "text/plain",
  38. Body: []byte(body),
  39. })
  40. failOnError(err, "Failed to publish a message")
  41. log.Printf(" [x] Sent %s", body)
  42. }
  43. func bodyFrom(args []string) string {
  44. var s string
  45. if (len(args) < 2) || os.Args[1] == "" {
  46. s = "hello"
  47. } else {
  48. s = strings.Join(args[1:], " ")
  49. }
  50. return s
  51. }

emit_logs.go源码

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

  1. package main
  2. import (
  3. "log"
  4. "github.com/streadway/amqp"
  5. )
  6. func failOnError(err error, msg string) {
  7. if err != nil {
  8. log.Fatalf("%s: %s", msg, err)
  9. }
  10. }
  11. func main() {
  12. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  13. failOnError(err, "Failed to connect to RabbitMQ")
  14. defer conn.Close()
  15. ch, err := conn.Channel()
  16. failOnError(err, "Failed to open a channel")
  17. defer ch.Close()
  18. err = ch.ExchangeDeclare(
  19. "logs", // name
  20. "fanout", // type
  21. true, // durable
  22. false, // auto-deleted
  23. false, // internal
  24. false, // no-wait
  25. nil, // arguments
  26. )
  27. failOnError(err, "Failed to declare an exchange")
  28. q, err := ch.QueueDeclare(
  29. "", // name
  30. false, // durable
  31. false, // delete when unused
  32. true, // exclusive
  33. false, // no-wait
  34. nil, // arguments
  35. )
  36. failOnError(err, "Failed to declare a queue")
  37. err = ch.QueueBind(
  38. q.Name, // queue name
  39. "", // routing key
  40. "logs", // exchange
  41. false,
  42. nil,
  43. )
  44. failOnError(err, "Failed to bind a queue")
  45. msgs, err := ch.Consume(
  46. q.Name, // queue
  47. "", // consumer
  48. true, // auto-ack
  49. false, // exclusive
  50. false, // no-local
  51. false, // no-wait
  52. nil, // args
  53. )
  54. failOnError(err, "Failed to register a consumer")
  55. forever := make(chan bool)
  56. go func() {
  57. for d := range msgs {
  58. log.Printf(" [x] %s", d.Body)
  59. }
  60. }()
  61. log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  62. <-forever
  63. }

receive_logs.go源码

如果要将日志保存到文件,只需打开控制台并输入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

  1. sudo rabbitmqctl list_bindings
  2. # => Listing bindings ...
  3. # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
  4. # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
  5. # => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/843982
推荐阅读
相关标签
  

闽ICP备14008679号