当前位置:   article > 正文

python2异步编程_Python-异步编程

python2 异步函数

1、异步 同步

函数或方法被调用时,调用者是否得到最终的结果

直接得到最终结果的,就是同步调用

不直接得到最终结果的,就是异步调用

2、阻塞 非阻塞

函数或方法调用的时候,是否立刻返回

立即返回就是非阻塞

不立即返回就是阻塞调用

3、区别

同步,异步,与 阻塞,非阻塞 没有关系

同步,异步强调的是,是否得到最终的结果。

阻塞,非阻塞强调的是时间,是否等待

同步与异步区别在于:调用者是否得到了想要的结果

同步就是一直要执行到返回最终的结果

异步就是直接返回额,但是返回的不是最终的结果,调用者不能通过这种嗲用得到结果,需要童工被调用者的其他方式通知调用者,来取回最终结果。

阻塞与非阻塞 的区别在于,调用者是否还能干其他的事

阻塞,调用者就只能干等

非阻塞,调用者可以先去忙别的,不用一直等。

4、联系

同步阻塞:什么都不做,直到拿到最终的结果

同步非阻塞:等最终结果的期间,可以做别的

异步阻塞:给一个信息,让等通知,但是期间什么都不干

异步非阻塞:等通知,但是期间可以做别的

5、同步IO,异步IO,IO 多路复用

5.1、IO 两个阶段:

1、数据准备阶段

2、内核空间复制数据到用户进程缓冲区阶段

5.2、发生IO的时候:

1、内核从输入设备读,写数据

2、进程从内核复制数据

系统调用 ---- read 函数

6、IO 模型

同步IO:

同步IO 模型包括:阻塞IO,非阻塞IO, IO多路复用

***阻塞IO:

1243334-20181106103347043-645081223.png

1243334-20181106103721718-2091017148.png

进程等待(阻塞),知道读写完成(全程等待)

read/ write 函数

****非阻塞IO

1243334-20181106103852221-679763900.png

1243334-20181106104012906-1453085720.png

进程调用read操作,如果IO设备没有准备好,立即返回error ,进程不阻塞,用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。

第一阶段数据没有准备好,就先忙别的,等会再来看看,检查数据是否准备好了的过程是非阻塞的。

第二阶段是阻塞的,即内核空间和用户空间之间复制数据 是阻塞的。

****IO 多路复用

所谓IO 多路复用,就是同事监控多个IO,有一个准备好了,就不需要等了,开始处理,提高了同时处理IO 的能力

select 几乎所有的操作系统平台都支持,poll是对select的升级

epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加了回调机制,BSD,Mac平台有kqueque,windows有iocp

1243334-20181106105325608-1757472178.png

1243334-20181106105404228-420261401.png

select 为例,将关注的IO 操作 告诉select函数 并调用,进程阻塞,内核监视select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select 返回,在使用read将数据复制到用户进程。

一般情况下,select最多能监听1024个fd(可以修改,不建议修改),但是由于select 采用轮询的方式,当管理的IO 多了,每次都要遍历全部额fd,效率低下(每次某个IO 设备准备好了,都需要遍历一遍select)

基于事件驱动的epoll没有管理的fd的上限, 且是回调机制,不需要遍历,效率很高。

事件驱动IO:

通知机制:

1、水平触发通知,一直通知,直到处理

2、边缘触发, 只通知一次

event 是事件驱动IO

异步IO:

1243334-20181106111119942-182009864.png

进程发起异步IO 请求,立即返回,内核完成 IO 的两个阶段,内核给进程发一个信号

1243334-20181106111641094-1037442954.png

7、Python中 IO 多路 复用

IO 多路复用:

大多数操作系统都支持select 和poll

Linux 2.5+ 支持epoll

BSD,Mac 支持kqueque

windoes的iocp

Python的select库

实现了select , poll 系统调用,这个基本上操作系统都支持,部分实现了epoll

底层的IO 多路复用模块。

开发中的选择:

1、完全夸平台,使用select ,poll,但是性能较差

2、针对不同操作系统自行选择支持的技术,

select 维护一个文件描述符数据结构,单个进程使用有上限,通常1024,线性扫描这个数据结构,效率低,

poll和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才能知道那个设备就绪了

epoll使用事件通知机制,使用回调机制提高效率

select poll 还要从内核空间复制消息到用户空间,而epoll通过内核空间 和用户空间共享的一块内存来减少复制(mmap))

8、selectors 库

3.4版本提 selectors 库, 高级IO 复用库

类层次结构:

BaseSelector+--SelectSelector 实现select+--PollSelector 实现poll+--EpollSelector 实现epoll+--DevpollSelector 实现devpoll+-- KquequeSelector实现kqueque

selector.DefaultSelector 返回当前平台最有效,性能最高的实现、

源代码:

if 'KqueueSelector' inglobals():

DefaultSelector=KqueueSelectorelif 'EpollSelector' inglobals():

DefaultSelector=EpollSelectorelif 'DevpollSelector' inglobals():

DefaultSelector=DevpollSelectorelif 'PollSelector' inglobals():

DefaultSelector=PollSelectorelse:

DefaultSelector= SelectSelector

但是,由于没有实现Windows 下的IOCP ,所以,只能退化为select

abstractmethod register( fileobj, events, data=Nona)

为selector 注册一个文件对象,监视它的IO 事件,返回SelectKey对象

fileobj 被监视文件对象,例如:socket对象

events 事件,该文件对象必须等待的事件

data 可选的,与此文件对象相关联的不透明的数据,例如:关联用来存储每个客户端的会话ID,关联方法,通过这个参数在关注的时间产生后让二selector干什么事

1243334-20181106134318783-1594293376.png

selector.SelectorKey 有4 个属性

fileobj 注册的文件对象

fd 文件描述符

events 等待上面的文件描述符的文件对象的事件

data 注册时关联的数据

举例: 完成一个TCP Server, 能够接受客户端 请求并回应 客户端消息

ContractedBlock.gif

ExpandedBlockStart.gif

1 importselectors2 importthreading3 importsocket4 importlogging5

6 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'

7 logging.basicConfig(level=logging.INFO, format=FORMAT)8

9 #构建缺省性能最优的selector

10 selector =selectors.DefaultSelector()11

12 #创建 tcp server

13 sock =socket.socket()14 laddr = '127.0.0.1', 9999

15 sock.bind(laddr)16 sock.listen()17 logging.info(sock)18

19 sock.setblocking(False) #非阻塞

20

21 #回调函数,自定义形参

22 defaccept(sock:socket.socket, mask):23 """mask :事件 掩码的或值"""

24 conn, raddr = sock.accept() #这里虽然可以阻塞,但是事实上,阻塞是在 events = selector.select() 就处理了

25 conn.setblocking(False) #不阻塞

26 logging.info('new socket {} in accept'.format(conn))27

28 #注册文件对象sock 关注读事件,返回SelectorKey

29 #将sock,关注事件, data都绑定到key 实例属性上

30 key =selector.register(sock, selectors.EVENT_READ, accept)31 logging.info(key)32 #2018-11-06 14:03:15,407 MainThread 2964 SelectorKey(fileobj=

33 #family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0,

34 #laddr=('127.0.0.1', 9999)>, fd=272, events=1, data=)

35 whileTrue:36 #开始监听,等到有文件对象监控事件产生,返回(key, mask) 元组

37 events = selector.select() #没有 准备好的io,会阻塞在这里。

38 logging.info(events)#是一个二元组 组成的列表

39 '''

40 2018-11-06 14:06:20,215 MainThread 5236 [(SelectorKey(fileobj=, fd=272, events=1, data=), 1)]43 '''

44 for key, mask inevents:45 logging.info(key)46 logging.info(mask)47 callback = key.data #accept

48 callback(key.fileobj, mask) #accept(key.fileobj, mask)

测试

结果:

ContractedBlock.gif

ExpandedBlockStart.gif

1 D:\python3.7\python.exe E:/code_pycharm/tt7.py2 2018-11-06 14:11:01,816 MainThread 8520

3 2018-11-06 14:11:01,817 MainThread 8520 SelectorKey(fileobj=, fd=272, events=1, data=)4 2018-11-06 14:11:03,851 MainThread 8520 [(SelectorKey(fileobj=, fd=272, events=1, data=), 1)]5 2018-11-06 14:11:03,851 MainThread 8520 SelectorKey(fileobj=, fd=272, events=1, data=)6 2018-11-06 14:11:03,851 MainThread 8520 1

7 2018-11-06 14:11:03,851 MainThread 8520 new socket in accept

客户端一连接,就退出,且服务端,不会退出

测试:处于一直监听状态

ContractedBlock.gif

ExpandedBlockStart.gif

1 importselectors2 importthreading3 importsocket4 importlogging5

6 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'

7 logging.basicConfig(level=logging.INFO, format=FORMAT)8

9 #构造缺省性能最优elector

10 selector =selectors.DefaultSelector()11

12 #创建TCP server

13 sock =socket.socket()14 laddr = '127.0.0.1', 9999

15 sock.bind(laddr)16 sock.listen()17 logging.info(sock)18

19 sock.setblocking(False) #非阻塞

20

21 #回调函数,自己定义参数

22 defaccept(sock:socket.socket, mask):23 conn, raddr =sock.accept()24 conn.setblocking(False)25

26 logging.info('new socket {} in accept'.format(conn))27 key =selector.register(conn, selectors.EVENT_READ, read)28 logging.info('--------------------')29 logging.info(key)30

31 #回调函数

32 defread(conn:socket.socket, mask):33 logging.info('===================')34 data = conn.recv(1024)35 msg = 'your msg = {} ==='.format(data.decode())36 logging.info(msg)37 conn.send(msg.encode())38

39 #注册 文件对象sock 关注读事件, 返回SelectorKey

40 #将sock, 关注事件,data都 绑定到key实例属性上

41 key =selector.register(sock, selectors.EVENT_READ, accept)42 logging.info(key)43

44 event =threading.Event()45

46 defselect():47 while not event.is_set():#一直处于监听状态

48 #开始监听 ,等到有文件对象监控事件产生, 返回(key, mask) 元组

49 events =selector.select()50 for key, mask inevents:51 callback =key.data52 logging.info('********************')53 callback(key.fileobj, mask)54

55 threading.Thread(target=select, name='select').start()56

57 defmain():58 while notevent.is_set():59 cmd = input('>>>')60 if cmd.strip() =='quit':61 event.set()62 fobjs =[]63 logging.info('{}'.format(list(selector.get_map().items())))64

65 #字典遍历,不能直接操作数据

66 for fd, key in selector.get_map().items(): #返回注册的项

67 print(fd) #从打印可看出,开始用于accept的sock 也被记录当中

68 print(key.fileobj)69 fobjs.append(key.fileobj)70

71 for fobj infobjs:72 selector.unregister(fobj)73 fobj.close()74 selector.close()75

76 if __name__ == '__main__':77 main()

View Code

结果:

ContractedBlock.gif

ExpandedBlockStart.gif

1 D:\python3.7\python.exe E:/code_pycharm/tt8.py2 2018-11-06 14:53:39,911 MainThread 9256

3 2018-11-06 14:53:39,911 MainThread 9256 SelectorKey(fileobj=, fd=272, events=1, data=)4 >>>2018-11-06 14:53:55,211 select 8388 ********************

5 2018-11-06 14:53:55,221 select 8388 new socket inaccept6 2018-11-06 14:53:55,221 select 8388 --------------------

7 2018-11-06 14:53:55,221 select 8388 SelectorKey(fileobj=, fd=280, events=1, data=)8 2018-11-06 14:53:56,851 select 8388 ********************

9 2018-11-06 14:53:56,851 select 8388 new socket inaccept10 2018-11-06 14:53:56,851 select 8388 --------------------

11 2018-11-06 14:53:56,851 select 8388 SelectorKey(fileobj=, fd=296, events=1, data=)12

13

14 >>>>>>quit15 2018-11-06 14:54:03,426 MainThread 9256 [(272, SelectorKey(fileobj=, fd=272, events=1, data=)), (280, SelectorKey(fileobj=, fd=280, events=1, data=)), (296, SelectorKey(fileobj=, fd=296, events=1, data=))]16 272

17

18 280

19

20 296

21

22

23 Process finished with exit code 0

连接了两个客户端

实现ChatServer 群聊:

测试1:只监听了读,收到客户端信息,直接发送信息,给客户端。并没有监听客户端。

ContractedBlock.gif

ExpandedBlockStart.gif

1 importthreading2 importselectors3 importlogging4 importsocket5

6 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'

7 logging.basicConfig(level=logging.INFO, format=FORMAT)8

9

10 classChatServer:11 def __init__(self, ip='127.0.0.1', port=9999):12 self.laddr =ip, port13 self.sock =socket.socket()14 self.event =threading.Event()15 self.selector =selectors.DefaultSelector()16

17 defstart(self):18 self.sock.bind(self.laddr)19 self.sock.listen()20

21 self.selector.register(self.sock, selectors.EVENT_READ, self.accept)22 threading.Thread(target=self.select, name='select', daemon=True).start()23

24 defselect(self):25 while notself.event.is_set():26 events =self.selector.select()27 for key, mask inevents:28 callbakck =key.data29 callbakck(key.fileobj, mask)30

31 defaccept(self,sock, mask):32 conn, raddr =sock.accept()33 key =self.selector.register(conn, selectors.EVENT_READ, self.send)34

35 defsend(self,conn, mask):36 data = conn.recv(1024)37

38 if data.strip() == b'quit' or data == b'':39 self.selector.unregister(conn)40 conn.close()41 return

42

43 for key inself.selector.get_map().values():44 if key.data !=self.accept:45 msg = 'your msg is {}'.format(data.decode()).encode()46 key.fileobj.send(msg)47

48 defstop(self):49 self.event.set()50 fobjs =[]51 for fd, key inself.selector.get_map().items():52 fobjs.append(key.fileobj)53 for fobj infobjs:54 self.selector.unregister(fobj)55 fobj.close()56 self.selector.close()57

58 defmain():59 cs =ChatServer()60 cs.start()61

62 whileTrue:63 print('=======')64 cmd = input(">>>>")65 if cmd == 'quit':66 cs.stop()67 break

68 print(threading.enumerate())69

70 if __name__ == '__main__':71 main()

View Code

将服务端封装为了一个类,实例化 一个对象,进行操作

使用了IO 复用 的模型,不需要创建容器去收纳客户端,selects,提供了一个字典,收纳 (fd, selectkey)

不需要多线程,只需要将阻塞主线程的监听放在一个新线程中,不会阻塞主线程,就行。

只需要循环监听函数就行,不需要 循环 send 和 recv

测试2:实现读写分离,利用中间容器,存放读到的东西,写的时候,从里边拿。这里使用了Queue.

self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)

注册语句,要监听 selectors.EVENT_READ | selectors.EVENT_WRITE 读 与 写 事件

回调的时候,需要mask 来判断究竟是读触发了,还是写触发了,所以,需要修改方法声明,增加mask

注意:读 和 写 是分离的,那么,handle 函数,应该这样写:

1 defhandle(self, sock, mask):2 if mask &selectors.EVENT_READ: 读是1,写是2(十进制),二进制01,10,所以位与3 pass

4

5 #注意: 这里是某一个socket的写操作

6 if mask & selectors.EVENT_WRITE:#写缓冲区准备好了,可以写数据了

7 pass

handle方法 里面处理读, 写,mask有可能是0b01 0b10, 0b11

问题是,假设读取到了客户端发来的数据后,如何写进去》?

为每一个与客户端连接的socket对象增加对应的队列

与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到消息后,会遍历发给所有客户端的队列,这里完成一对多,即一份数据放到了所有的队列中。

与每一个客户端练级的socket对象,发现自己对联有数据,就发送给客户端。

ContractedBlock.gif

ExpandedBlockStart.gif

1 importthreading2 importselectors3 importlogging4 importsocket5 from queue importQueue6

7 FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'

8 logging.basicConfig(level=logging.INFO, format=FORMAT)9

10

11 classChatServer:12 def __init__(self, ip='127.0.0.1', port=9999):13 self.laddr =ip, port14 self.sock =socket.socket()15 self.event =threading.Event()16 self.selector =selectors.DefaultSelector()17

18 defstart(self):19 self.sock.bind(self.laddr)20 self.sock.listen()21

22 self.selector.register(self.sock, selectors.EVENT_READ, self.accept)23 threading.Thread(target=self.select, name='select', daemon=True).start()24

25 defselect(self):26 while notself.event.is_set():27 events =self.selector.select()28 for key, mask inevents:29 if key.data ==self.accept:30 callbakck =key.data31 callbakck(key.fileobj)32 else:33 logging.info('{}'.format(threading.enumerate()))34 callbakck =key.data[0]35 callbakck(key, mask)36

37 defaccept(self,sock):38 conn, raddr =sock.accept()39 self.selector.register(conn, selectors.EVENT_READ |selectors.EVENT_WRITE, (self.handler, Queue()))40

41 defhandler(self, key, mask):42 if mask & selectors.EVENT_READ:#读

43 conn =key.fileobj44 data = conn.recv(1024)45 logging.info(data)46 if data.strip() == b'quit' or data == b'':47 self.selector.unregister(conn)48 conn.close()49 return

50

51 msg = 'your msg is {}'.format(data.decode()).encode()52 for key inself.selector.get_map().values():53 ifisinstance(key.data, tuple):54 key.data[1].put(msg)55 logging.info('----------------------')56

57 if selectors.EVENT_WRITE: #写

58 if not key.data[1].empty():59 key.fileobj.send(key.data[1].get())60

61

62 defstop(self):63 self.event.set()64 fobjs =[]65 for fd, key inself.selector.get_map().items():66 fobjs.append(key.fileobj)67 for fobj infobjs:68 self.selector.unregister(fobj)69 fobj.close()70 self.selector.close()71

72 defmain():73 cs =ChatServer()74 cs.start()75

76 whileTrue:77 print('=======')78 cmd = input(">>>>")79 if cmd == 'quit':80 cs.stop()81 break

82 print(threading.enumerate())83

84 if __name__ == '__main__':85 main()

View Code

这个程序最大的问题,在select(),一直判断可写,几乎一直循环不停,所以对于写不频繁的情况下,就不要监听 写。

对于 Server 来说,一般更多的是等待对方发来数据后,响应时才发出数据,而不是积极的等着发送数据,所以,监听 读, 收到数据之后,再发送就可以了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/102548
推荐阅读
相关标签
  

闽ICP备14008679号