赞
踩
rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic
rabbitMQ.# == rabbit.topic == rabbit.topic.topic
Meta Data : 元数据(描述数据的数据)
对于该模式的两个节点,消息只会存在其中一个节点,另一个节点只保存mate data,当consumer 连接节点2访问节点1的数据信息时,消息会在两个节点中传递。
该模式下p和c应尽量连接每个节点,这样起到线性拓展的作用。
但存在一个问题,如果节点上还有未消费的消息,但是节点挂了。如果节点设置了持久化,则需要在节点重启的时候消息才会恢复。如果未设置持久化,则消息会丢失。
消息存在多个节点中,消息会在节点与节点之间同步,可实现高可用(当一个节点挂了,另一个节点可以接替其位置,继续工作)但会降低性能,因为大量消息进入和同步,会占用大量带宽,但是为了保证高可靠性需要取舍。
代码部分参考 upup小亮的博客
Simple运行机制与WorkQueue相似,只是一个Consumer与多个Consumer的区别。多个Consumer之间存在竞争关系,所以工作队列是创建多个Consumer,多个竞争只有一个可以获取消息消费。消费成功后ack消息删除。
演示代码放到一起了:
生产者
// simple and work queue func main2() { // 连接到 rabbitMQ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法创建连接:%s", err) return } // 默认关闭 defer conn.Close() // 创建通道Channel ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建channel:%s", err) return } // 通道关闭 defer ch.Close() // 创建存储队列 queue, err := ch.QueueDeclare( "hello", // 队列名称 false, // 持久化设置,可以为true根据需求选择 false, // 自动删除,没有用户连接删除queue一般不选用 false, //独占 false, //等待服务器确认 nil) //参数 if err != nil { fmt.Println(err) log.Fatalf("无法声明队列:%s", err) return } var body string // 发送信息 for i := 0; i < 10; i++ { fmt.Println(i) body = "Hello RabbitMQ" + string(i) err = ch.Publish( "", queue.Name, false, // 必须发送到消息队列 false, // 不等待服务器确认 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) if err != nil { log.Fatalf("消息生产失败:%s", err) continue } } }
消费者
// create conn // 如果同时运行两个这样的consumer代码,就是工作队列。只有一个consumer就是simple conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法创建连接:%s", err) return } defer conn.Close() // create channel ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建channel:%s", err) return } defer ch.Close() // create queue queue, err := ch.QueueDeclare( "hello", false, false, false, false, nil) if err != nil { log.Fatalf("无法创建queue:%s", err) return } // 消费信息 msgs, err := ch.Consume( queue.Name, "", true, false, false, false, nil) if err != nil { log.Fatalf("无法消费信息:%s", err) return } for msg := range msgs { log.Println(string(msg.Body)) } return
发布订阅模式可以创建两个Queue,绑定到同一个Exchange中
生产者这边只需要跟交换机对接,而交换机类型为fanout:
func main() { // 连接到 rabbitMQ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法创建连接:%s", err) } // 默认关闭 defer conn.Close() // 创建通道Channel ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建channel:%s", err) } defer ch.Close() // create exchange ex := ch.ExchangeDeclare( "exchange1", // 交换机名称 "fanout", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) fmt.Println(ex) body := "Hello RabbitMQ for Pub/Sub" err = ch.Publish( "exchange1", "", // routing key 可以为空,因为fanout不看routing key false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) if err != nil { log.Fatalf("err %s:", err) } log.Println(body) }
消费者:创建交换机,类型为fanout,创建队列,绑定交换机(创建多个consumer绑定同一个queue和同一个交换机。这样发送一个消息,所有的consumer都能收到。== 发布订阅模型)
// Pub/Sub // Create conn conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil{ log.Fatalf(err) } defer conn.Close() // channel create ch, err := conn.Channel() if err != nil{ log.Fatalf(err) } defer ch.Close() // exchange create ex := ch.ExchangeDeclare( "exchange1", "fanout", true, false, false, false, nil) fmt.Println(ex) // queue create queue, err := ch.QueueDeclare( "hello", false, false, false, false, nil) if err != nil{ log.Fatalf(err) } err = ch.QueueBind( queue.Name, "", "exchange1", false, nil) if err != nil{ log.Fatalf(err) } msgs, err := ch.Consume( queue.Name, "", true, false, false, false, nil) if err != nil{ log.Fatalf(err) } go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf("Waiting for messages. To exit press CTRL+C") <-make(chan struct{}) // 阻塞主goroutine }
生产者
func main() { // 连接到 rabbitMQ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法创建连接:%s", err) } // 默认关闭 defer conn.Close() // 创建通道Channel ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建channel:%s", err) } defer ch.Close() // create exchange ex := ch.ExchangeDeclare( "exchange1", // 交换机名称 "direct", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) fmt.Println(ex) body := "Hello RabbitMQ for direct routing" // 发布消息到交换机,并指定路由键 err = ch.Publish( "logs_direct", // 交换机名称 "routing_key", // 路由键 false, // 是否等待服务器响应 false, // 是否立即将消息写入磁盘 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }, ) if err != nil{ log.Fatalf("无法创建send msg:%s", err) } log.Printf("Sent message: %s", message)
消费者
func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil{ log.Fatalf("无法创建send msg:%s", err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil{ log.Fatalf("无法创建send msg:%s", err) } defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_direct", // 交换机名称 "direct", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf("无法创建send msg:%s", err) } // 声明一个临时队列 q, err := ch.QueueDeclare( "", // 队列名称,留空表示由RabbitMQ自动生成,因为定义了key所以队列名可以是随意的,毕竟是依靠key来进行匹配的 false, // 是否持久化 false, // 是否自动删除(当没有任何消费者连接时) true, // 是否排他队列(仅限于当前连接) false, // 是否等待服务器响应 nil, // 其他属性 ) // 将队列绑定到交换机上,并指定要接收的路由键 err = ch.QueueBind( q.Name, // 队列名称 "routing_key", // 路由键 "logs_direct", // 交换机名称 false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf("无法创建send msg:%s", err) } // 订阅消息 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者标识符,留空表示由RabbitMQ自动生成 true, // 是否自动应答 false, // 是否独占模式(仅限于当前连接) false, // 是否等待服务器响应 false, // 其他属性 nil, // 其他属性 ) failOnError(err, "Failed to register a consumer") // 接收消息的goroutine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf("Waiting for messages. To exit press CTRL+C") <-make(chan struct{}) // 阻塞主goroutine
func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil{ log.Fatalf(err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil{ log.Fatalf(err) } defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_topic", // 交换机名称 "topic", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf(err) } // 定义要发送的消息的路由键和内容 routingKey := "example.key.das" message := "Hello, RabbitMQ!" // 发布消息到交换机,并指定路由键 err = ch.Publish( "logs_topic", // 交换机名称 routingKey, // 路由键 false, // 是否等待服务器响应 false, // 是否立即发送 amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }, ) if err != nil{ log.Fatalf(err) } log.Printf("Sent message: %s", message) }
消费者
func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil{ log.Fatalf(err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil{ log.Fatalf(err) } defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_topic", // 交换机名称 "topic", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf(err) } // 声明一个临时队列 q, err := ch.QueueDeclare( "", // 队列名称,留空表示由RabbitMQ自动生成 false, // 是否持久化 false, // 是否自动删除(当没有任何消费者连接时) true, // 是否排他队列(仅限于当前连接) false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf(err) } // 将队列绑定到交换机上,并指定要接收的路由键 err = ch.QueueBind( q.Name, // 队列名称 "example.#", // 路由键,可以使用通配符*匹配一个单词 "logs_topic", // 交换机名称 false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf(err) } // 创建一个消费者通道 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者标识符,留空表示由RabbitMQ自动生成 true, // 是否自动应答 false, // 是否排他消费者 false, // 是否阻塞 false, // 是否等待服务器响应 nil, // 其他属性 ) if err != nil{ log.Fatalf(err) } // 接收和处理消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf("Waiting for messages...") // 阻塞 <-forever }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。