赞
踩
最近在倒腾自建博客后端系统,需要用到延时任务的功能,但手头只有一套MySQL和Redis,如果搞一套MQ成本有点大,于是想着用redis实现延时消息队列。有些场景用数据库的定时扫表也能简单实现延时消息的功能,不过对于我这边的实际场景(比如计数系统)其实是把数据存到redis中,如果用数据库实现延时消息会对数据库有比较大的压力。
这里参考了有赞的延迟队列设计
事件消息体
- type EventEntity struct {
- EventId int64
- Topic string
- Body string
- EffectTime time.Time
- }
- 复制代码
1、当有新增延时任务过来时,会在EVENT_POOL对应的topic中添加一条记录,同时也会把任务添加到EVENT_BUCKET中,按生效时间排序;
2、搬运线程会定时扫描EVENT_BUCKET中已经到期的任务,将这些任务push到EVENT_QUEUE对应topic的队列当中,之后将这些任务从EVENT_BUCKET中删除;
3、EVENT_QUEUE每个topic会有一个监听线程,当发现当前topic队列中有待消费的任务,则会将任务pop出来,并从EVENT_POOL中查询任务详情,交给consumer消费。
- func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error {
- pipeline := q.redisClient.WithContext(ctx).Pipeline()
- defer pipeline.Close()
-
- // 向EVENT_POOL中添加任务
- pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
- // 将任务id添加到EVENT_BUCKET中,按生效时间排序
- pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z{
- Member: strconv.FormatInt(event.EventId, 10),
- Score: float64(event.EffectTime.Unix()),
- })
- _, err := pipeline.Exec()
- if err != nil {
- logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
- return err
- }
- return nil
- }
- 复制代码
- func (q *DelayQueue) carryEventToQueue(topic string) error {
- ctx := context.Background()
- // 扫描zset中到期的任务
- members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeBy{Min: "0", Max: util.ToString(time.Now().Unix())}).Result()
- if err != nil && err != redis.Nil {
- logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
- return err
- }
- if len(members) == 0 {
- return nil
- }
-
- errMap := make(map[string]error)
- // 将任务添加到对应topic的待消费队列里
- for _, m := range members {
- eventId := m.Member.(string)
- err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
- if err != nil {
- logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
- errMap[eventId] = err
- }
- }
-
- // 从Bucket中删除已进入待消费队列的事件
- var doneMembers []interface{}
- for _, m := range members {
- eventId := m.Member.(string)
- if _, ok := errMap[eventId]; !ok {
- doneMembers = append(doneMembers, eventId)
- }
- }
- if len(doneMembers) == 0 {
- return nil
- }
-
- err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
- if err != nil {
- logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
- }
- return nil
- }
-
- 复制代码
这里使用了List的BLPop命令,当有数据时会立即返回,没有数据则会一直阻塞直到有数据过来;这样可以避免定时扫描列表浪费资源。
- func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error {
- for {
- ctx := context.Background()
- kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
- if err != nil {
- logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
- continue
- }
- if len(kvPair) < 2 {
- continue
- }
-
- eventId := kvPair[1]
- data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
- if err != nil && err != redis.Nil {
- logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
- if q.persistFn != nil {
- _ = q.persistFn(&EventEntity{
- EventId: util.String2Int64(eventId),
- Topic: topic,
- })
- }
- continue
- }
- event := &EventEntity{}
- _ = jsoniter.UnmarshalFromString(data, event)
-
- for _, subscriber := range subscriberList {
- util.Retry(3, 0, func() (success bool) {
- err = subscriber.Handle(ctx, event)
- if err != nil {
- logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
- return false
- }
- return true
- })
- }
-
- err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
- if err != nil {
- logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
- }
- }
- }
- 复制代码
1、优雅关闭
DelayQueue对象中使用wg、isRunning、stop三个变量来实现优雅关闭,具体可参考源码。
- type DelayQueue struct {
- namespace string
- redisClient *redis.Client
- once sync.Once
- wg sync.WaitGroup
- isRunning int32
- stop chan struct{}
- persistFn PersistFn
- }
- 复制代码
- // gracefully shudown
- func (q *DelayQueue) ShutDown() {
- if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0) {
- return
- }
- close(q.stop)
- q.wg.Wait()
- }
- 复制代码
2、消费失败后持久化任务
可为DelayQueue对象设置持久化方法persistFn,用来在监听线程消费任务失败时将任务id持久化以便人工处理。
- ...
-
- q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
- if err != nil && err != redis.Nil {
- logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
- if q.persistFn != nil {
- _ = q.persistFn(&EventEntity{
- EventId: util.String2Int64(eventId),
- Topic: topic,
- })
- }
- continue
- }
-
- ...
- 复制代码
redis_delay_queue: github.com/hudingyu/re…
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。