赞
踩
// Package rabbitmq wraps a single streadway/amqp connection and channel. It
// declares one exchange and one queue from configuration, binds them, publishes
// JSON-encoded messages, and re-dials the broker after the connection is lost.
package rabbitmq

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
)

// defaultReconnectInterval is the pause between reconnect attempts when
// RabbitMqConfig.Interval is zero or negative, so that a missing setting does
// not turn the retry loop into a tight spin against the broker.
const defaultReconnectInterval = time.Second

// RabbitMQ holds the broker connection, the channel opened on it, the
// configuration used to (re)build both, and the notification channels consumed
// by the background monitoring goroutines.
//
// NOTE(review): the reconnect goroutine replaces conn and channel without any
// locking, so calls to Send or Close that overlap a reconnect are not
// synchronized with it.
type RabbitMQ struct {
	conn            *amqp.Connection
	channel         *amqp.Channel
	configs         RabbitMqConfig
	connErrorChan   chan *amqp.Error
	returnErrorChan chan amqp.Return
	activateChan    chan interface{}
}

// NewRabbitMQ returns an empty client. Init must succeed before Send is useful.
func NewRabbitMQ() *RabbitMQ {
	return &RabbitMQ{}
}

// Init dials the broker described by cfg, opens a channel, declares the
// exchange and the queue, binds them, and starts two goroutines: one that
// reconnects after a connection error and one that logs returned
// (unroutable) messages.
//
// On any failure after the dial succeeded the new connection is closed again
// and the client is left without a connection, so nothing is leaked.
func (r *RabbitMQ) Init(cfg RabbitMqConfig) error {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s%s", cfg.User, cfg.PassWord, cfg.Addr, cfg.VHost))
	if err != nil {
		return err
	}
	r.conn = conn
	r.configs = cfg

	if err := r.declareTopology(); err != nil {
		// The setup error is the one worth reporting; a failure to close the
		// half-initialized connection adds nothing for the caller.
		_ = conn.Close()
		r.conn = nil
		r.channel = nil
		return err
	}

	// Create every notification channel before starting the goroutines that
	// read them, so neither goroutine can observe a nil or stale channel.
	r.connErrorChan = make(chan *amqp.Error, 1)
	r.returnErrorChan = make(chan amqp.Return, 1)
	r.activateChan = make(chan interface{}, 1)

	r.conn.NotifyClose(r.connErrorChan)
	r.channel.NotifyReturn(r.returnErrorChan)

	go r.reopen()
	go r.messagePushError()

	return nil
}

// declareTopology opens the channel and declares exchange, queue and binding,
// stopping at the first step that fails.
func (r *RabbitMQ) declareTopology() error {
	steps := []func() error{
		r.initChannel,
		r.exchangeDeclare,
		r.queueDeclare,
		r.queueBind,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

// reconnectDelay converts the configured interval (seconds) into a duration,
// falling back to defaultReconnectInterval for non-positive values.
func (r *RabbitMQ) reconnectDelay() time.Duration {
	if r.configs.Interval <= 0 {
		return defaultReconnectInterval
	}
	return time.Duration(r.configs.Interval) * time.Second
}

// reopen waits for the connection registered in Init to go away. If the
// notification channel is closed without an error (which is what the amqp
// library does on a normal shutdown) it only stops the return monitor and
// exits. If an error is delivered, it drops the dead connection and keeps
// calling Init, pausing between attempts, until one succeeds. A successful
// Init starts a fresh pair of monitoring goroutines, so this one then exits.
func (r *RabbitMQ) reopen() {
	// Work on the channels that belong to this connection generation; a later
	// Init overwrites the struct fields.
	closed, stop := r.connErrorChan, r.activateChan

	connErr, ok := <-closed

	// Tell the return monitor to finish. The send is non-blocking because the
	// monitor may already have exited after its own channel was closed.
	select {
	case stop <- 1:
	default:
	}

	if !ok {
		// Deliberate shutdown: reconnecting here would undo Close.
		return
	}

	logrus.WithError(connErr).Error("RabbitMq server exception retry")
	r.conn = nil
	r.channel = nil

	for {
		time.Sleep(r.reconnectDelay())
		if err := r.Init(r.configs); err != nil {
			logrus.WithError(err).Error("reopen queue rabbitmq")
			continue
		}
		logrus.Info("reopen rabbitmq success ")
		return
	}
}

// messagePushError logs every message the broker returns as undeliverable.
// It exits when reopen signals that the connection is gone, or when the
// return channel itself is closed.
func (r *RabbitMQ) messagePushError() {
	returns, stop := r.returnErrorChan, r.activateChan
	for {
		select {
		case v, ok := <-returns:
			if !ok {
				// A closed channel is always ready to receive; looping on it
				// would spin, so treat it as the end of this monitor.
				return
			}
			logrus.WithFields(map[string]interface{}{
				"code":    v.ReplyCode,
				"message": v.ReplyText,
				"content": string(v.Body),
			}).Error("send to rabbitmq failed")
		case <-stop:
			logrus.Info("The current connection has been interrupted")
			return
		}
	}
}

// initChannel opens a channel on the current connection, sets its QoS to a
// prefetch count of 1 (prefetch size 0, not global), and stores it.
func (r *RabbitMQ) initChannel() error {
	channel, err := r.conn.Channel()
	if err != nil {
		return err
	}
	if err := channel.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	); err != nil {
		return err
	}
	r.channel = channel
	return nil
}

// exchangeDeclare declares the configured exchange on the current channel.
func (r *RabbitMQ) exchangeDeclare() error {
	exchange := r.configs.RabbitmqExchange
	return r.channel.ExchangeDeclare(
		exchange.Name,
		exchange.Kind,
		r.configs.Durable,
		r.configs.AutoDelete,
		r.configs.Internal,
		r.configs.NoWait,
		nil,
	)
}

// queueDeclare declares the configured queue on the current channel.
//
// NOTE(review): the fourth argument of QueueDeclare is "exclusive"; the
// Internal setting is passed in that position — confirm this is intended.
func (r *RabbitMQ) queueDeclare() error {
	_, err := r.channel.QueueDeclare(
		r.configs.RabbitmqQueue.Name,
		r.configs.Durable,
		r.configs.AutoDelete,
		r.configs.Internal,
		r.configs.NoWait,
		nil,
	)
	return err
}

// queueBind binds the configured queue to the configured exchange, using the
// queue name as the routing key.
func (r *RabbitMQ) queueBind() error {
	return r.channel.QueueBind(
		r.configs.RabbitmqQueue.Name,
		r.configs.RabbitmqQueue.Name,
		r.configs.RabbitmqExchange.Name,
		r.configs.NoWait,
		nil,
	)
}

// Send JSON-encodes message and publishes it as a persistent message with the
// queue name as routing key and the mandatory flag set. It returns the
// marshalling or publishing error, or amqp.ErrClosed when no channel is open
// (before Init, or while a reconnect is in progress).
//
// NOTE(review): the message is published to the default exchange (""), not to
// the exchange declared in Init — confirm this is intended.
func (r *RabbitMQ) Send(message interface{}) error {
	ch := r.channel
	if ch == nil {
		return amqp.ErrClosed
	}

	body, err := json.Marshal(message)
	if err != nil {
		return err
	}

	return ch.Publish(
		"",                           // exchange
		r.configs.RabbitmqQueue.Name, // routing key
		true,                         // mandatory: report messages that cannot be routed
		false,                        // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			DeliveryMode:    amqp.Persistent, // persistent message
			Body:            body,
		},
	)
}

// Close closes the broker connection. It returns amqp.ErrClosed when there is
// no connection to close (before Init, or while a reconnect is in progress).
func (r *RabbitMQ) Close() error {
	conn := r.conn
	if conn == nil {
		return amqp.ErrClosed
	}
	return conn.Close()
}

// RabbitMqConfig is the configuration for the queue client.
type RabbitMqConfig struct {
	// Addr is the broker address; it is placed after "@" in the dial URL.
	Addr string `mapstructure:"addr"`
	// VHost is the virtual host; it is appended to the dial URL as-is.
	VHost string `mapstructure:"vhost"`
	// User is the broker user name.
	User string `mapstructure:"user"`
	// PassWord is the broker password.
	PassWord string `mapstructure:"password"`
	// Durable marks the exchange and the queue as durable when true.
	Durable bool `mapstructure:"durable"`
	// AutoDelete marks the exchange and the queue as auto-deleted when true.
	AutoDelete bool `mapstructure:"auto_delete"`
	// Internal marks the exchange as internal when true.
	Internal bool `mapstructure:"internal"`
	// NoWait, when true, does not wait for the server to confirm declarations.
	NoWait bool `mapstructure:"nowait"`
	// Interval is the pause between reconnect attempts, in seconds.
	Interval int `mapstructure:"interval"`

	RabbitmqExchange RabbitmqExchange `mapstructure:"exchange"`
	RabbitmqQueue    RabbitmqQueue    `mapstructure:"queue"`
}

// RabbitmqExchange describes the exchange to declare.
type RabbitmqExchange struct {
	// Name is the exchange name.
	Name string `mapstructure:"name"`
	// Kind is the exchange type, e.g. "direct", "fanout" or "topic".
	Kind string `mapstructure:"kind"`
}

// RabbitmqQueue describes the queue to declare.
type RabbitmqQueue struct {
	// Name is the queue name; it is also used as the routing key.
	Name string `mapstructure:"name"`
}
package main import ( "standard/rabbitmq/rmq" //自行替换为上面的包的位置 "github.com/sirupsen/logrus" ) func main() { cfg := rabbitmq.RabbitMqConfig{ Addr: "127.0.0.1:5672", VHost: "/", User: "guest", PassWord: "guest", Durable: true, AutoDelete: false, Internal: false, NoWait: false, RabbitmqExchange: rabbitmq.RabbitmqExchange{ Name: "exchange.test", Kind: "direct", }, RabbitmqQueue: rabbitmq.RabbitmqQueue{ Name: "queue.test", }, Interval: 2, } err := rabbitmq.NewRabbitMQ().Init(cfg) if err != nil { logrus.WithError(err).Error("init rabbit") return } logrus.Info("init rabbitmq success") /** err = rabbitmq.NewRabbitMQ().Send(nil) // 发送数据至队列 if err != nil { logrus.WithError(err).Error("init rabbit") return } logrus.Info("send to rabbitmq success") **/ select { } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。