赞
踩
最近秋招开始找工作,顺便回顾消息队列并且总结。
消息队列(Message Queue)是一种在应用程序之间传递消息的通信模式。它通过在发送者和接收者之间建立一个消息队列来实现异步通信和解耦。
在消息队列模式中,发送者(Producer)将消息发送到一个中间件(Message Broker)中的消息队列,而接收者(Consumer)则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理,而无需直接交互,从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信,而不需要知道彼此的存在。
1. 异步通信:发送者将消息放入队列后即可继续进行其他操作,无需等待接收者的响应。接收者可以在合适的时候从队列中获取消息进行处理,实现了异步通信模式。
2. 解耦:发送者和接收者之间通过消息队列进行通信,彼此之间不直接耦合。发送者只需将消息发送到队列中,而不需要知道消息是如何被处理的。接收者只需从队列中获取消息进行处理,而不需要知道消息的来源。
3. 可靠性传输:消息队列通常提供持久化机制,确保消息在发送和接收过程中不会丢失。即使接收者暂时不可用,消息也会在队列中等待,直到接收者准备好接收为止。
4. 扩展性:消息队列可以支持多个发送者和接收者,实现系统的扩展性和高并发处理能力。
5. 缓冲和削峰填谷:通过将消息缓存到队列中,可以平衡发送者和接收者之间的处理速度差异,从而避免系统过载。
消息队列在分布式系统、微服务架构、异步任务处理、事件驱动架构等场景中被广泛应用。一些常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们提供了丰富的功能和配置选项,可以根据应用需求选择合适的消息队列实现。
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
四个用途
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
消息分发:提高系统灵活性
应用解耦是指通过使用消息队列等中间件来降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力。通过解耦,每个应用程序可以通过消息队列发送和接收消息,而不需要了解其他应用程序的具体实现细节。通过应用解耦,可以实现系统的松耦合架构,提高系统的可维护性、扩展性和容错性。
没有使用MQ:
异步提速是指通过将耗时的操作转化为异步执行,从而提高系统的响应速度和吞吐量。通过异步处理,应用程序可以在等待某个操作完成的同时继续执行其他任务,而不需要阻塞等待结果返回。
例如,当一个应用程序需要进行网络请求并等待响应时,如果采用同步方式,应用程序会被阻塞,直到响应返回才能继续执行其他任务。而通过异步方式,应用程序可以继续执行其他任务,不需要等待网络请求的结果返回。这样可以提高系统的响应速度,使用户获得更好的体验。
没有使用MQ:
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ:
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。不需要的等待完成
削峰填谷是一种通过平衡系统负载,减轻峰值压力和填充低谷时的资源利用率的技术。它的目标是在系统负载波动较大的情况下,合理利用资源,确保系统的稳定性和高效性。
没有使用MQ:
使用MQ:
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷。简单来说就是慢慢分发
使用MQ后,可以提高系统稳定性。
消息分发是一种将消息从发送者传递到接收者的机制,它在异步系统和事件驱动架构中起着重要的作用。消息分发可以实现解耦和灵活性,允许不同组件或模块之间通过消息进行通信,从而实现系统的松耦合和可扩展性。
下面是消息分发的一些关键概念和示例:
发布者(Publisher):发布者是消息分发系统中的发送者,它负责生成并发布消息。发布者将消息发送到消息分发系统,而不需要知道消息的具体接收者。
订阅者(Subscriber):订阅者是消息分发系统中的接收者,它通过订阅特定的消息或消息类型来表明自己对消息的兴趣。当有匹配的消息到达时,消息分发系统会将消息传递给订阅者。
主题(Topic):主题是消息分发系统中用于分类和组织消息的标识符或名称。发布者可以将消息发布到特定的主题,而订阅者可以选择订阅感兴趣的主题。通过主题,可以实现消息的细粒度过滤和选择性订阅。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。
AMQP三层协议:
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命会发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
AMQP组件:
交换器(Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列(queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
生产者:消息队列创建者,发送消息到MQ
消费者:连接到RabbitMQ,订阅到队列上,消费消息,持续订阅和单条订阅
消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且RabbitMQ用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。是生产者、消费者与RabbitMQ通信的渠道,生产者publish或是消费者subscribe 一个队列都是通过信道来通信的。
信道是建立在TCP连接上的虚拟连接,就是说RabbitMQ在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ都有一个唯一的ID,保证了信道私有性,对应上唯一的线程使用。
Exchange交换机:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。生产者将消息发送到交换器,有交换器将消息路由到一个或者多个队中。当路由不到时,或返回给生产者或直接丟弃。
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
消费者:
package main import ( "log" "github.com/streadway/amqp" ) func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法连接到RabbitMQ服务器:%s", err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建通道:%s", err) } defer ch.Close() // 声明一个队列 queue, err := ch.QueueDeclare( "hello", // 队列名 false, // 持久性 false, // 自动删除 false, // 独占 false, // 等待服务器确认 nil, // 参数 ) if err != nil { log.Fatalf("无法声明队列:%s", err) } // 消费消息 msgs, err := ch.Consume( queue.Name, // 队列名 "", // 消费者标签 true, // 自动确认 false, // 独占 false, // 不等待服务器确认 false, // 参数 ) if err != nil { log.Fatalf("无法注册消费者:%s", err) } // 处理接收到的消息 for msg := range msgs { log.Printf("接收到消息:%s", msg.Body) } }
上述代码首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"heo"的队列。接下来,通过ch.Consume函数注册一个消费者,用于从队列中接收消息。在fo循环中,我们处理接收到的消息,这里只是简单地打印出来。
生产者:
package main import ( "log" "github.com/streadway/amqp" ) func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法连接到RabbitMQ服务器:%s", err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建通道:%s", err) } defer ch.Close() // 声明一个队列 queue, err := ch.QueueDeclare( "hello", // 队列名 false, // 持久性 false, // 自动删除 false, // 独占 false, // 等待服务器确认 nil, // 参数 ) if err != nil { log.Fatalf("无法声明队列:%s", err) } // 发送消息 body := "Hello, RabbitMQ!" err = ch.Publish( "", // 交换机 queue.Name, // 队列名 false, // 必须发送到队列 false, // 不等待服务器确认 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }, ) if err != nil { log.Fatalf("无法发送消息:%s", err) } log.Printf("消息已发送:%s", body) }
上述代码与消费者程序类似,首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"hello"的队列。接下来,通过ch.Publishi函数向队列发送一条消息。
消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关[syncronize]保证一条消息只能被一个消费者使用)。
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
消费者:
package main import ( "fmt" "log" "math/rand" "time" "github.com/streadway/amqp" ) func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("无法连接到RabbitMQ服务器:%s", err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil { log.Fatalf("无法创建通道:%s", err) } defer ch.Close() // 启动多个消费者并行处理任务 for i := 1; i <= 3; i++ { go startConsumer(i, ch) } // 阻塞主进程 select {} } func generateTask(id int) string { time.Sleep(time.Duration(rand.Intn(3)) * time.Second) return fmt.Sprintf("Task %d", id) } func startConsumer(id int, ch *amqp.Channel) { // 声明一个队列 queue, err := ch.QueueDeclare( "tasks_queue", // 队列名 true, // 持久性 false, // 自动删除 false, // 独占 false, // 等待服务器确认 nil, // 参数 ) if err != nil { log.Fatalf("无法声明队列:%s", err) } // 消费任务 msgs, err := ch.Consume( queue.Name, // 队列名 "", // 消费者标签 false, // 手动确认 false, // 不等待服务器确认 false, // 不使用内置的参数 false, // 参数 nil, // 参数 ) if err != nil { log.Fatalf("无法注册消费者:%s", err) } for msg := range msgs { task := string(msg.Body) log.Printf("消费者 %d 接收到任务:%s", id, task) log.Printf("消费者 %d 完成任务:%s", id, task) // 手动确认任务已处理 msg.Ack(false) } }
利用协城启动多个消费者进行消费。
结果如下:
每个消费者监听自己的队列。
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
在RabbitMQ的Publish/Subscribe模型中,生产者将消息发送到交换机,交换机负责将消息广播给所有绑定到它上面的队列。消费者创建队列并将其绑定到交换机上,从而接收交换机发送的消息。这样,一个消息可以被多个消费者接收。
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
package main import ( "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs", // 交换机名称 "fanout", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare an exchange") // 发布消息到交换机 body := "Hello, RabbitMQ!" err = ch.Publish( "logs", // 交换机名称 "", // 路由键,留空表示广播给所有队列 false, // 是否等待服务器响应 false, // 其他属性 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }, ) failOnError(err, "Failed to publish a message") log.Printf("Message sent: %s", body) }
连接到RabbitMQ服务器,声明了一个名为"logs"的交换机,并通过调用ch.Publish方法将消息发布到交换机上。
在示例代码中,通过指定交换机名称为"logs",路由键为空字符串,消息将被广播给所有绑定到该交换机的队列。
package main import ( "fmt" "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs", // 交换机名称 "fanout", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare an exchange") // 声明一个临时队列 q, err := ch.QueueDeclare( "", // 队列名称,留空表示由RabbitMQ自动生成 false, // 是否持久化 false, // 是否自动删除(当没有任何消费者连接时) true, // 是否排他队列(仅限于当前连接) false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare a queue") // 将队列绑定到交换机上 err = ch.QueueBind( q.Name, // 队列名称 "", // 路由键,留空表示接收交换机的所有消息 "logs", // 交换机名称 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to bind a queue") // 订阅消息 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者标识符,留空表示由RabbitMQ自动生成 true, // 是否自动应答 false, // 是否独占模式(仅限于当前连接) false, // 是否等待服务器响应 false, // 其他属性 nil, // 其他属性 ) failOnError(err, "Failed to register a consumer") // 接收消息的goroutine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf("Waiting for messages. To exit press CTRL+C") <-make(chan struct{}) // 阻塞主goroutine }
它连接到RabbitMQ服务器,声明一个fanout类型的交换机(Exchange),创建一个临时队列,将队列绑定到交换机上,并订阅消息。
在示例代码中,创建的交换机名为"logs",交换机类型为"fanout",表示消息将被广播给所有绑定到该交换机的队列。
消费者创建了一个临时队列,并将其绑定到交换机上,这样交换机就会将消息发送到该队列中。
在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
1、队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2、消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3、Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息。
生产者
package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_direct", // 交换机名称 "direct", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare an exchange") // 从命令行参数获取要发送的路由键和消息内容 if len(os.Args) < 3 { log.Fatalf("Usage: %s [info] [message]", os.Args[0]) } severity := os.Args[1] message := strings.Join(os.Args[2:], " ") // 发布消息到交换机,并指定路由键 err = ch.Publish( "logs_direct", // 交换机名称 severity, // 路由键 false, // 是否等待服务器响应 false, // 是否立即将消息写入磁盘 amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }, ) failOnError(err, "Failed to publish a message") log.Printf("Sent message: %s", message) }
它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),并通过指定路由键将消息发布到交换机。
在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。
生产者从命令行参数获取要发送的路由键和消息内容。路由键可以是任意字符串,用于标识消息的类型或者级别。消息内容可以是任意文本。
消费者
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_direct", // 交换机名称 "direct", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare an exchange") // 声明一个临时队列 q, err := ch.QueueDeclare( "", // 队列名称,留空表示由RabbitMQ自动生成 false, // 是否持久化 false, // 是否自动删除(当没有任何消费者连接时) true, // 是否排他队列(仅限于当前连接) false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare a queue") // 从命令行参数获取要绑定的路由键 if len(os.Args) < 2 { log.Fatalf("Usage: %s [info] [warning] [error]", os.Args[0]) } severities := os.Args[1:] // 将队列绑定到交换机上,并指定要接收的路由键 for _, severity := range severities { err = ch.QueueBind( q.Name, // 队列名称 severity, // 路由键 "logs_direct", // 交换机名称 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to bind a queue") } // 订阅消息 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者标识符,留空表示由RabbitMQ自动生成 true, // 是否自动应答 false, // 是否独占模式(仅限于当前连接) false, // 是否等待服务器响应 false, // 其他属性 nil, // 其他属性 ) failOnError(err, "Failed to register a consumer") // 接收消息的goroutine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf("Waiting for messages. To exit press CTRL+C") <-make(chan struct{}) // 阻塞主goroutine }
上述代码实现了一个Routing模型的消费者。它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),创建一个临时队列,并将队列绑定到交换机上,同时指定要接收的路由键。
在RabbitMQ的Routing模型中,生产者将消息发送到交换机,并在发送消息时指定一个路由键(routing key)。交换机根据路由键将消息发送给与之绑定的队列。消费者创建队列并将其绑定到交换机上,并通过指定要接收的路由键来选择性地接收消息。
在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。
消费者创建了一个临时队列,并通过循环将该队列绑定到交换机上,并指定要接收的路由键。路由键可以是任意字符串,用于标识消息的类型或者级别。在示例中,我们通过命令行参数传入要绑定的路由键。
最后,消费者通过调用ch.Consume方法订阅消息。该方法返回一个消息通道msgs,消费者可以从该通道接收到消息。在示例中,我们使用一个goroutine来异步接收消息,并在收到消息时打印出来。
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
统配符
* 匹配不多不少恰好1个词
# 匹配一个或多个词
如:
fan.# 匹配 fan.one.two 或者 fan.one 等
fan.* 只能匹配 fan.one
生产者
func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_topic", // 交换机名称 "topic", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare an exchange") // 定义要发送的消息的路由键和内容 routingKey := "example.key.das" message := "Hello, RabbitMQ!" // 发布消息到交换机,并指定路由键 err = ch.Publish( "logs_topic", // 交换机名称 routingKey, // 路由键 false, // 是否等待服务器响应 false, // 是否立即发送 amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }, ) failOnError(err, "Failed to publish a message") log.Printf("Sent message: %s", message) }
消费者
func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接到RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( "logs_topic", // 交换机名称 "topic", // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部使用 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare an exchange") // 声明一个临时队列 q, err := ch.QueueDeclare( "", // 队列名称,留空表示由RabbitMQ自动生成 false, // 是否持久化 false, // 是否自动删除(当没有任何消费者连接时) true, // 是否排他队列(仅限于当前连接) false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to declare a queue") // 将队列绑定到交换机上,并指定要接收的路由键 err = ch.QueueBind( q.Name, // 队列名称 "example.#", // 路由键,可以使用通配符*匹配多个单词 "logs_topic", // 交换机名称 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to bind a queue") // 创建一个消费者通道 msgs, err := ch.Consume( q.Name, // 队列名称 "", // 消费者标识符,留空表示由RabbitMQ自动生成 true, // 是否自动应答 false, // 是否排他消费者 false, // 是否阻塞 false, // 是否等待服务器响应 nil, // 其他属性 ) failOnError(err, "Failed to register a consumer") // 接收和处理消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf("Waiting for messages...") <-forever }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。