当前位置:   article > 正文

python2处理耗时任务_如何实现异步任务处理来解决耗时操作问题

python 耗时任务

今天说一个在实际项目中特别实用的解决并发耗时问题的办法:异步任务处理。这里采用 redis list 结构来实现。涉及知识点:

1、redis list 结构

2、阻塞、非阻塞、同步、异步的概念

3、如何实现一个异步处理任务实战

同步、异步、阻塞、非阻塞

首先来说同步和异步,这两个概念是针对通信双方消息传送的响应来说的,如果 A 请求 B,B 马上响应 A,这是同步,而如果 A 请求 B ,B 说好的,我已经接受任务,然后把任务交由 C,而 C 是一个专门负责处理任务的,这种模型就是异步。

而阻塞和非阻塞,通常对应于程序请求响应,比如我们常见的 IO 处理,一个进程在处理 IO 操作时,需要等待 IO 操作完成,如果等着,就是阻塞,如果这个时候它不等着,而是转身先去做别的事情,走到 IO 操作完成再回来处理,这种就是非阻塞。

在网上看到一个比较好的关于这两组概念的区别,即同步和异步是描述行为的,即我是如果去处理当有类似阻塞这种情况的请求,比如 IO 操作,比如耗时操作,而阻塞和非阻塞描述的是一种状态。

redis 列表介绍

首先在面试中经常会问一个问题:redis 支持哪几种数据结构?

网上一般的回答是五种,分别是 string、list、set、sorted set、hash,在官方文档还有其他三种,Bitmaps、HyperLogLogs,还有 stream,参考链接如下:

https://redis.io/topics/data-types-intro

其中 stream 类型是在 5.0 中引入的。这里不详细介绍每种的用法,感兴趣的读者可以自行查看官方文档。因为要用到 list 这种数据结构,所以简单介绍下。

redis 的 list 结构,也可以称为是队列,它和 Python 中的 list 相似,它可以按照插入的顺序存储数据,但不同的是 Python 中的 list 底层是数组实现的,而 redis 中的 list 底层是链表实现的,所以 redis 中的 list 无论是在头部还是在尾部插入元素,时间复杂度都是常量级别的。通常的操作命令:

> rpush mylist 1 2 3 4 5 "foo bar"

(integer) 9

> lrange mylist 0 -1

1) "first"

2) "A"

3) "B"

4) "1"

5) "2"

6) "3"

7) "4"

8) "5"

9) "foo bar"

当然,与之对应的还有 lpush, lpop 等,所有操作指令可以在以下链接进行查看:

https://redis.io/commands#list

数据结构存在的意义是根据不同的场景可以达到更高的效率,那 list 结构有什么作用呢?通常有以下两个作用:

1、存储最近的数据,比如用一个列表存储用户访问的记录,每次访问时插入,而如果需要取最近访问的 10 条,只需要使用 lrange(key, 0, 9) 来获取即可

2、存储任务,即作为一个要处理的任务列表。也就是我们这篇文章里要介绍的异步任务,它去存储需要异步处理的任务列表

此外,除了普通的列表,还有两种特殊列表的存在,一种是限制列表,即限制列表的长度,比如我们只需要记录用户最近访问的30条记录,这样做的好处是保证数据不会无限增长,从业务需求上来说也是合理的,因为比较老的数据已经没有太大的价值(当然如果非要保存这些旧的数据,也可以考虑持久化)。我们可以使用 ltrim 指令来完成,示例如下:

> rpush mylist 1 2 3 4 5

(integer) 5

> ltrim mylist 0 2

OK

> lrange mylist 0 -1

1) "1"

2) "2"

3) "3"

另外一种是阻塞列表,即使用 brpop 或者 blpop 。我们都知道 redis 是单线程的,通常是一个命令执行完,才会去执行下一个命令,那试想这样一种情况,如果一个 redis 列表一直为空,我们一直循环去判断它有没有数据,这样是不是很浪费资源,做了很多无用功,所以通常我们需要在读数据时,判断如果没有数据就阻塞几秒,然后再去读数据。这里可以说得有些难理解,后面我们讲了异步实现代码之后,就好理解了。我们先来看一下 brpop 使用示例:

# 如果没有数据,停 5s

> brpop tasks 5

1) "tasks"

2) "do_something"

异步任务处理代码实现

实现的原理其实也很简单,当客户端发起请求,服务端接收到请求,可能需要完成很多复杂的逻辑,有些甚至是包括一些耗时的操作,这个时候我们可以将这个耗时的操作当作一个任务,把它塞到 redis 的 list 中,而在另一边开启一个常驻脚本,即 while True 的循环,一直去从 list 中读取数据(管理这种常驻脚本,也可以使用 pm2,或者其他工具,这里不做介绍,简单地直接 run 一个 python 脚本)。

这里我们模拟服务端接收到请求后,往 redis list 里塞数据,实现代码如下:

# -*- coding: utf-8 -*-

"""

@Time : 2019/7/6 下午10:51

@Author : yrr

@File : task_client.py

@Desc : push task to redis list

"""

# need install redis, pip install redis

import redis

import json

def main():

conn = redis.Redis(host='localhost', port=6379, db=0)

# Redis Lists are simply lists of strings, sorted by insertion order.

# so we need json.dumps

conn.lpush("async_task_list", json.dumps({

# 参数可自定义规则

'name': 'test',

'data': {'name': 'demon'}

}))

if __name__ == '__main__':

main()

而异步任务脚本实现如下:

# -*- coding: utf-8 -*-

"""

@Time : 2019/7/6 下午10:57

@Author : yrr

@File : task_async.py

@Desc : task async handle

"""

import redis

import json

def handle_test(data):

print("has get the data by redis list...")

print("the name is %s" % data['name'])

TASKS_FUNC_MAP = {'test': handle_test}

def main():

conn = redis.Redis(host='localhost', port=6379, db=0)

while True:

# 需要保证异常不影响正常异步任务

# 所以需要用 try 来控制

try:

# 这里需要用到阻塞式列表,因为一直 for 循环会消耗cpu资源

# 可以设置阻塞时间

# 这也是一个面试考点

task = conn.brpop('async_task_list', 2)

if not task:

continue

_, data = task

print("curent handle data: %s" % (data,))

data = json.loads(data)

fname = TASKS_FUNC_MAP.get(data['name'])

if not fname:

# 通常这里需要报错,暂时处理为打印错误信息

print('not func handle, please check')

continue

fname(data)

except Exception as e:

# 这里涉及到错误任务的重试机制,可以根据业务需求来做

# 这里暂时不处理,只打印出异步信息

print(e)

if __name__ == '__main__':

main()

实现效果如下:https://www.zhihu.com/video/1130982109149556736

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/102465
推荐阅读
相关标签
  

闽ICP备14008679号