赞
踩
订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
有几种交换器类型可用:direct
, topic
, headers
和 fanout
。我们将集中讨论最后一个——fanout
。
创建一个这种类型的交换器,并给它起个名字叫logs
:
- err = ch.ExchangeDeclare(
- "logs", // name
- "fanout", // type
- true, // durable
- false, // auto-deleted
- false, // internal
- false, // no-wait
- nil, // arguments
- )
fanout
(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。
也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。
自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。
- q, err := ch.QueueDeclare(
- "", // 空字符串作为队列名称
- false, // 非持久队列
- false, // delete when unused
- true, // 独占队列(当前声明队列的连接关闭后即被删除)
- false, // no-wait
- nil, // arguments
- )
上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
交换器和队列之间的关系称为绑定。
- err = ch.QueueBind(
- q.Name, // queue name
- "", // routing key
- "logs", // exchange
- false,
- nil,
- )
从现在开始,logs
交换器将会把消息添加到我们的队列中。
例如,发布到fanout
交换器:
- body := bodyFrom(os.Args)
- err = ch.Publish(
- "logs", // exchange
- "", // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "text/plain",
- Body: []byte(body),
- })
产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs
交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey
,但是对于fanout
型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go
脚本的代码:
- package main
-
- import (
- "log"
- "os"
- "strings"
-
- "github.com/streadway/amqp"
- )
-
- func failOnError(err error, msg string) {
- if err != nil {
- log.Fatalf("%s: %s", msg, err)
- }
- }
-
- func main() {
- conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
- failOnError(err, "Failed to connect to RabbitMQ")
- defer conn.Close()
-
- ch, err := conn.Channel()
- failOnError(err, "Failed to open a channel")
- defer ch.Close()
-
- err = ch.ExchangeDeclare(
- "logs", // name
- "fanout", // type
- true, // durable
- false, // auto-deleted
- false, // internal
- false, // no-wait
- nil, // arguments
- )
- failOnError(err, "Failed to declare an exchange")
-
- body := bodyFrom(os.Args)
- err = ch.Publish(
- "logs", // exchange
- "", // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "text/plain",
- Body: []byte(body),
- })
- failOnError(err, "Failed to publish a message")
-
- log.Printf(" [x] Sent %s", body)
- }
-
- func bodyFrom(args []string) string {
- var s string
- if (len(args) < 2) || os.Args[1] == "" {
- s = "hello"
- } else {
- s = strings.Join(args[1:], " ")
- }
- return s
- }
如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。
如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。
receive_logs.go
的代码:
- package main
-
- import (
- "log"
-
- "github.com/streadway/amqp"
- )
-
- func failOnError(err error, msg string) {
- if err != nil {
- log.Fatalf("%s: %s", msg, err)
- }
- }
-
- func main() {
- conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
- failOnError(err, "Failed to connect to RabbitMQ")
- defer conn.Close()
-
- ch, err := conn.Channel()
- failOnError(err, "Failed to open a channel")
- defer ch.Close()
-
- err = ch.ExchangeDeclare(
- "logs", // name
- "fanout", // type
- true, // durable
- false, // auto-deleted
- false, // internal
- false, // no-wait
- nil, // arguments
- )
- failOnError(err, "Failed to declare an exchange")
-
- q, err := ch.QueueDeclare(
- "", // name
- false, // durable
- false, // delete when unused
- true, // exclusive
- false, // no-wait
- nil, // arguments
- )
- failOnError(err, "Failed to declare a queue")
-
- err = ch.QueueBind(
- q.Name, // queue name
- "", // routing key
- "logs", // exchange
- false,
- nil,
- )
- failOnError(err, "Failed to bind a queue")
-
- msgs, err := ch.Consume(
- q.Name, // queue
- "", // consumer
- true, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- failOnError(err, "Failed to register a consumer")
-
- forever := make(chan bool)
-
- go func() {
- for d := range msgs {
- log.Printf(" [x] %s", d.Body)
- }
- }()
-
- log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
- <-forever
- }
如果要将日志保存到文件,只需打开控制台并输入:
go run receive_logs.go > logs_from_rabbit.log
如果希望在屏幕上查看日志,请切换到一个新的终端并运行:
go run receive_logs.go
当然,要发出日志,请输入:
go run emit_log.go
使用rabbitmqctl list_bindings
命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go
程序后,你应该看到类似以下内容:
- sudo rabbitmqctl list_bindings
- # => Listing bindings ...
- # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
- # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
- # => ...done.
对结果的解释很简单:数据从logs
交换器进入了两个由服务器分配名称的队列。这正是我们想要的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。