赞
踩
今天说一个在实际项目中特别实用的解决并发耗时问题的办法:异步任务处理。这里采用 redis list 结构来实现。涉及知识点:
1、redis list 结构
2、阻塞、非阻塞、同步、异步的概念
3、如何实现一个异步处理任务实战
同步、异步、阻塞、非阻塞
首先来说同步和异步,这两个概念是针对通信双方消息传送的响应来说的,如果 A 请求 B,B 马上响应 A,这是同步,而如果 A 请求 B ,B 说好的,我已经接受任务,然后把任务交由 C,而 C 是一个专门负责处理任务的,这种模型就是异步。
而阻塞和非阻塞,通常对应于程序请求响应,比如我们常见的 IO 处理,一个进程在处理 IO 操作时,需要等待 IO 操作完成,如果等着,就是阻塞,如果这个时候它不等着,而是转身先去做别的事情,走到 IO 操作完成再回来处理,这种就是非阻塞。
在网上看到一个比较好的关于这两组概念的区别,即同步和异步是描述行为的,即我是如果去处理当有类似阻塞这种情况的请求,比如 IO 操作,比如耗时操作,而阻塞和非阻塞描述的是一种状态。
redis 列表介绍
首先在面试中经常会问一个问题:redis 支持哪几种数据结构?
网上一般的回答是五种,分别是 string、list、set、sorted set、hash,在官方文档还有其他三种,Bitmaps、HyperLogLogs,还有 stream,参考链接如下:
https://redis.io/topics/data-types-intro
其中 stream 类型是在 5.0 中引入的。这里不详细介绍每种的用法,感兴趣的读者可以自行查看官方文档。因为要用到 list 这种数据结构,所以简单介绍下。
redis 的 list 结构,也可以称为是队列,它和 Python 中的 list 相似,它可以按照插入的顺序存储数据,但不同的是 Python 中的 list 底层是数组实现的,而 redis 中的 list 底层是链表实现的,所以 redis 中的 list 无论是在头部还是在尾部插入元素,时间复杂度都是常量级别的。通常的操作命令:
> rpush mylist 1 2 3 4 5 "foo bar"
(integer) 9
> lrange mylist 0 -1
1) "first"
2) "A"
3) "B"
4) "1"
5) "2"
6) "3"
7) "4"
8) "5"
9) "foo bar"
当然,与之对应的还有 lpush, lpop 等,所有操作指令可以在以下链接进行查看:
https://redis.io/commands#list
数据结构存在的意义是根据不同的场景可以达到更高的效率,那 list 结构有什么作用呢?通常有以下两个作用:
1、存储最近的数据,比如用一个列表存储用户访问的记录,每次访问时插入,而如果需要取最近访问的 10 条,只需要使用 lrange(key, 0, 9) 来获取即可
2、存储任务,即作为一个要处理的任务列表。也就是我们这篇文章里要介绍的异步任务,它去存储需要异步处理的任务列表
此外,除了普通的列表,还有两种特殊列表的存在,一种是限制列表,即限制列表的长度,比如我们只需要记录用户最近访问的30条记录,这样做的好处是保证数据不会无限增长,从业务需求上来说也是合理的,因为比较老的数据已经没有太大的价值(当然如果非要保存这些旧的数据,也可以考虑持久化)。我们可以使用 ltrim 指令来完成,示例如下:
> rpush mylist 1 2 3 4 5
(integer) 5
> ltrim mylist 0 2
OK
> lrange mylist 0 -1
1) "1"
2) "2"
3) "3"
另外一种是阻塞列表,即使用 brpop 或者 blpop 。我们都知道 redis 是单线程的,通常是一个命令执行完,才会去执行下一个命令,那试想这样一种情况,如果一个 redis 列表一直为空,我们一直循环去判断它有没有数据,这样是不是很浪费资源,做了很多无用功,所以通常我们需要在读数据时,判断如果没有数据就阻塞几秒,然后再去读数据。这里可以说得有些难理解,后面我们讲了异步实现代码之后,就好理解了。我们先来看一下 brpop 使用示例:
# 如果没有数据,停 5s
> brpop tasks 5
1) "tasks"
2) "do_something"
异步任务处理代码实现
实现的原理其实也很简单,当客户端发起请求,服务端接收到请求,可能需要完成很多复杂的逻辑,有些甚至是包括一些耗时的操作,这个时候我们可以将这个耗时的操作当作一个任务,把它塞到 redis 的 list 中,而在另一边开启一个常驻脚本,即 while True 的循环,一直去从 list 中读取数据(管理这种常驻脚本,也可以使用 pm2,或者其他工具,这里不做介绍,简单地直接 run 一个 python 脚本)。
这里我们模拟服务端接收到请求后,往 redis list 里塞数据,实现代码如下:
# -*- coding: utf-8 -*-
"""
@Time : 2019/7/6 下午10:51
@Author : yrr
@File : task_client.py
@Desc : push task to redis list
"""
# need install redis, pip install redis
import redis
import json
def main():
conn = redis.Redis(host='localhost', port=6379, db=0)
# Redis Lists are simply lists of strings, sorted by insertion order.
# so we need json.dumps
conn.lpush("async_task_list", json.dumps({
# 参数可自定义规则
'name': 'test',
'data': {'name': 'demon'}
}))
if __name__ == '__main__':
main()
而异步任务脚本实现如下:
# -*- coding: utf-8 -*-
"""
@Time : 2019/7/6 下午10:57
@Author : yrr
@File : task_async.py
@Desc : task async handle
"""
import redis
import json
def handle_test(data):
print("has get the data by redis list...")
print("the name is %s" % data['name'])
TASKS_FUNC_MAP = {'test': handle_test}
def main():
conn = redis.Redis(host='localhost', port=6379, db=0)
while True:
# 需要保证异常不影响正常异步任务
# 所以需要用 try 来控制
try:
# 这里需要用到阻塞式列表,因为一直 for 循环会消耗cpu资源
# 可以设置阻塞时间
# 这也是一个面试考点
task = conn.brpop('async_task_list', 2)
if not task:
continue
_, data = task
print("curent handle data: %s" % (data,))
data = json.loads(data)
fname = TASKS_FUNC_MAP.get(data['name'])
if not fname:
# 通常这里需要报错,暂时处理为打印错误信息
print('not func handle, please check')
continue
fname(data)
except Exception as e:
# 这里涉及到错误任务的重试机制,可以根据业务需求来做
# 这里暂时不处理,只打印出异步信息
print(e)
if __name__ == '__main__':
main()
实现效果如下:https://www.zhihu.com/video/1130982109149556736
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。