赞
踩
由于业务需求,需要在服务中加入一个异步任务,执行大量的耗时计算操作,需求细节如下:
A
请求之后创建一个异步任务,然后立即返回,异步任务持续计算,直到结束。B
请求之后可以正确终止掉上述异步任务。10
个服务进程,由于计算消耗大量CPU和内存资源,需要控制10个服务进程同一时间只能执行一个上述异步任务。这里看似很简单的需求,但是对于我们很少接触异步任务的人来说,还是有相当大的挑战:
python2
做到?top
命令看到一个标记为Z
的杀不掉的进程是什么?python2
中的并发控制怎么做,如何做到让10
个服务进程共享同一个异步进程实例。没错,一个看似简单的需求,我我断断续续得实现 + 优化,经过了好几个月的时间,依次解决了上述所有的问题。这中间学习到了很多,或者说有一部分以前书中学到的,觉得没用的知识,终于出现在工作中了。因此值得记录下来,下面我们一步一步得来讲解上述问题为什么出现,以及怎么解决。
1. 说到进程
,我们自动省略了两个字——同步,即一般情况下,我们在主进程main_process
中手动创建一个子进程sub_process
,都会手动调用sub_process.join()
,让前者等待后者退出之后再退出。
import time
from multiprocessing import Process
def do_in_sub_process():
print "[+] Sub process prints."
for i in range(5):
print "[+] Waitting for %d second(s) because of sub process..." % (5 - i)
time.sleep(1)
if __name__ == "__main__":
sub_process = Process(target=do_some_thing)
sub_process.start()
sub_process.join()
print "[+] Main process prints."
上述代码的执行结果应该是:
[+] Sub process prints.
[+] Waitting for 5 second(s) because of sub process…
[+] Waitting for 4 second(s) because of sub process…
[+] Waitting for 3 second(s) because of sub process…
[+] Waitting for 2 second(s) because of sub process…
[+] Waitting for 1 second(s) because of sub process…
[+] Main process prints.
可见,__main__
方法中的主进程因为sub_process.join()
这一句,会先等待子进程全部执行结束并退出,才继续执行自己的其他逻辑,最后自己也退出。
2. 异步进程
的应用场景就体现在,我们不想等待子进程结束,就让主进程立即返回并退出。试想一个异步任务需要耗时1个小时
,而用户在前端页面上不可能忍耐一个请求执行一小时
才返回,一般情况下,一个请求应该在秒级返回响应。
这里出现了一个矛盾点: 想要进程的资源被正确回收,我们就必须调用其join
方法,但是因为我们要使其异步执行,就不能调用join
方法。如此产生了一个严重问题,即【问题与难点】部分的3和4,我们会发现写好的逻辑只能执行一次,执行结束后产生了一个标记为Z
的僵尸进程
,通过kill -9 pid
也杀不掉,这就是因为没有调用子进程的join
方法回收其资源,导致了死锁。再创建新的异步进程时由于资源不够,也没法正常执行。
3. 由于我们的耗时任务计算量很大,所以需要并发来充分利用服务器资源,达到降低计算耗时的目的。多线程共享同个进程的CPU和内存资源,所以不够充分,我们选择多进程,即进程池concurrent.futures.ProcessPoolExecutor
。
这里要注意的是:在python2中concurrent.futures
需要通过pip install futures
额外安装,而python3中是内置的,无需额外安装。
1. Tornado服务的app.py
这个文件用于启动tornado服务,配置请求路径路由到对应的Handler
import tornado.ioloop
import tornado.web
import tornado.process
from handler import AHandler, BHandler
def make_app():
return tornado.web.Application([
# 用于启动异步任务的AHandler
(r"/a", AHandler),
# 用于终止异步任务的BHandler
(r"/b", BHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888, address="127.0.0.1")
# 开启10个服务进程
tornado.process.fork_processes(10)
tornado.ioloop.IOLoop.current().start()
2. 异步任务管理文件task_manager.py
由于我们需要在主进程中创建一个一级子进程master_subprocess
,并在其中创建一个二级子进程池slave_executor
,同时还要保证10个服务进程共享它们的单例,所以将它们单独放在一个文件中。
import os
import signal
from concurrent.futures import ProcessPoolExecutor
import tornado
# 子进程执行的任务、变量必须是全局变量,否则会报错
master_subprocess = None
slave_executor = ProcessPoolExecutor(max_workers=8)
slave_future_tasks = []
def sigchld_handler(signum, frame):
tornado.ioloop.IOLoop.current().add_callback(check_task_process)
def check_task_process():
global fuzz_task_process
while True:
try:
# 使用os.waitpid()函数处理已结束的子进程
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0:
break
if master_subprocess is not None and pid == master_subprocess.pid:
# 等待子进程退出
master_subprocess.join()
master_subprocess = None
except OSError:
break
# 注册SIGCHLD信号处理器
signal.signal(signal.SIGCHLD, sigchld_handler)
可以看到,除了子进程和子进程池,还有两个方法和一行signal
的代码,这三个部分控制了子进程的正常结束和资源回收,避免master_subprocess
成为僵尸进程。后面会详细讲解其代码逻辑。
3. 子进程和子进程池中执行的方法文件tasks.py
这个文件中主要存放两个方法,即子进程中之行的任务和子进程池中之行的任务。需要注意的是,它们通过import task_manager
导入上述文件,通过task_manager.master_subprocess
和task_manager.slave_executor
来对这些变量进行引用和赋值,保证修改可以被所有服务进程看到并共享。
import task_manager
import traceback
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED
def do_in_master_subprocess(raw_datas):
try:
# 1. 先将原始数据进行预处理,使之适用于多进程并发
datas_to_calculate = pre_process_data(raw_datas)
# python2的ProcessPoolExecutor不支持上下文管理器,不可以使用with关键字
# 2. 提交给进程池去分别计算每个子任务
for data in datas_to_calculate:
future = task_manager.slave_executor.submit(do_in_slave_subprocess, data)
task_manager.slave_future_tasks.append(future)
wait(fuzz_task_manager.slave_future_tasks, timeout=1800, return_when=ALL_COMPLETED)
# 3. 通过返回值和future的组合获取每个子任务的计算结果
result_list = [future.result() if future.done() else "" for future in task_manager.slave_future_tasks]
# 4. 手动关闭进程池,避免产生僵尸进程
task_manager.slave_executor.shutdown(wait=False)
task_manager.slave_executor = ProcessPoolExecutor(max_workers=8)
# 5. 将计算结果转换成数据库需要的数据格式
records = post_process_data(result_list)
# 6. 更新数据库,代码省略
except Exception:
traceback.print_exc()
def do_in_slave_subprocess(data):
try:
begin_at = time.time()
# 1. 执行子任务的计算逻辑
result = calculate(data)
time_cost = time.time() - begin_at
print "[+] Calc sub task: [Time cost]%fs, [Result]%f" % (time_cost, result)
return result
except Exception:
traceback.print_exc()
return ""
4. 两个请求处理器AHandler和BHandler文件handler.py
这个文件在前面的app.py中被引用,主要存放通过处理请求启动和终止异步任务的逻辑
import os
import signal
import multiprocessing
import tornado.web
import tornado.gen
import task_manager
from tasks import do_in_master_subprocess
class AHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
if task_manager.master_subprocess is not None and task_manager.master_subprocess.is_alive():
self.set_status(403)
self.write("当前已有一个异步任务正在运行,请稍后重试")
return
# 1. 从数据库中查得原始数据raw_datas
raw_datas = dbmanager.get_raw_datas()
# 2. 启动异步任务(子进程)
task_manager.master_subprocess = multiprocessing.Process(target=do_in_master_subprocess, args=(raw_datas,))
task_manager.master_subprocess.start()
self.write("异步任务已启动,PID:{}".format(task_manager.master_subprocess.pid))
self.finish()
class BHandler(tornado.web.RequestHandler):
def get(self):
if task_manager.master_subprocess is not None and task_manager.master_subprocess.is_alive():
# 1. 先终止进程池
if task_manager.slave_future_tasks.__len__() > 0:
for future in task_manager.slave_future_tasks:
future.cancel()
task_manager.slave_future_tasks = []
task_manager.slave_executor.shutdown(wait=False)
task_manager.slave_executor = ProcessPoolExecutor(max_workers=8)
# 2. 再终止异步子进程
task_manager.master_subprocess.terminate()
# 等待子进程退出
task_manager.master_subprocess.join()
self.write("异步任务已终止,PID:{}".format(task_manager.master_subprocess.pid))
task_manager.master_subprocess = None
else:
self.write("没有正在运行的异步任务")
self.finish()
上述4部分代码,除了task_manager.py
,都是很简单,很好理解的。这里我们只讲task_manager.py
。
这个文件主要解决了两个问题:
join
方法而变成僵尸进程的问题。1
很好理解,全局定义了三个变量,其他文件通过以下方式来引用和赋值这些变量,保证了这些变量对所有服务进程都立即可见、且值是最新的:
import task_manager
task_manager.master_subprocess
task_manager.slave_executor
task_manager.slave_future_tasks
2
即通过剩余的两个方法和signal
信号量实现,以下是详细的解释:
在上述示例代码中,我们使用了signal
模块来处理信号。信号是一种在操作系统级别实现的进程间通信(IPC)机制。当一个进程接收到一个信号时,操作系统会中断进程的正常执行流程,并调用与该信号关联的处理函数。在我们的示例中,我们使用了SIGCHLD
信号来处理子进程的结束。
以下是signal
模块在示例代码中的作用:
1)注册信号处理器:在task_manager.py
中,我们使用signal.signal()
函数将sigchld_handler
函数注册为SIGCHLD
信号的处理器。这意味着当进程接收到SIGCHLD
信号时,操作系统将调用sigchld_handler
函数。
# 注册SIGCHLD信号处理器
signal.signal(signal.SIGCHLD, sigchld_handler)
2)信号处理器:sigchld_handler
函数在接收到SIGCHLD
信号时被调用。在这个函数中,我们使用Tornado的IOLoop
的add_callback()
方法将check_task_process
函数添加到事件循环中。这样,check_task_process
函数将在下一个I/O循环迭代时被调用。
def sigchld_handler(signum, frame):
io_loop = tornado.ioloop.IOLoop.current()
io_loop.add_callback(check_task_process)
3)处理子进程结束:check_task_process
函数在sigchld_handler
函数中被调度时执行。在这个函数中,我们使用os.waitpid()
函数来处理已结束的子进程。这将确保子进程在完成后被正确清理,从而避免僵尸进程。
def check_task_process():
global task_process
while True:
try:
# 使用os.waitpid()函数处理已结束的子进程
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0:
break
if task_process is not None and pid == task_process.pid:
task_process.join() # 等待子进程退出
task_process = None
except OSError:
break
1. SIGCHLD信号是什么?它由哪个进程发出,哪个进程接收?
SIGCHLD
是一个由操作系统定义的信号,用于通知父进程其子进程已经结束(正常退出或因信号而终止)。当子进程结束时,操作系统会自动向父进程发送SIGCHLD
信号。父进程可以通过捕获和处理SIGCHLD
信号来执行相应的操作,例如清理子进程资源、终止其他子进程或执行其他任务。
在我们之前的示例代码中,当异步任务(子进程)结束时,操作系统会向运行Tornado服务的进程(父进程)发送SIGCHLD
信号。我们在父进程中注册了一个信号处理器sigchld_handler
,用于处理SIGCHLD
信号。当父进程收到SIGCHLD
信号时,sigchld_handler
函数被调用,然后我们将check_task_process
函数添加到Tornado的事件循环中。在check_task_process
函数中,我们使用os.waitpid()
函数处理已结束的子进程,从而避免僵尸进程。
2. check_task_process函数中的代码逻辑
check_task_process
函数的主要目的是处理已结束的子进程,以避免僵尸进程。在这个函数中,我们使用os.waitpid()
函数来查询已结束的子进程,并在必要时对它们进行清理。以下是check_task_process
函数的代码逻辑解释:
def check_task_process():
global master_subprocess
while True:
try:
# 使用os.waitpid()函数处理已结束的子进程
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0:
break
if master_subprocess is not None and pid == master_subprocess.pid:
master_subprocess.join() # 等待子进程退出
master_subprocess = None
except OSError:
break
1)我们使用一个while
循环来查询所有已结束的子进程。这是因为可能有多个子进程同时结束,我们需要确保所有子进程都被处理。
2)在while
循环中,我们调用os.waitpid()
函数来查询已结束的子进程。我们传递-1
作为第一个参数,表示我们希望查询任何子进程。我们还传递os.WNOHANG
作为第二个参数,表示os.waitpid()
函数应立即返回,而不是等待子进程结束。
3)os.waitpid()
函数返回一个包含两个元素的元组:已结束子进程的进程ID(PID)和子进程的状态。如果没有已结束的子进程,os.waitpid()
函数将返回(0, 0)
。
4)我们检查os.waitpid()
函数返回的PID是否为0。如果是,则表示没有更多已结束的子进程,因此我们跳出while
循环。如果PID不为0,我们继续检查该PID是否与我们的master_subprocess
变量关联。如果是,我们调用master_subprocess.join()
方法来等待子进程退出并释放资源。然后,我们将master_subprocess
设置为None
,表示没有正在运行的任务。
5)如果在调用os.waitpid()
期间发生OSError
异常,我们将跳出while
循环。这可能是因为没有子进程可用,或者其他原因导致os.waitpid()
失败。
3. 除了SIGCHLD还有什么常见的信号吗?它们都在什么时候产生?
在Unix系统中,有许多不同类型的信号用于在进程之间传递信息。以下是一些常见的信号及其产生的情况:
1)SIGHUP:当终端挂起或控制进程终止时发送。通常用于通知守护进程重新加载配置文件或重新初始化。
2)SIGINT:当用户按下中断键(通常是Ctrl+C)时发送。通常用于终止正在运行的进程。
3)SIGQUIT:当用户按下退出键(通常是Ctrl+\)时发送。与SIGINT类似,但还会生成一个核心转储文件。
4)SIGILL:当进程尝试执行非法指令时发送。通常是由于程序错误或数据损坏引起的。
5)SIGTRAP:由断点指令或其他陷阱指令产生。通常在调试器中使用。
6)SIGABRT:当进程调用abort()
函数时发送。通常用于表示严重的程序错误。
7)SIGFPE:当进程执行算术错误(如除以零)时发送。
8)SIGKILL:用于强制终止进程。这个信号不能被捕获或忽略,因此它总是会导致进程终止。
9)SIGSEGV:当进程执行无效内存引用(如访问未分配的内存)时发送。通常表示程序中的严重错误。
10)SIGPIPE:当进程试图向已关闭的管道或套接字写入数据时发送。通常可以忽略。
11)SIGALRM:当alarm()
函数设置的定时器到期时发送。通常用于实现超时或定期执行任务。
12)SIGTERM:用于请求进程终止。与SIGKILL不同,这个信号可以被捕获和忽略。通常用于优雅地终止进程。
13)SIGUSR1和SIGUSR2:为用户定义的信号保留。您可以在应用程序中使用这些信号来实现自定义功能。
请注意,这些信号在不同的操作系统和平台上可能有所不同。在处理信号时,请确保你了解目标系统上信号的具体行为。在Python中,可以使用signal
模块来处理信号,如我们之前讨论的示例所示。
信号量这个名词以前只在书本中听到过。这次实现这个需求,前后话费了非常多的时间。在和chatgpt的不断询问、测试、优化中,终于通过信号量实现了目标的功能,使服务从只能跑一次就要重启服务器,到可以多跑几次,再到完全解决异步进程不能释放资源变成僵尸进程的问题。值得记录下来,供以后的自己和大家参考。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。