当前位置:   article > 正文

.net core Redis 使用有序集合实现延迟队列_stackexchange.redis2.8

stackexchange.redis2.8

Redis 有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。
不同的是每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

有序集合的成员是唯一的,但分数(score)却可以重复。

集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。 集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。

延迟队列的设计思想是将队列的延迟时间作为分数,按照这个进行排序

  1. 安装依赖
Newtonsoft.Json             13.0.3   
StackExchange.Redis         2.8.0 
  • 1
  • 2
  1. 封装Redis
using StackExchange.Redis;
namespace LedayQueue.RedisHelper
{
    public class RedisConnection
    {
        private readonly ConnectionMultiplexer _connection;
        public IDatabase _database;

        public RedisConnection()
        {
            _connection = ConnectionMultiplexer.Connect("localhost:6379");
            _database = _connection.GetDatabase();
        }



        public async Task AddToQueueAsync(string task, TimeSpan delay)
        {
            var executionTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + delay.TotalSeconds;
            await _database.SortedSetAddAsync("delayedQueue", task, executionTime);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 封装background service
using StackExchange.Redis;

namespace LedayQueue.RedisHelper
{
    public class DelayedQueueProcessor : BackgroundService
    {
        private readonly RedisConnection _connection;
        private const string QueueKey = "delayedQueue";
        public DelayedQueueProcessor(RedisConnection redisConnection)
        {
            _connection = redisConnection;
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
                var tasks = await _connection._database.SortedSetRangeByScoreWithScoresAsync(QueueKey, 0, now);

                foreach (var task in tasks)
                {
                    // 处理任务
                    var taskString = task.Element.ToString();
                    ProcessTask(taskString);

                    // 从队列中移除任务
                    await _connection._database.SortedSetRemoveAsync(QueueKey, task.Element);
                }

                await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); // 每秒检查一次
            }
        }

        private void ProcessTask(string content)
        {
            Console.WriteLine(content);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  1. 注册
builder.Services.AddSingleton<RedisConnection>();
builder.Services.AddHostedService<DelayedQueueProcessor>();

  • 1
  • 2
  • 3

源码

官网

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/798636
推荐阅读
相关标签
  

闽ICP备14008679号