当前位置:   article > 正文

RabbitMQ-发布/订阅模式_rabbittemate的默认发布模式

rabbittemate的默认发布模式

RabbitMQ-默认读、写方式介绍
RabbitMQ-直连交换机(direct)使用方法

RabbitMQ-topic exchange使用方法

目录

1、发布/订阅模式介绍

2、交换机(exchange)

3、fanout交换机的使用方式

3.1 声明交换机

3.2 发送消息到交换机

3.2 扇形交换机发送消息代码

 3.2 声明队列,用于接收消息

3.3 binding

4、总结


1、发布/订阅模式介绍

在普通的生产者、消费者模式,rabbitmq会将消息依次传递给每一个消费者,一个worker一个,平均分配,这就是Round-robin调度方式,为了实现更加复杂的调度,我们就需要使用发布/订阅的方式。

2、交换机(exchange)

RabbitMQ中,消息模型的核心理念就是,生产者从来不能直接将消息发送到队列,甚至生产者都不知道消息要被发送到队列中。

相反,生产者只能将消息发送到交换机中,交换机一侧从生产者接收消息,一侧将消息发送到队列中,交换机需要知道如何处理接收到的消息,是发送给一个队列还是多个队列?这是由交换机的类型决定的。

交换机共分为四类:  directtopicheaders and fanout. 本章节以扇形交换机为例说明rabbitmq的使用。

3、fanout交换机的使用方式

扇形交换机,就像你猜测的那样,他可以将他接收到的全部消息广播到所有队列里。

3.1 声明交换机

首先声明一个扇形交换机,type参数设置为『fanout』

  1. err = ch.ExchangeDeclare(
  2. "logs", // name
  3. "fanout", // type
  4. true, // durable
  5. false, // auto-deleted
  6. false, // internal
  7. false, // no-wait
  8. nil, // arguments
  9. )

3.2 发送消息到交换机

交换机设定完成后,就可以往该交换机发送消息:

  1. body := "Hello World!"
  2. err = ch.Publish("logs", "", false, false, amqp.Publishing{
  3. ContentType: "text/plain",
  4. Body: []byte(body),
  5. })

如果要在rabbitmq的页面上查看发送的消息,需要提前创建一个队列,并绑定到该交换机[logs]上,就可以查看发送的消息:

扇形交换机的特性,就是他会将收到的消息广播给所有绑定到该交换机的队列,我们可以创建多个队列,并绑定到该交换机上,我们发送一次消息,就会看到,所有绑定到该交换机的队列中都会有一条消息,先创建三个队列,并分别绑定到logs交换机:

之后运行脚本,发送两次消息:

 可以看到,三个队列当中都有两条消息。

3.2 扇形交换机发送消息代码

  1. package main
  2. import (
  3. "fmt"
  4. amqp "github.com/rabbitmq/amqp091-go"
  5. )
  6. func main() {
  7. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  8. if err != nil {
  9. fmt.Println("Failed to connect to RabbitMQ")
  10. return
  11. }
  12. defer conn.Close()
  13. ch, err := conn.Channel()
  14. if err != nil {
  15. fmt.Println("Failed to open a channel")
  16. return
  17. }
  18. err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
  19. if err != nil {
  20. fmt.Println("Failed to declare an exchange")
  21. return
  22. }
  23. body := "Hello World!"
  24. err = ch.Publish("logs", "", false, false, amqp.Publishing{
  25. ContentType: "text/plain",
  26. Body: []byte(body),
  27. })
  28. if err != nil {
  29. fmt.Println("Failed to publish a message")
  30. return
  31. }
  32. }

 3.2 声明队列,用于接收消息

  1. q, err := ch.QueueDeclare(
  2. "", // name
  3. false, // durable
  4. false, // delete when unused
  5. true, // exclusive
  6. false, // no-wait
  7. nil, // arguments
  8. )

声明队列时,没有指定队列名称,这时,系统会返回一个随机名称存储在q变量中。 

3.3 binding

队列声明完成后,需要将该队列绑定到交换机上,这样交换机才能把消息广播给该队列:

绑定代码: 

  1. err = ch.QueueBind(
  2. q.Name, // queue name
  3. "", // routing key
  4. "logs", // exchange
  5. false,
  6. nil,
  7. )

消费者侧全部代码如下:

  1. package main
  2. import (
  3. "fmt"
  4. amqp "github.com/rabbitmq/amqp091-go"
  5. )
  6. func main() {
  7. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  8. if err != nil {
  9. fmt.Println("Failed to connect to RabbitMQ")
  10. return
  11. }
  12. defer conn.Close()
  13. ch, err := conn.Channel()
  14. if err != nil {
  15. fmt.Println("Failed to open a channel")
  16. return
  17. }
  18. err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
  19. if err != nil {
  20. fmt.Println("Failed to declare an exchange")
  21. return
  22. }
  23. q, err := ch.QueueDeclare(
  24. "", // name
  25. false, // durable
  26. false, // delete when unused
  27. true, // exclusive
  28. false, // no-wait
  29. nil, // arguments
  30. )
  31. err = ch.QueueBind(
  32. q.Name, // queue name
  33. "", // routing key
  34. "logs", // exchange
  35. false,
  36. nil,
  37. )
  38. msgs, err := ch.Consume(
  39. q.Name, // queue
  40. "", // consumer
  41. true, // auto-ack
  42. false, // exclusive
  43. false, // no-local
  44. false, // no-wait
  45. nil, // args
  46. )
  47. var forever chan struct{}
  48. go func() {
  49. for d := range msgs {
  50. fmt.Printf(" [x] %s\n", d.Body)
  51. }
  52. }()
  53. fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  54. <-forever
  55. }

程序启动后,控制台上会增加一个随机命名的队列。

 运行【3.2】的生产者程序,发送消息到扇形交换机,这个时候消费者就会同步消费到消息,并进行打印:

4、总结

关于扇形交换机,核心的一点需要我们记住,发送到扇形交换机的消息,他会将消息广播给所有绑定到该交换机的队列上,无脑广播,所有队列会同时接受到交换机上全部的消息。

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/843971
推荐阅读
相关标签
  

闽ICP备14008679号