Asynchronous Programming in Python: async / await

The code in this article was run on Python 3.6.

History

- 3.3: The yield from expression allows for generator delegation.

- 3.4: asyncio was introduced in the Python standard library with provisional API status.

- 3.5: async and await became a part of the Python grammar, used to signify and wait on coroutines. They were not yet reserved keywords. (You could still define functions or variables named async and await.)

- 3.6: Asynchronous generators and asynchronous comprehensions were introduced. The API of asyncio was declared stable rather than provisional.

- 3.7: async and await became reserved keywords. (They cannot be used as identifiers.) They are intended to replace the asyncio.coroutine() decorator. asyncio.run() was introduced to the asyncio package, among a bunch of other features.

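In essence, async/await is built on coroutines: when a coroutine reaches an await, it hands control back to the event loop, and execution switches back to it once the result is available. Compared with using callbacks to coordinate execution order, the code inside each coroutine runs sequentially, which makes it much friendlier to write.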
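As a minimal sketch of that hand-off (the names `worker`, `demo` and `run_demo` are made up for this illustration), two coroutines share one thread and interleave at their `await` points. Each coroutine body still reads top to bottom:

```python
import asyncio

async def worker(name, delay, log):
    log.append(f'{name} start')
    # Control returns to the event loop here, so the other worker can run.
    await asyncio.sleep(delay)
    log.append(f'{name} end')

async def demo():
    log = []
    # A is scheduled first but sleeps longer, so B finishes first.
    await asyncio.gather(worker('A', 0.1, log), worker('B', 0.05, log))
    return log

def run_demo():
    # Explicit loop management so the sketch also runs on Python 3.6.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()

if __name__ == '__main__':
    print(run_demo())  # ['A start', 'B start', 'B end', 'A end']
```

Neither worker blocks the other while it sleeps, yet no callbacks are needed to express the ordering inside each one.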

Syntax

async def

What does calling an async function return?

async def fun1():
    return

async def fun2():
    yield

print(type(fun1()))
print(type(fun2()))
# <class 'coroutine'>
# <class 'async_generator'>
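A small sketch of what that means in practice, using hypothetical functions `answer` and `countdown`: calling an async function only creates an object, and the body runs once the object is awaited (for a coroutine) or iterated with `async for` (for an async generator).

```python
import asyncio
import inspect

async def answer():
    return 42

async def countdown(n):
    while n > 0:
        yield n
        n -= 1

async def demo():
    coro = answer()                      # nothing has executed yet
    is_coro = inspect.iscoroutine(coro)
    value = await coro                   # the body runs here
    agen = countdown(3)                  # again, only an object is created
    is_agen = inspect.isasyncgen(agen)
    items = [i async for i in agen]      # async comprehension (Python 3.6+)
    return is_coro, value, is_agen, items

def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()

if __name__ == '__main__':
    print(run_demo())  # (True, 42, True, [3, 2, 1])
```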

await

await can only be used inside an async function.

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y

await must be followed by an awaitable object (in most cases a coroutine object).
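To make "awaitable" concrete, here is a hedged sketch that awaits four kinds of objects: a coroutine, a Task, a Future, and a user-defined class that implements `__await__`. The class `Delay` and the function `double` are invented for this example and are not part of asyncio.

```python
import asyncio
import inspect

class Delay:
    """Hypothetical awaitable: waits `seconds`, then produces `value`."""
    def __init__(self, seconds, value):
        self.seconds = seconds
        self.value = value

    async def _wait(self):
        await asyncio.sleep(self.seconds)
        return self.value

    def __await__(self):
        # Delegate to a coroutine object, which already knows how to be awaited.
        return self._wait().__await__()

async def double(x):
    return x * 2

async def demo():
    loop = asyncio.get_event_loop()
    from_coro = await double(1)                         # coroutine object
    from_task = await asyncio.ensure_future(double(2))  # Task
    fut = loop.create_future()                          # bare Future
    fut.set_result(6)
    from_future = await fut
    from_custom = await Delay(0.01, 8)                  # object with __await__
    return [from_coro, from_task, from_future, from_custom]

def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()

if __name__ == '__main__':
    print(run_demo())                 # [2, 4, 6, 8]
    print(inspect.isawaitable(1))     # False: a plain int cannot be awaited
```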

How to run (launch) coroutines

import time
import asyncio

async def coro(seq) -> list:
    """'IO' wait time is proportional to the max element."""
    await asyncio.sleep(max(seq))
    return list(reversed(seq))

async def main():
    """
    ensure_future(): creates a task; on Python 3.7+ create_task() is preferred.
    t.done(): returns whether the task has finished (bool).
    Output:
    ------------------------------------------
    t: type <class '_asyncio.Task'>
    t done: False
    t result: [1, 2, 3]
    t done: True
    ------------------------------------------
    """
    # t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
    t = asyncio.ensure_future(coro([3, 2, 1]))  # Python 3.*
    print(f't: type {type(t)}')
    print(f't done: {t.done()}')
    a = await t
    print(f't result: {a}')
    print(f't done: {t.done()}')

async def main2():
    """
    tasks = gather(): collects several tasks.
    await tasks: returns all results together once every task has finished.
    Output:
    ------------------------------------------
    Start: 11:11:29
    [[1, 2, 3], [0, 5, 6]]
    End: 11:11:35
    Both tasks done: True
    ------------------------------------------
    """
    t = asyncio.ensure_future(coro([3, 2, 1]))
    t2 = asyncio.ensure_future(coro([6, 5, 0]))
    print('Start:', time.strftime('%X'))
    a = await asyncio.gather(t, t2)
    print(a)
    # About 6 seconds after Start: the tasks run concurrently,
    # so the longer sleep (6 s) determines the total.
    print('End:', time.strftime('%X'))
    print(f'Both tasks done: {all((t.done(), t2.done()))}')

async def main3():
    """
    as_completed: yields each task's result as soon as that task finishes.
    Output:
    ------------------------------------------
    Start: 11:08:13
    res: [1, 2, 3] completed at 11:08:16
    # three seconds later
    res: [0, 5, 6] completed at 11:08:19
    End: 11:08:19
    Both tasks done: True
    ------------------------------------------
    """
    t = asyncio.ensure_future(coro([3, 2, 1]))
    t2 = asyncio.ensure_future(coro([6, 5, 0]))
    print('Start:', time.strftime('%X'))
    for res in asyncio.as_completed((t, t2)):
        compl = await res
        print(f'res: {compl} completed at {time.strftime("%X")}')
    print('End:', time.strftime('%X'))
    print(f'Both tasks done: {all((t.done(), t2.done()))}')

if __name__ == '__main__':
    # python < 3.7
    loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
    # loop.run_until_complete(main2())
    loop.run_until_complete(main3())
    loop.close()
    # python 3.7+
    # asyncio.run(main())
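Because the entry point differs between Python versions, one option is a small wrapper that picks whichever is available. The helper name `run` and the toy coroutine `add` below are assumptions made for this sketch, not part of asyncio's API:

```python
import asyncio

def run(coro):
    """Run `coro` to completion on Python 3.6 as well as 3.7+ (illustrative helper)."""
    if hasattr(asyncio, 'run'):           # Python 3.7+
        return asyncio.run(coro)
    loop = asyncio.new_event_loop()       # fallback for older versions
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        asyncio.set_event_loop(None)
        loop.close()

async def add(a, b):
    await asyncio.sleep(0)                # give the loop one turn
    return a + b

if __name__ == '__main__':
    print(run(add(1, 2)))  # 3
```

Creating a fresh loop in the fallback branch avoids reusing a loop that an earlier call has already closed.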

DEMO

Concurrent HTTP requests

This demo uses aiohttp to send asynchronous requests.

"""
Concurrently crawl 36kr data and save it to local files.
"""
import asyncio
import json
import os

import aiohttp

async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_36_kr(page, client):
    data1 = await get_json(client, 'https://36kr.com/api/search-column/mainsite?per_page=20&page={}'.format(page))
    j = json.loads(data1.decode('utf-8'))
    # Ordinary blocking write; acceptable here because the files are small.
    with open(f"./content/{page}.json", "w", encoding='utf-8') as f:
        f.write(json.dumps(j, indent=2, ensure_ascii=False))
    print('Done with page:', page)

async def main():
    # open() above fails if the output directory does not exist.
    os.makedirs('./content', exist_ok=True)
    # Create the session inside the coroutine so it uses the running loop
    # and is closed even when a request raises.
    async with aiohttp.ClientSession() as client:
        tasks = [asyncio.ensure_future(get_36_kr(i, client)) for i in range(100)]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

A simplified version (requires Python 3.7+)

import asyncio
import aiohttp
import json

async def get_36_kr(page, client):
    url = f'https://36kr.com/api/search-column/mainsite?per_page=1&page={page}'
    # ssl=False skips certificate verification; avoid it outside quick experiments.
    async with client.get(url, ssl=False) as response:
        assert response.status == 200
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as client:
        tasks = [asyncio.ensure_future(get_36_kr(i, client))
                 for i in range(2)]
        res = await asyncio.gather(*tasks)
    print(json.dumps(res, indent=2))

if __name__ == '__main__':
    asyncio.run(main())
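The demos above start every request at once. If the target server or your own machine needs a cap on in-flight requests, one common approach is an `asyncio.Semaphore`. The following network-free sketch is an assumption about how that could look: `fake_fetch` stands in for the real HTTP call, and `crawl`, `state` and `limit` are names invented here.

```python
import asyncio

async def fake_fetch(page, sem, state):
    # Stand-in for an HTTP request; only `limit` of these run at the same time.
    async with sem:
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
        await asyncio.sleep(0.01)         # simulated network latency
        state['active'] -= 1
        return {'page': page}

async def crawl(pages, limit):
    sem = asyncio.Semaphore(limit)
    state = {'active': 0, 'peak': 0}      # bookkeeping for the demonstration only
    # gather() returns results in the order the awaitables were passed in.
    results = await asyncio.gather(*(fake_fetch(p, sem, state) for p in pages))
    return results, state['peak']

def run_crawl(pages, limit):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(crawl(pages, limit))
    finally:
        loop.close()

if __name__ == '__main__':
    results, peak = run_crawl(range(10), limit=3)
    print([r['page'] for r in results])
    print('peak concurrency:', peak)
```

In a real crawler the `async with sem:` block would wrap the `client.get(...)` call, leaving the rest of the gather-based structure unchanged.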

Async IO libraries

Libraries that can be used with async/await.

From aio-libs:

aiohttp: Asynchronous HTTP client/server framework

aioredis: Async IO Redis support

aiopg: Async IO PostgreSQL support

aiomcache: Async IO memcached client

aiokafka: Async IO Kafka client

aiozmq: Async IO ZeroMQ support

aiojobs: Jobs scheduler for managing background tasks

async_lru: Simple LRU cache for async IO

From magicstack:

uvloop: Ultra fast async IO event loop

asyncpg: (Also very fast) async IO PostgreSQL support

From other hosts:

trio: Friendlier asyncio intended to showcase a radically simpler design

aiofiles: Async file IO

asks: Async requests-like http library

asyncio-redis: Async IO Redis support

aioprocessing: Integrates multiprocessing module with asyncio

umongo: Async IO MongoDB client

unsync: Unsynchronize asyncio

aiostream: Like itertools, but async
