赞
踩
同步、异步是指函数或方法调用的时候,被调用者是否得到最终结果的。
函数调用完的时候,是否能获得了最终结果。 例如:
同步好比去买馒头:跟店小二说打包一份馒头,一直等到店小二把馒头打包给我。
异步好比点外卖:下完订单后,返回一个订单成功信息,但是外卖还没到我手上,中间还可能短信通知外卖还有多久到。–调用请求后立马返回请求成功。成功后我去做其他事。外卖做到哪一步了是否最终好了,要么自己主动实时查看、要么平台通知你。即使外面员已经把外卖送到小区门口了,我还可以不用立即去取,还可以先把当前还剩10分钟的剧追完了再去拿,完成这个异步非阻塞过程。
又比如:
异步:业务中,发起一个创建存储池的任务,发起创建请求后,接口返回一个200的状态码,但是创池时间要很久,一直等到各个组件部署完成,进度100%任务才算结束。这是一个异步过程。 — 这是一个请求发起者自己去看任务是否准备好了:立即返回后,需要请求方再发起状态查询请求,实时查询当前进度。当任务进度100%后,就完成了。
同步:调用函数进行算数运算,函数通过运算返回最终结果,而没有任何中间值。
函数或方法调用的时候,是否立刻返回。立即返回就是非阻塞调用;不立即返回就是阻塞调用。
同步、 异步,与阻塞、非阻塞不相关。
同步、异步强调的是结果;阻塞、非阻塞强调是时间,是否等待。
同步与异步区别在于:调用者是否得到了想要的结果。
同步就是一直要到返回结果;异步就是直接返回了,但是不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其他方式通知调用者,来取回最终结果。
阻塞与非阻塞的区别在于,调用者是否还能干其他事。
四种场景:
IO过程的两阶段:
发生IO的时候:
一个网络IO过程简单理解:
从上述的网络IO可以看出,请求过程发生了三次数据拷贝过程,效率太低了。怎么办呢?通过映射,直接从磁盘缓冲区拷贝到内核缓冲区,直接发送出去,这就是sendfile(零拷贝)。https/nginx都通过零拷贝来提高效率。
1 阻塞IO: 进程等待(阻塞),直到读写read/write完成。
阻塞IO模型
2 非阻塞IO: 进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。
淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是饭盛好了。read/write
3 IO多路复用: 就是同时监控多个IO,有一个准备好了,就不需要等待,开始处理,提高了同时处理IO等能力。IO多路复用不是多线性。
IO对路复用不同平台有不同的模型,select支持所有平台(linux、windows、mac)。
如图以select为例:将关注的10操作告诉select函数并调用,进程阻塞,内核"监视"select关注的文件描述符fd,被关注的任何一个fd对应的10准备好了数据,select返回。在使用read将数据复制到用户进程。
select举例:食堂供应很多菜(众多的10),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来,你得自己找找看哪一样才好了,请服务员把做好的菜打给你。
epoll模型是linux对select的增强。epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。
4 异步IO: 进程发起异步10请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。
举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。在整个过程中,进程都可以忙别的,等好了才过来。
举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。
python的selectors
模块selectors.DefaultSelector
会自动选择平台支持的最优模型。所以不用关心内部怎么多路复用的,只需关心怎么注册、怎么回调。
python中的IO多路复用代码示例及理解:
import selectors
import socket
import threading
from tool.logger_define import get_log
logger = get_log(__name__)
def get_my_sock():
addr = ('127.0.0.1', 9998)
_sock = socket.socket()
_sock.bind(addr)
_sock.listen()
return _sock
def my_handle(s: socket.socket, mask, select: selectors.DefaultSelector):
"""
selectors注册的socket,事件被触发后的打包回调函数
"""
logger.info("Event mask:{}".format(mask))
conn, _ = s.accept()
conn.setblocking(False) # 继续设置为非阻塞模式,为什么?因为recv和send交给了selectors的条件满足之后通知机制,对吧。当然也可以设置为阻塞
select.register(conn, selectors.EVENT_READ, my_chat_handle)
def my_chat_handle(conn: socket.socket, mask, select: selectors.DefaultSelector):
"""实现群聊"""
try:
data = conn.recv(1024)
except Exception as e:
logger.info("[mask:{}] stop connection, error:{}".format(mask, e))
select.unregister(conn) # 异常时,取消这一事件的注册
return
logger.info("[mask:{} recv]{}".format(mask, data.decode()))
if data.decode() in ["quit", 'q']:
select.unregister(conn)
conn.close()
return
for fd, selector_key in select.__dict__.get('_fd_to_key').items():
cur_conn, cur_data = selector_key.fileobj, selector_key.data.__name__
if cur_data != "my_chat_handle":
continue
try:
cur_conn.send("[mask:{} ack]{}".format(mask, data.decode()).encode())
except Exception as e:
logger.info("[mask:{}] stop connection, error:{}".format(mask, e))
select.unregister(cur_conn)
def my_selector(ev: threading.Event):
sock = get_my_sock()
sock.setblocking(False) # 因为sock交给了selectors处理,当满足事件被触发后(有客户端主动发起连接请求),再由回调函数做下一步处理(accept连接,即连接后的数据交互),所以socket可以设置为非阻塞。而多线程模式处理多路请求,如果设置为非阻塞,直接报错。
logger.info("my first socket:{}".format(sock))
my_select = selectors.DefaultSelector() # DefaultSelector实现了多平台自适应,不同关心内部怎么多路复用的,只需关心怎么注册、怎么回调。
# selectors是一种条件满足之后通知的机制(通过注册,当一个事件被触发后,selectors将这个事件注册打包的处理函数送过来,用户再回调这个处理函数进行处理),而不是多线程阻塞(accept其一个线程,每来一个新的连接之后再起一个线程)模式。
# selectors.register,相当于将socket.listen这个监听事件交给selectors,由selectors来监听,当有连接请求时,触发selectors将的Event置为set状态。当selectors将的Event为set时,用户就可根据Event中保留的注册回调函数信息,进行对应的回调函数的调用处理。
my_select.register(sock, events=selectors.EVENT_READ, data=my_handle) # 所以注册这个sock相当于由selectors来listen。并把建立连接后的处理函数my_handle,在注册时打包进去,方便后续回调。
while not ev.is_set():
# socket在selectors.select这里阻塞,select相当于listen和accept之间。实际上,socket打开监听,用户就可以申请建立连接,且连接状态已经为ESTABLISHED。只等服务端accept后,在新的socket中进行数据通信。
events = my_select.select()
if events:
for key, mask in events: # 对当前所有被触发的事件,执行回调函数处理;mask为事件掩码,应该是唯一的ID
logger.info("fd={}, mask={}".format(key, mask))
callback = key.data
callback(key.fileobj, mask, my_select)
print(my_select.__dict__)
if __name__ == '__main__':
my_ev = threading.Event()
threading.Thread(target=my_selector, name="my_selector", daemon=True, args=(my_ev,)).start()
while True:
cmd = input(">>>").strip()
if cmd == "q" or cmd == "quit":
my_ev.set()
logger.info("exit")
break
IO多路复用selectors,并不是多线程示例:
import socket
def my_selectors_test(addr=('127.0.0.1', 9998)):
"""本例测试,单个线程,多个客户端申请连接,客户端显示连接成功(ESTABLISHED)。但是同一时刻只能accept一个连接,当已连接的客户端主动断开连接后,才能进入下一个循环处理另一个连接
用于理解IO多路复用selectors,并不是多线程。且selectors只是位于listen和accept之间,selectors只是监听了有没有用户发起连接请求。
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(addr)
sock.listen(3)
print('tcpServer listen at: %s:%s\n\r' % addr)
while True:
client_sock, client_addr = sock.accept()
print('{}: connect'.format(client_addr))
while True:
try:
recv = client_sock.recv(1024)
except Exception as e:
print(e)
break
print('[Client %s:%s said]:%s' % (client_addr[0], client_addr[1], recv))
try:
client_sock.send('tcpServer has received your message'.encode())
except Exception as e:
print(e)
break
if __name__ == '__main__':
my_selectors_test()
import queue
import selectors
import socket
import threading
from tool.logger_define import get_log
logger = get_log(__name__)
class MyConn:
def __init__(self, conn: socket.socket, handle):
self.queue = queue.Queue()
self.conn = conn
self.handle = handle
self.first_conn = True
class MyChartBySelectors:
def __init__(self, ip="127.0.0.1", port=9998):
self.addr = (ip, port)
self.selector = selectors.DefaultSelector()
self.sock = socket.socket()
self.clients = dict()
self.ev = threading.Event()
def start(self):
self.sock.bind(self.addr)
self.sock.listen()
self.sock.setblocking(False)
self.selector.register(self.sock, events=selectors.EVENT_READ, data=self._accept)
threading.Thread(target=self._run, name="run", daemon=True).start()
def _accept(self, sock: socket.socket, *args):
conn, addr = sock.accept()
conn.setblocking(False)
my_handle = MyConn(conn, self.handle)
self.clients[addr] = my_handle
logger.info("connection client:{}".format(addr))
self.selector.register(conn, events=selectors.EVENT_READ, data=my_handle)
def _run(self):
while not self.ev.is_set():
events = self.selector.select() # timeout可设置可不设置
logger.info(events)
for select_key, mask in events:
if callable(select_key.data):
callback = select_key.data
else:
callback = select_key.data.handle
callback(select_key.fileobj, mask)
def handle(self, conn: socket.socket, mask):
logger.info("Handle the request, mask:{}".format(mask))
addr = conn.getpeername()
if mask & selectors.EVENT_READ:
logger.info("start to receive data")
try:
data = conn.recv(1024)
except Exception as e:
logger.error("Error:{}".format(e))
self.clients.pop(addr)
self.selector.unregister(conn)
return
if data.decode().strip() in ["quit", "q"]:
logger.info("{} close connection".format(conn.getpeername()))
self.clients.pop(addr)
self.selector.unregister(conn)
conn.close()
return
logger.info("Recv:{}".format(data.decode()))
logger.info("start to send data, conn:{}".format(self.clients))
msg = "Ack:{}".format(data.decode()).encode()
for my_addr, my_conn_obj in self.clients.items():
my_conn = my_conn_obj.conn
try:
my_conn.send(msg)
except Exception as e:
logger.error("Error:{}".format(e))
self.clients.pop(my_addr)
logger.info("send msg finish")
def stop(self):
for addr, conn_obj in self.clients.items():
conn = conn_obj.conn
self.selector.unregister(conn)
conn.close()
self.ev.set()
self.sock.close()
logger.info("finish")
if __name__ == '__main__':
logger.info("Start my chat")
my_chat = MyChartBySelectors()
my_chat.start()
while True:
cmd = input(">>>").strip()
if cmd == "q" or cmd == "quit":
my_chat.stop()
logger.info("exit")
break
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。