当前位置:   article > 正文

RabbitMQ 发布订阅

RabbitMQ 发布订阅

 RabbitMQ 发布订阅视频学习地址:

简单模式下RabbitMQ 发布者发布消息 消费者消费消息

Publist/Subscribe 发布订阅

RabbitMQ 中,发布订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特 定的接收者(订阅者)。而是将消息发送到一个交换机,交换机将消息转发到绑定到该交换机的每个队 ,每个绑定交换机的队列都将接收到消息。消费者(订阅者)监听自己的队列 并进行消费 。

 

场景 : 开放平台 开发者订阅了某个开放平台的 api 之后,数据有变化就会自动获取到最新的

 

 

 

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

 

P :生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X (交换机)
C :消费者,消息的接收者,会一直等待消息到来
Queue :消息队列,接收消息、缓存消息
Exchange :交换机( X )。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递 交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。
Exchange 有常见以下 3 种类型:
Fanout :广播,将消息交给所有绑定到交换机的队列
Direct :定向,把消息交给符合指定 routing key 的队列
Topic :通配符,把消息交给符合 routing pattern (路由模式) 的队列
Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

 

RabbitMQ 发布订阅模式的一些应用场景:  

 

1. 数据提供商与应用商 :例如中国气象局向多个门户网站提供气象数据。
2. 新闻机构 :将独家新闻发布给多个订阅者,但可能需要根据新闻类型进行更精细的路由。
3. 商城系统 :新添加商品后,同时更新缓存和数据库。
4. 用户通知 :用户充值或转账成功后,通过多种方式(如短信、邮件)通知用户。
5. 消息广播 :将消息广播到多个消费者,例如系统公告、活动通知等。
6. 降低耦合 :生产者和消费者通过 RabbitMQ 进行解耦,不需要直接连接,提高系统的灵活性和可
扩展性。
7. 异步处理 :生产者发送消息后,消费者可以异步处理,提高系统的响应速度和并发处理能力。

 

生产者
emit_log.go

 

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "os"
  6. "strings"
  7. "github.com/rabbitmq/amqp091-go"
  8. )
  9. func failOnError(err error, msg string) {
  10. if err != nil {
  11. log.Printf("%s: %s", msg, err)
  12. }
  13. }
  14. func bodyForm(args []string) string {
  15. var s string
  16. if (len(args) < 2) || os.Args[1] == "" {
  17. s = "hello"
  18. } else {
  19. s = strings.Join(args[1:], " ")
  20. }
  21. return s
  22. }
  23. func main() {
  24. // 连接到RabbitMQ服务器
  25. conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
  26. failOnError(err, "Failed to connect to RabbitMQ")
  27. defer conn.Close()
  28. // 创建一个通道
  29. ch, err := conn.Channel()
  30. failOnError(err, "Failed to open a channel")
  31. defer ch.Close()
  32. //声明一个交换机
  33. err = ch.ExchangeDeclare(
  34. "logs", //name 交换机名称
  35. "fanout", //交换机类型 Fanout 广播
  36. true, //durable 持久化
  37. false, //autoDelete 是否自动删除
  38. false, //internal 是否内部使用 设置为 false 时,表示无论如何这个交换器都不是
  39. 内置的
  40. false, //noWait 是否等待服务器响应 参数通常默认为False,意味着操作会同步进
  41. 行并等待服务器的响应
  42. nil, // 其他属性
  43. )
  44. failOnError(err, "Failed to declare an exchange")
  45. //发送消息
  46. body := bodyForm(os.Args)
  1. // 发布消息到交换机,并指定路由键
  2. err = ch.PublishWithContext(
  3. context.Background(),
  4. "logs", // 交换器的名称
  5. "", // 队列名
  6. false, // mandatory 必须发送到队列 ,false表示如果交换器无法根据自身的类型和路
  7. 由键找到一个符合条件的队列丢弃
  8. false, //immediate 参数设置为 false 时,表示消息不需要立即被消费者接收
  9. amqp091.Publishing{
  10. ContentType: "text/plain",
  11. Body: []byte(body),
  12. })
  13. failOnError(err, "Failed to publish a message")
  14. log.Printf(" [x] Sent: %s", body)
  15. }

 

消费者
receive_log.go

 

  1. package main
  2. import (
  3. "log"
  4. "github.com/rabbitmq/amqp091-go"
  5. )
  6. func failOnError2(err error, msg string) {
  7. if err != nil {
  8. log.Printf("%s: %s", msg, err)
  9. }
  10. }
  11. func main() {
  12. //建立连接
  13. conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
  14. failOnError2(err, "Failed to connect to RabbitMQ")
  15. defer conn.Close()
  16. //创建一个Channel
  17. ch, err := conn.Channel()
  18. failOnError2(err, "Failed to open a channel")
  19. defer ch.Close()
  20. //声明一个交换机
  21. err = ch.ExchangeDeclare(
  22. "logs", // 交换机名称
  23. "fanout", // 交换机类型
  24. true, // 是否持久化
  25. false, // 是否自动删除
  26. false, // 是否内部使用
  27. false, // 是否等待服务器响应
  28. nil, // 其他属性
  29. )
  30. failOnError2(err, "Failed to declare an exchange")
  31. // 声明一个临时队列
  32. q, err := ch.QueueDeclare(
  33. "", // 队列名称,留空表示由RabbitMQ自动生成
  34. false, // 是否持久化
  35. false, // 是否自动删除(当没有任何消费者连接时)
  36. true, // 是否排他队列(仅限于当前连接)
  37. false, // 是否等待服务器响应
  38. nil, // 其他属性
  39. )
  40. failOnError2(err, "Failed to declare a queue")
  41. // 将队列绑定到交换机上
  42. err = ch.QueueBind(
  43. q.Name, // 队列名称
  44. "", // 路由键,留空表示接收交换机的所有消息
  45. "logs", // 交换机名称
  46. false, // 是否等待服务器响应
  47. nil, // 其他属性
  48. )
  49. failOnError2(err, "Failed to bind a queue")
  50. msgs, err := ch.Consume(
  51. q.Name, // 队列名称
  52. "", // 消费者标识符,留空表示由RabbitMQ自动生成
  53. true, // 是否自动应答
  54. false, // 是否独占模式(仅限于当前连接)
  55. false, // 是否等待服务器响应
  56. false, // noLocal
  57. nil, // 其他属性
  58. )
  59. // msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
  60. failOnError2(err, "Failed to register a consumer")
  61. var forever chan struct{}
  62. go func() {
  63. for d := range msgs {
  64. log.Printf(" [x] %s", d.Body)
  65. }
  66. }()
  67. log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
  68. <-forever
  69. }

 运行

  1. # 如果你想保存日志文件
  2. go run receive_log.go > logs_from_rabbit.log
  3. # 如果你想再终端看到日志
  4. go run receive_log.go
  5. # shell2
  6. go run emit_log.go

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/639201
推荐阅读
相关标签
  

闽ICP备14008679号