当前位置:   article > 正文

教你如何基于Redis来实现高性能延时消息队列!_怎么用redis实现延迟消息

怎么用redis实现延迟消息

最近在倒腾自建博客后端系统,需要用到延时任务的功能,但手头只有一套MySQL和Redis,如果搞一套MQ成本有点大,于是想着用redis实现延时消息队列。有些场景用数据库的定时扫表也能简单实现延时消息的功能,不过对于我这边的实际场景(比如计数系统)其实是把数据存到redis中,如果用数据库实现延时消息会对数据库有比较大的压力。

系统设计

这里参考了有赞的延迟队列设计

数据结构设计

事件消息体

  1. type EventEntity struct {
  2. EventId int64
  3. Topic string
  4. Body string
  5. EffectTime time.Time
  6. }
  7. 复制代码
  • EVENT_POOL: 使用redis的hash,里面存储了任务事件的完整信息,key=prefix+namespace+topic,field=EventId, val=EventEntity;
  • EVENT_BUCKET: 使用redis的zset,里面存储了任务事件的有序集合,key=prefix+namespace+topic,score=EffectTime, member=EventId;
  • EVENT_QUEUE: 使用redis的list, list中存储了到期待消费任务的EventId。

延迟队列的执行流程

1、当有新增延时任务过来时,会在EVENT_POOL对应的topic中添加一条记录,同时也会把任务添加到EVENT_BUCKET中,按生效时间排序;

2、搬运线程会定时扫描EVENT_BUCKET中已经到期的任务,将这些任务push到EVENT_QUEUE对应topic的队列当中,之后将这些任务从EVENT_BUCKET中删除;

3、EVENT_QUEUE每个topic会有一个监听线程,当发现当前topic队列中有待消费的任务,则会将任务pop出来,并从EVENT_POOL中查询任务详情,交给consumer消费。

代码实现

核心代码

发布延时任务

  1. func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error {
  2. pipeline := q.redisClient.WithContext(ctx).Pipeline()
  3. defer pipeline.Close()
  4. // 向EVENT_POOL中添加任务
  5. pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
  6. // 将任务id添加到EVENT_BUCKET中,按生效时间排序
  7. pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z{
  8. Member: strconv.FormatInt(event.EventId, 10),
  9. Score: float64(event.EffectTime.Unix()),
  10. })
  11. _, err := pipeline.Exec()
  12. if err != nil {
  13. logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
  14. return err
  15. }
  16. return nil
  17. }
  18. 复制代码

搬运线程扫描到期任务

  1. func (q *DelayQueue) carryEventToQueue(topic string) error {
  2. ctx := context.Background()
  3. // 扫描zset中到期的任务
  4. members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeBy{Min: "0", Max: util.ToString(time.Now().Unix())}).Result()
  5. if err != nil && err != redis.Nil {
  6. logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
  7. return err
  8. }
  9. if len(members) == 0 {
  10. return nil
  11. }
  12. errMap := make(map[string]error)
  13. // 将任务添加到对应topic的待消费队列里
  14. for _, m := range members {
  15. eventId := m.Member.(string)
  16. err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
  17. if err != nil {
  18. logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
  19. errMap[eventId] = err
  20. }
  21. }
  22. // 从Bucket中删除已进入待消费队列的事件
  23. var doneMembers []interface{}
  24. for _, m := range members {
  25. eventId := m.Member.(string)
  26. if _, ok := errMap[eventId]; !ok {
  27. doneMembers = append(doneMembers, eventId)
  28. }
  29. }
  30. if len(doneMembers) == 0 {
  31. return nil
  32. }
  33. err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
  34. if err != nil {
  35. logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
  36. }
  37. return nil
  38. }
  39. 复制代码

监听线程消费任务

这里使用了List的BLPop命令,当有数据时会立即返回,没有数据则会一直阻塞直到有数据过来;这样可以避免定时扫描列表浪费资源。

  1. func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error {
  2. for {
  3. ctx := context.Background()
  4. kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
  5. if err != nil {
  6. logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
  7. continue
  8. }
  9. if len(kvPair) < 2 {
  10. continue
  11. }
  12. eventId := kvPair[1]
  13. data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
  14. if err != nil && err != redis.Nil {
  15. logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
  16. if q.persistFn != nil {
  17. _ = q.persistFn(&EventEntity{
  18. EventId: util.String2Int64(eventId),
  19. Topic: topic,
  20. })
  21. }
  22. continue
  23. }
  24. event := &EventEntity{}
  25. _ = jsoniter.UnmarshalFromString(data, event)
  26. for _, subscriber := range subscriberList {
  27. util.Retry(3, 0, func() (success bool) {
  28. err = subscriber.Handle(ctx, event)
  29. if err != nil {
  30. logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
  31. return false
  32. }
  33. return true
  34. })
  35. }
  36. err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
  37. if err != nil {
  38. logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
  39. }
  40. }
  41. }
  42. 复制代码

其他

1、优雅关闭

DelayQueue对象中使用wg、isRunning、stop三个变量来实现优雅关闭,具体可参考源码。

  1. type DelayQueue struct {
  2. namespace string
  3. redisClient *redis.Client
  4. once sync.Once
  5. wg sync.WaitGroup
  6. isRunning int32
  7. stop chan struct{}
  8. persistFn PersistFn
  9. }
  10. 复制代码
  1. // gracefully shudown
  2. func (q *DelayQueue) ShutDown() {
  3. if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0) {
  4. return
  5. }
  6. close(q.stop)
  7. q.wg.Wait()
  8. }
  9. 复制代码

2、消费失败后持久化任务

可为DelayQueue对象设置持久化方法persistFn,用来在监听线程消费任务失败时将任务id持久化以便人工处理。

  1. ...
  2. q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
  3. if err != nil && err != redis.Nil {
  4. logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
  5. if q.persistFn != nil {
  6. _ = q.persistFn(&EventEntity{
  7. EventId: util.String2Int64(eventId),
  8. Topic: topic,
  9. })
  10. }
  11. continue
  12. }
  13. ...
  14. 复制代码

源码地址

redis_delay_queue: github.com/hudingyu/re…

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/907617
推荐阅读
相关标签
  

闽ICP备14008679号