当前位置:   article > 正文

golang rabbitmq客户端连接及重连

golang rabbitmq客户端连接及重连

1、连接、发送、发送异常、重连

package rabbitmq

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
)

type RabbitMQ struct {
	conn            *amqp.Connection
	channel         *amqp.Channel
	configs         RabbitMqConfig
	connErrorChan   chan *amqp.Error
	returnErrorChan chan amqp.Return
	activateChan    chan interface{}
}

func NewRabbitMQ() *RabbitMQ {
	return &RabbitMQ{}
}

// Init 初始化队列服务
func (r *RabbitMQ) Init(cfg RabbitMqConfig) error {
	// 建立连接
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s%s",
		cfg.User,
		cfg.PassWord,
		cfg.Addr,
		cfg.VHost))
	if err != nil {
		return err
	}
	r.conn = conn
	r.configs = cfg

	// 创建管道
	if err := r.initChannel(); err != nil {
		return err
	}

	// 交换
	if err := r.exchangeDeclare(); err != nil {
		return err
	}

	// 队列
	if err := r.queueDeclare(); err != nil {
		return err
	}

	// 队列绑定交换
	if err := r.queueBind(); err != nil {
		return err
	}

	// 保持连接活动状态
	r.connErrorChan = make(chan *amqp.Error, 1)
	r.conn.NotifyClose(r.connErrorChan)
	go r.reopen()

	// 消息没有成功路由到队列监控
	r.returnErrorChan = make(chan amqp.Return, 1)
	r.channel.NotifyReturn(r.returnErrorChan)
	go r.messagePushError()

	r.activateChan = make(chan interface{}, 1)

	return nil
}

// reopen 重试
func (r *RabbitMQ) reopen() {
	for {
		select {
		case err := <-r.connErrorChan:
			logrus.WithError(err).Error("RabbitMq server exception retry")
			if r.conn != nil {
				r.conn = nil        // 清除连接
				r.activateChan <- 1 // 通知监控协程结束
			}
			// r.isActivate = false
			time.Sleep(time.Second * time.Duration(r.configs.Interval))
			if err := r.Init(r.configs); err != nil {
				logrus.WithError(err).Error("reopen queue rabbitmq")
				continue
			}
			logrus.Info("reopen rabbitmq success ")
			return
		}
	}
}

// messagePushError 消息发送到队列时异常监控
func (r *RabbitMQ) messagePushError() {
	for {
		select {
		case v, ok := <-r.returnErrorChan:
			if !ok {
				continue
			}
			logrus.WithFields(map[string]interface{}{
				"code":    v.ReplyCode,
				"message": v.ReplyText,
				"content": string(v.Body),
			}).Error("send to rabbitmq failed")
		case <-r.activateChan:
			logrus.Info("The current connection has been interrupted")
			return
		}
	}
}

// initChannel 初始化管道
func (r *RabbitMQ) initChannel() error {
	channel, err := r.conn.Channel()
	if err != nil {
		return err
	}

	if err := channel.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	); err != nil {
		return err
	}
	r.channel = channel
	return nil
}

// exchangeDeclare 创建交换器
func (r *RabbitMQ) exchangeDeclare() error {
	exchange := r.configs.RabbitmqExchange
	return r.channel.ExchangeDeclare(
		exchange.Name,
		exchange.Kind,
		r.configs.Durable,
		r.configs.AutoDelete,
		r.configs.Internal,
		r.configs.NoWait, nil)
}

// queueDeclare 创建队列
func (r *RabbitMQ) queueDeclare() error {
	_, err := r.channel.QueueDeclare(
		r.configs.RabbitmqQueue.Name,
		r.configs.Durable,
		r.configs.AutoDelete,
		r.configs.Internal,
		r.configs.NoWait, nil)
	return err
}

// queueBind 队列与交换器的绑定
func (r *RabbitMQ) queueBind() error {
	return r.channel.QueueBind(r.configs.RabbitmqQueue.Name,
		r.configs.RabbitmqQueue.Name,
		r.configs.RabbitmqExchange.Name,
		r.configs.NoWait, nil)
}

// Send 消息发送
func (r *RabbitMQ) Send(message interface{}) error {
	messageByte, err := json.Marshal(message)
	if err != nil {
		return err
	}
	err = r.channel.Publish(
		"",                           // 交换机
		r.configs.RabbitmqQueue.Name, // 路由队列的Key
		true,                         // 发送到队列失败是否进行通知
		false,                        // 目标队列没有消费者时是否进行通知,官方不建议开启
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			DeliveryMode:    amqp.Persistent, // 消息持久化
			Body:            messageByte,
		},
	)
	if err != nil {
		return err
	}
	return nil
}

// Close 关闭服务
func (r *RabbitMQ) Close() error {
	if err := r.conn.Close(); err != nil {
		return err
	}
	return nil
}

// 队列配置项结构
type RabbitMqConfig struct {
	Addr             string           `mapstructure:"addr"`        // mq地址
	VHost            string           `mapstructure:"vhost"`       // mq的vhost
	User             string           `mapstructure:"user"`        // mq用户名
	PassWord         string           `mapstructure:"password"`    // mq密码
	Durable          bool             `mapstructure:"durable"`     //持久化标识, true: 持久化, false: 否
	AutoDelete       bool             `mapstructure:"auto_delete"` //是否自动删除, true: 是, false: 否
	Internal         bool             `mapstructure:"internal"`    //是否是内部的
	NoWait           bool             `mapstructure:"nowait"`      //是否等待服务器确认, true: 不等待, false: 等待
	Interval         int              `mapstructure:"interval"`    // 重连时间间隔
	RabbitmqExchange RabbitmqExchange `mapstructure:"exchange"`
	RabbitmqQueue    RabbitmqQueue    `mapstructure:"queue"`
}

type RabbitmqExchange struct {
	Name string `mapstructure:"name"` // 交换机名称
	Kind string `mapstructure:"kind"` // 交换机类型, direct: 默认值, 使用路由, fanout: 不使用路由,topic: 订阅,
}
type RabbitmqQueue struct {
	Name string `mapstructure:"name"` //队列名称
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220

 

2、调用示例

package main

import (
	"standard/rabbitmq/rmq"    //自行替换为上面的包的位置
	"github.com/sirupsen/logrus"
)

func main() {
	cfg := rabbitmq.RabbitMqConfig{
		Addr:       "127.0.0.1:5672",
		VHost:      "/",
		User:       "guest",
		PassWord:   "guest",
		Durable:    true,
		AutoDelete: false,
		Internal:   false,
		NoWait:     false,
		RabbitmqExchange: rabbitmq.RabbitmqExchange{
			Name: "exchange.test",
			Kind: "direct",
		},
		RabbitmqQueue: rabbitmq.RabbitmqQueue{
			Name: "queue.test",
		},
		Interval: 2,
	}

	err := rabbitmq.NewRabbitMQ().Init(cfg)
	if err != nil {
		logrus.WithError(err).Error("init rabbit")
		return
	}
	logrus.Info("init rabbitmq success")
	
	/**
	err = rabbitmq.NewRabbitMQ().Send(nil)  // 发送数据至队列
	if err != nil {
		logrus.WithError(err).Error("init rabbit")
		return
	}
	logrus.Info("send to rabbitmq success")
	**/

	select {
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号