import time


def demo4(durations=(3, 2, 5)):
    """Run three simulated washing machines concurrently with asyncio.

    This is the implementation we were aiming for: each washer ``await``s
    ``asyncio.sleep()`` for its duration and then prints ``washerN finished``.
    Because the sleeps overlap on a single event loop, the whole run takes
    about ``max(durations)`` seconds (5 s by default) instead of their sum
    (10 s).

    Args:
        durations: How many seconds each washer runs, in washer order
            (washer1, washer2, ...). Defaults to the original (3, 2, 5).

    Expected output with the default durations (the last line is printed by
    the ``__main__`` block below)::

        washer2 finished
        washer1 finished
        washer3 finished
        elapsed time = 5.00...

    The small excess over 5 s is event-loop scheduling overhead. asyncio runs
    all washers on one thread, so no thread switching is involved.
    """
    import asyncio  # bring in asyncio

    async def washing(number, seconds):
        # asyncio.sleep() returns an awaitable; awaiting it hands control back
        # to the event loop so the other washers run in the meantime.
        await asyncio.sleep(seconds)
        print('washer%d finished' % number)

    async def wash_all():
        # gather() wraps every coroutine in a Task (scheduling it at once) and
        # waits until the slowest one has finished. asyncio.wait() would need
        # Tasks rather than bare coroutines (passing coroutines is a TypeError
        # since Python 3.11), which is why some examples build the list with
        # loop.create_task(...) first.
        await asyncio.gather(
            *(washing(n, s) for n, s in enumerate(durations, start=1))
        )

    # asyncio.run() replaces the manual steps (create a loop, run until the
    # last event is handled, close the loop) and can safely be called again.
    # A coroutine object can be awaited only once, so to "wash again" call
    # asyncio.run(wash_all()) again with a fresh coroutine. Re-running the
    # same coroutine objects fails.
    asyncio.run(wash_all())


if __name__ == '__main__':
    # Time the run to verify that the washers really overlap.
    start = time.time()
    # demo1()  # takes 10 seconds
    # demo2()  # RuntimeWarning: coroutine ... was never awaited
    # demo3()  # RuntimeWarning: coroutine ... was never awaited
    demo4()  # takes just over 5 seconds
    end = time.time()
    print('elapsed time = ' + str(end - start))
import time


def hello():
    # Block the whole thread for one second; nothing else can run meanwhile.
    time.sleep(1)


def run():
    """Call hello() five times in a row, printing a timestamp after each.

    hello() blocks, so consecutive timestamps are about one second apart and
    the whole run takes about five seconds.
    """
    remaining = 5
    while remaining:
        hello()
        print('Hello World:%s' % time.time())  # every great program starts with Hello World!
        remaining -= 1


if __name__ == '__main__':
    run()
- Hello World:1527595175.4728756
- Hello World:1527595176.473001
- Hello World:1527595177.473494
- Hello World:1527595178.4739306
- Hello World:1527595179.474482
import time
import asyncio

# One explicit event loop for this script. new_event_loop() + set_event_loop()
# avoids calling asyncio.get_event_loop() with no loop set, which is
# deprecated since Python 3.12 and an error in 3.14.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


# Define an async function
async def hello(delay=1):
    """Sleep *delay* seconds without blocking the loop, then print a timestamp."""
    # The ``await`` is essential: without it the sleep coroutine is created
    # but never run (Python warns "coroutine 'sleep' was never awaited") and
    # no time passes at all.
    await asyncio.sleep(delay)
    print('Hello World:%s' % time.time())


def run(count=5, delay=1):
    """Run *count* hello() coroutines concurrently on the module-level loop.

    All coroutines go to the loop in a single run_until_complete() call via
    gather(), so the total time is about *delay* seconds. Calling
    run_until_complete() once per coroutine would run them one after another
    (about count * delay seconds).
    """
    loop.run_until_complete(
        asyncio.gather(*(hello(delay) for _ in range(count)))
    )


if __name__ == '__main__':
    run()
- Hello World:1527595104.8338501
- Hello World:1527595104.8338501
- Hello World:1527595104.8338501
- Hello World:1527595104.8338501
- Hello World:1527595104.8338501
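async def 用来定义异步函数,其内部有异步操作。每个线程有一个事件循环,主线程调用 asyncio.get_event_loop() 时会创建事件循环(这一用法自 Python 3.12 起已被弃用,推荐改用 asyncio.run()),你需要把异步的任务丢给这个循环的 run_until_complete() 方法,事件循环会安排协程的执行。注意:asyncio.sleep() 必须配合 await 使用,否则协程不会真正休眠(Python 会给出 RuntimeWarning: coroutine 'sleep' was never awaited);另外,逐个调用 run_until_complete() 是串行执行的,想让多个协程并发,需要用 asyncio.gather() 等方法把它们一起交给事件循环。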
如果需要并发http请求怎么办呢,通常是用requests,但requests是同步的库,如果想异步的话需要引入aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个session对象,然后用session对象去打开网页。session可以进行多项操作,比如post, get, put, head等。
# Usage pattern only (not runnable by itself): these lines belong inside an
# ``async def``, and the inner block still needs a body, e.g.
# ``await response.read()`` as in the full example that follows.
async with ClientSession() as session:
    async with session.get(url) as response:
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"


async def hello(url):
    """Fetch *url* with aiohttp and print the raw response body (bytes)."""
    async with ClientSession() as session:
        async with session.get(url) as response:
            # read() waits for the whole body to arrive; this I/O wait is
            # where the event loop could run other coroutines.
            body = await response.read()
            print(body)


if __name__ == '__main__':
    # ``url`` is a template: fill its "{}" placeholder (here with an empty
    # path) rather than requesting the literal "{}" path.
    # asyncio.run() creates the event loop, runs the coroutine and closes it.
    asyncio.run(hello(url.format('')))
首先 async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,这里 response.read() 要等待服务器返回响应内容,是个耗时的 IO 操作。请求本身则由 ClientSession 类发起:先创建 session 对象,再用 session.get() 发起 http 请求。
import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"


async def hello(url):
    """Fetch *url*, wait for the full body, then print a timestamp."""
    async with ClientSession() as session:
        async with session.get(url) as response:
            body = await response.read()
            # print(body)
            print('Hello World:%s' % time.time())


def run():
    """Schedule five hello() requests as Tasks on the current event loop.

    The Tasks are appended to the module-level ``tasks`` list; the caller
    must run the loop until they finish.
    """
    for i in range(5):
        task = asyncio.ensure_future(hello(url.format(i)))
        tasks.append(task)


if __name__ == '__main__':
    # Create and install the loop explicitly. asyncio.get_event_loop() with no
    # loop set is deprecated since Python 3.12 and an error in 3.14.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        run()
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
- Hello World:1527754874.8915546
- Hello World:1527754874.899039
- Hello World:1527754874.90004
- Hello World:1527754874.9095392
- Hello World:1527754874.9190395
import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"


async def hello(url):
    """Fetch *url*, print a timestamp, and return the response body (bytes)."""
    async with ClientSession() as session:
        async with session.get(url) as response:
            # print(response)
            print('Hello World:%s' % time.time())
            return await response.read()


def run():
    """Fetch five URLs concurrently and print the list of response bodies.

    Uses the module-level ``loop`` created in the ``__main__`` block.
    gather() returns results in the order the tasks were passed in, not in
    completion order.
    """
    for i in range(5):
        task = loop.create_task(hello(url.format(i)))
        tasks.append(task)
    result = loop.run_until_complete(asyncio.gather(*tasks))
    print(result)


if __name__ == '__main__':
    # Create and install the loop explicitly. asyncio.get_event_loop() with no
    # loop set is deprecated since Python 3.12 and an error in 3.14.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        run()
    finally:
        loop.close()
- Hello World:1527765369.0785167
- Hello World:1527765369.0845182
- Hello World:1527765369.0910277
- Hello World:1527765369.0920424
- Hello World:1527765369.097017
- [b'<!DOCTYPE html>\r\n<!--STATUS OK-->\r\n<html>\r\n<head>\r\n......
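假如你的并发达到2000个,程序可能会报错:ValueError: too many file descriptors in select()。字面上看,这是 Python 调用的 select() 对能同时监听的文件描述符数量有上限,这其实是操作系统层面的限制:Windows 上 select() 的上限是 512 个(实际约 509 个连接时就会报错);Linux 上 asyncio 默认使用 epoll 而不是 select,此时受限的是进程可打开的文件数(ulimit -n,默认通常是 1024),超过后报的是 OSError: [Errno 24] Too many open files。超过了这些上限,程序就开始报错。这里我们有三种方法解决这个问题: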
# coding: utf-8
import time
import asyncio

import aiohttp

url = 'https://www.baidu.com/'


async def hello(url, semaphore):
    """Fetch *url* and return its body, holding *semaphore* while doing so.

    The semaphore bounds how many requests (and therefore sockets) are in
    flight at the same time.
    """
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.read()


async def run():
    """Fetch ``url`` 1000 times with at most 500 requests in flight."""
    # Created inside the running coroutine: before Python 3.10, asyncio
    # primitives bound to whichever loop was current at construction time.
    semaphore = asyncio.Semaphore(500)  # limit concurrency to 500
    to_get = [hello(url, semaphore) for _ in range(1000)]  # 1000 requests in total
    # asyncio.wait() rejects bare coroutines since Python 3.11 (TypeError);
    # gather() accepts them. return_exceptions=True keeps the original
    # behavior of not raising when an individual request fails.
    await asyncio.gather(*to_get, return_exceptions=True)


if __name__ == '__main__':
    # asyncio.run() creates the event loop, runs run() and closes the loop.
    asyncio.run(run())
