赞
踩
RabbitMQ 发布订阅视频学习地址:
简单模式下RabbitMQ 发布者发布消息 消费者消费消息
Publist/Subscribe 发布订阅
- package main
- import (
- "context"
- "log"
- "os"
- "strings"
- "github.com/rabbitmq/amqp091-go"
- )
- func failOnError(err error, msg string) {
- if err != nil {
- log.Printf("%s: %s", msg, err)
- }
- }
- func bodyForm(args []string) string {
- var s string
- if (len(args) < 2) || os.Args[1] == "" {
- s = "hello"
- } else {
- s = strings.Join(args[1:], " ")
- }
- return s
- }
- func main() {
- // 连接到RabbitMQ服务器
- conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
- failOnError(err, "Failed to connect to RabbitMQ")
- defer conn.Close()
- // 创建一个通道
- ch, err := conn.Channel()
- failOnError(err, "Failed to open a channel")
- defer ch.Close()
- //声明一个交换机
- err = ch.ExchangeDeclare(
- "logs", //name 交换机名称
- "fanout", //交换机类型 Fanout 广播
- true, //durable 持久化
- false, //autoDelete 是否自动删除
- false, //internal 是否内部使用 设置为 false 时,表示无论如何这个交换器都不是
- 内置的
- false, //noWait 是否等待服务器响应 参数通常默认为False,意味着操作会同步进
- 行并等待服务器的响应
- nil, // 其他属性
- )
- failOnError(err, "Failed to declare an exchange")
- //发送消息
- body := bodyForm(os.Args)
- // 发布消息到交换机,并指定路由键
- err = ch.PublishWithContext(
- context.Background(),
- "logs", // 交换器的名称
- "", // 队列名
- false, // mandatory 必须发送到队列 ,false表示如果交换器无法根据自身的类型和路
- 由键找到一个符合条件的队列丢弃
- false, //immediate 参数设置为 false 时,表示消息不需要立即被消费者接收
- amqp091.Publishing{
- ContentType: "text/plain",
- Body: []byte(body),
- })
- failOnError(err, "Failed to publish a message")
- log.Printf(" [x] Sent: %s", body)
- }
- package main
- import (
- "log"
- "github.com/rabbitmq/amqp091-go"
- )
- func failOnError2(err error, msg string) {
- if err != nil {
- log.Printf("%s: %s", msg, err)
- }
- }
- func main() {
- //建立连接
- conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
- failOnError2(err, "Failed to connect to RabbitMQ")
- defer conn.Close()
- //创建一个Channel
- ch, err := conn.Channel()
- failOnError2(err, "Failed to open a channel")
- defer ch.Close()
- //声明一个交换机
- err = ch.ExchangeDeclare(
- "logs", // 交换机名称
- "fanout", // 交换机类型
- true, // 是否持久化
- false, // 是否自动删除
- false, // 是否内部使用
- false, // 是否等待服务器响应
- nil, // 其他属性
- )
- failOnError2(err, "Failed to declare an exchange")
- // 声明一个临时队列
- q, err := ch.QueueDeclare(
- "", // 队列名称,留空表示由RabbitMQ自动生成
- false, // 是否持久化
- false, // 是否自动删除(当没有任何消费者连接时)
- true, // 是否排他队列(仅限于当前连接)
- false, // 是否等待服务器响应
- nil, // 其他属性
- )
- failOnError2(err, "Failed to declare a queue")
- // 将队列绑定到交换机上
- err = ch.QueueBind(
- q.Name, // 队列名称
- "", // 路由键,留空表示接收交换机的所有消息
- "logs", // 交换机名称
- false, // 是否等待服务器响应
- nil, // 其他属性
- )
- failOnError2(err, "Failed to bind a queue")
- msgs, err := ch.Consume(
- q.Name, // 队列名称
- "", // 消费者标识符,留空表示由RabbitMQ自动生成
- true, // 是否自动应答
- false, // 是否独占模式(仅限于当前连接)
- false, // 是否等待服务器响应
- false, // noLocal
- nil, // 其他属性
- )
- // msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
- failOnError2(err, "Failed to register a consumer")
- var forever chan struct{}
- go func() {
- for d := range msgs {
- log.Printf(" [x] %s", d.Body)
- }
- }()
- log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
- <-forever
- }
运行
- # 如果你想保存日志文件
- go run receive_log.go > logs_from_rabbit.log
- # 如果你想再终端看到日志
- go run receive_log.go
- # shell2
- go run emit_log.go
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。