昨日内容回顾
协程实际上是一个线程,执行了多个任务,遇到IO就切换
切换,可以使用yield,greenlet
遇到IO gevent: 检测到IO,能够使用greenlet实现自动切换,规避了IO阻塞问题。
昨天没有讲到的小问题,看下面的例子:
- import gevent
- def func():
- print('eating')
- gevent.spawn(func) # 协程任务开启
执行程序,没有输出结果
加上join
- import gevent
- def func():
- print('eating')
-
- g = gevent.spawn(func)
- g.join() # 阻塞,等待协程执行完毕
执行输出:eating
加上睡眠
- import time
- import gevent
- def func():
- print('eating')
-
- g = gevent.spawn(func)
- time.sleep(1) # 等待1秒
执行程序,没有输出结果
使用gevent.sleep()
- import gevent
- def func():
- print('eating')
-
- g = gevent.spawn(func)
- gevent.sleep(1)
执行输出:eating 然后过1秒退出
导入monkey模块,再使用内置的time.sleep()
- from gevent import monkey;monkey.patch_all()
- import time
- import gevent
- def func():
- print('eating')
-
- g = gevent.spawn(func)
- time.sleep(1)
执行输出:eating
修改为sleep(0)
- from gevent import monkey;monkey.patch_all()
- import time
- import gevent
- def func():
- print('eating')
-
- g = gevent.spawn(func)
- time.sleep(0)
执行输出:eating
time.sleep(0)虽然时间为0,它也是阻塞,gevent检测到了IO
总结:
协程任务开启,不会立即执行,它需要IO才能执行
- from gevent import monkey;monkey.patch_all()
- import time
- import gevent
- def func():
- print('eating1') # 执行
- time.sleep(0.1) # 遇到IO
- print('eating2')
- time.sleep(0.1)
- print('eating3')
- time.sleep(0.1)
- g = gevent.spawn(func) # 协程任务开启
- time.sleep(0) # 阻塞遇到IO
执行输出:eating1
结果为什么时eating1?下面的2和3为啥不执行呢?
因为time.sleep(0)比time.sleep(0.1)要快,执行之后,就结束了。
切换到主进程时,发i西安主进程已经结束了。所以eating2和eating3没有执行。
如果用join‘,就可以让eating2和etaing3执行
- from gevent import monkey;monkey.patch_all()
- import time
- import gevent
- def func():
- print('eating1') # 执行
- time.sleep(0.1) # 遇到IO
- print('eating2')
- time.sleep(0.1)
- print('eating3')
- time.sleep(0.1)
- g = gevent.spawn(func) # 协程任务开启
- #time.sleep(0) # 阻塞遇到IO
- g.join() # 等待协程结束
执行输出:
总结:
内部执行switch时,必须保证协程不结束之前,主线程不结束
在昨天的socket例子中,没有用到join
因为accept时永久 阻塞,他不需要join
并发编程,在面试和重要。一定要熟练掌握
一、IO模型介绍
为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞
同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,我先限定一下本文的上下文。
本文讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不懂。本文中的流程图也是截取自参考文献。
Stevens在文章中一共比较了五种IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路复用
* signal driven IO 信号驱动IO
* asynchronous IO 异步IO
由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。
再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:
#1)等待数据准备 (Waiting for the data to be ready) #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。
异步:多个任务,并发执行。再执行一个操作的同时,又有另外一个任务也在执行。
input accept recv recvfrom read(文件) 这些表示长时间IO
sleep join
print send write log.debug 这些表示短暂IO
二、阻塞IO(blocking IO)
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。
而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel结果,用户进程才能接触block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv()等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的,如下图
ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统滴哦啊用获得结果或者超时出错时才返回。
实际上,除非特别指定,几乎所有的IO接口(包括socket接口)都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
一个简单的解决方案:
#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
该方案的问题是:
#开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
改进方案:
#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。
改进后方案其实也存在着问题:
#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。
发送数据,首先由操作系统接收数据
操作系统陷入阻塞,等待数据阶段
因为它不知道,何时候数据来了
上面的几个绿色箭头,都是操作系统做的
用户态接收到消息,结束阻塞状态
上图是网络IO模型
之前的并发,是多创建了几个进程
那么阻塞在进程中。用户虽然感觉不到,本质上是不能解决IO问题
阻塞IO是比较低效的
所有的阻塞都不会调用CPU
IO多的线程:比如任务过多时,会耗费CPU
协程500个任务,不耗费CPU,切换速度非常快。
所以协程比较快
如果是高计算型的,需要用到多进程,再多线程,再起协程
这样效率会更高,它会规避很多IO操作
如果任务比较少,起线程和协程区别很小。但是协程会更好一点
总结:
协程能解决的事情,不要用线程
在其它的语言里面:
多进程 数据隔离 可以利用多核
多线程 数据不隔离,可以利用多核
协程 数据不隔离 不能利用多核
所以,协程很鸡肋
在Cpython解释器下的python语言
多进程 数据隔离 可以利用多核
多线程 数据不隔离,不能利用多核
协程 数据不隔离 不能利用多核
但是协程的优势大于线程,在Cpython下,效率非常快。
三、非阻塞IO(non-blocking IO)
Linux下,可以通过设置socket使其变为non-blocking,当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的的时间间隔内做其它事情,或者直接再次发送read操作,一旦
kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
#服务端 from socket import * import time s=socket(AF_INET,SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) #设置socket的接口为非阻塞 conn_l=[] del_l=[] while True: try: conn,addr=s.accept() conn_l.append(conn) except BlockingIOError: print(conn_l) for conn in conn_l: try: data=conn.recv(1024) if not data: del_l.append(conn) continue conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: del_l.append(conn) for conn in del_l: conn_l.remove(conn) conn.close() del_l=[] #客户端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8080)) while True: msg=input('>>: ') if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8'))
但是非阻塞IO模型绝不被推荐。
我们不能否认其有点:能够在等待任务完成的时间里干其他活了(包括提交其它任务,也就是“后台”可以有多个任务在“同时”执行)。
但是也难掩其缺点:
#1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况 #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成”作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
用户态,表示python代码
如果只接受一个连接,只有一个conn,只有一个recv
那么它和阻塞IO效果是一样的
如果不止有一个连接,可以利用等待时间,做其他事情
比如再接收一个新的连接
看下面的例子:
起一个进程,多个客户端同时连接,实现并发效果,完成非阻塞IO模型
服务端
- import socket
-
- sk = socket.socket() # 创建套接字
- sk.bind(('127.0.0.1', 9000)) # 监听回环地址的9000端口
- sk.setblocking(False) # 将套接字设为非阻塞模式
- sk.listen() # 开始监听
- '''
- 只存储正常连接列表。是为了之前的连接,不会被覆盖。
- 可能之前一个连接,还没有处理完,新的连接过来时
- 之前的连接,就被覆盖了。为了解决这个问题,需要一个列表来存储
- '''
- conn_lst = []
- '''
- 关闭连接列表,这个列表的元素如果存在conn_lst中,就会被删除。
- 因为在循环一个列表conn_lst时,不能对conn_lst做增加/删除操作。
- 所以需要遍历del_lst列表,每次从conn_lst中删除一个元素。
- '''
- del_lst = []
- while True:
- try:
- conn, addr = sk.accept() # 非阻塞的模型
- print(conn, addr) # 打印连接以及客户端地址
- conn_lst.append(conn) # 添加正常的连接
- except BlockingIOError as e: # 不正确的连接,刚开始是没有发送数据
- for conn in conn_lst: # [conn1,conn2,conn3]
- try:
- msg = conn.recv(1024) # 非阻塞,接收1024字节
- if not msg: # msg = b'' 判断接收数据为空
- conn.close() # 关闭连接
- del_lst.append(conn) # 添加到删除列表
- continue # 返回程序最开始的地方,即try
- print(msg) # 打印接收信息
- msg = msg.decode('utf-8').upper() # 解码并将接收信息全部大写
- conn.send(msg.encode('utf-8')) # 发送处理后的数据给客户端
- except NameError:pass # 没有客户端连接
- except BlockingIOError:pass # 没有数据来
- for conn in del_lst: # 遍历删除列表
- conn_lst.remove(conn) # 每次从正常的连接中,删除已经关闭的连接。为了不做OSError异常处理,直接删掉连接
- del_lst.clear() # 清空列表。
客户端
- import time
- import socket
-
- sk = socket.socket()
- sk.connect(('127.0.0.1',9000))
- for i in range(20): # 循环20次
- sk.send(b'hello')
- print(sk.recv(1024))
- time.sleep(1) # 模拟自动发送,间隔1秒
- sk.close()
假设conn1.recv阻塞了,它不会就干等着。它会继续执行新的连接,比如conn2.recv
所以CPU,会一直工作,不会闲着。它一直在while True
那么问题来了,非阻塞IO,非常耗费CPU
非阻塞IO虽然完成了异步,但是它非常耗费CPU
四、多路复用IO(IO multiplexing)
IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图: