当前位置:   article > 正文

Python3 中的协程 asyncio async await 概念(实例)(ValueError: too many file descriptors in select())_python was never awaited

python was never awaited

代码实例

  1. import time
  2. def demo4():
  3. """
  4. 这是最终我们想要的实现.
  5. """
  6. import asyncio # 引入 asyncio 库
  7. async def washing1():
  8. await asyncio.sleep(3) # 使用 asyncio.sleep(), 它返回的是一个可等待的对象
  9. print('washer1 finished')
  10. async def washing2():
  11. await asyncio.sleep(2)
  12. print('washer2 finished')
  13. async def washing3():
  14. await asyncio.sleep(5)
  15. print('washer3 finished')
  16. """
  17. 事件循环机制分为以下几步骤:
  18. 1. 创建一个事件循环
  19. 2. 将异步函数加入事件队列
  20. 3. 执行事件队列, 直到最晚的一个事件被处理完毕后结束
  21. 4. 最后建议用 close() 方法关闭事件循环, 以彻底清理 loop 对象防止误用
  22. """
  23. # 1. 创建一个事件循环
  24. loop = asyncio.get_event_loop()
  25. # 2. 将异步函数加入事件队列
  26. tasks = [
  27. washing1(),
  28. washing2(),
  29. washing3(),
  30. ]
  31. # 3. 执行事件队列, 直到最晚的一个事件被处理完毕后结束
  32. loop.run_until_complete(asyncio.wait(tasks))
  33. """
  34. PS: 如果不满意想要 "多洗几遍", 可以多写几句:
  35. loop.run_until_complete(asyncio.wait(tasks))
  36. loop.run_until_complete(asyncio.wait(tasks))
  37. loop.run_until_complete(asyncio.wait(tasks))
  38. ...
  39. """
  40. # 4. 如果不再使用 loop, 建议养成良好关闭的习惯
  41. # (有点类似于文件读写结束时的 close() 操作)
  42. loop.close()
  43. """
  44. 最终的打印效果:
  45. washer2 finished
  46. washer1 finished
  47. washer3 finished
  48. elapsed time = 5.126561641693115
  49. (毕竟切换线程也要有点耗时的)
  50. 说句题外话, 我看有的博主的加入事件队列是这样写的:
  51. tasks = [
  52. loop.create_task(washing1()),
  53. loop.create_task(washing2()),
  54. loop.create_task(washing3()),
  55. ]
  56. 运行的效果是一样的, 暂不清楚为什么他们这样做.
  57. """
  58. if __name__ == '__main__':
  59. # 为验证是否真的缩短了时间, 我们计个时
  60. start = time.time()
  61. # demo1() # 需花费10秒
  62. # demo2() # 会报错: RuntimeWarning: coroutine ... was never awaited
  63. # demo3() # 会报错: RuntimeWarning: coroutine ... was never awaited
  64. demo4() # 需花费5秒多一点点
  65. end = time.time()
  66. print('elapsed time = ' + str(end - start))

原作者实例 

  1. from time import sleep, time
  2. def demo1():
  3. """
  4. 假设我们有三台洗衣机, 现在有三批衣服需要分别放到这三台洗衣机里面洗.
  5. """
  6. def washing1():
  7. sleep(3) # 第一台洗衣机, 需要洗3秒才能洗完 (只是打个比方)
  8. print('washer1 finished') # 洗完的时候, 洗衣机会响一下, 告诉我们洗完了
  9. def washing2():
  10. sleep(2)
  11. print('washer2 finished')
  12. def washing3():
  13. sleep(5)
  14. print('washer3 finished')
  15. washing1()
  16. washing2()
  17. washing3()
  18. """
  19. 这个还是很容易理解的, 运行 demo1(), 那么需要10秒钟才能把全部衣服洗完.
  20. 没错, 大部分时间都花在挨个地等洗衣机上了.
  21. """
  22. def demo2():
  23. """
  24. 现在我们想要避免无谓的等待, 为了提高效率, 我们将使用 async.
  25. washing1/2/3() 本是 "普通函数", 现在我们用 async 把它们升级为 "异步函数".
  26. 注: 一个异步的函数, 有个更标准的称呼, 我们叫它 "协程" (coroutine).
  27. """
  28. async def washing1():
  29. sleep(3)
  30. print('washer1 finished')
  31. async def washing2():
  32. sleep(2)
  33. print('washer2 finished')
  34. async def washing3():
  35. sleep(5)
  36. print('washer3 finished')
  37. washing1()
  38. washing2()
  39. washing3()
  40. """
  41. 从正常人的理解来看, 我们现在有了异步函数, 但是却忘了定义应该什么时候 "离开" 一台洗衣
  42. 机, 去看看另一个... 这就会导致, 现在的情况是我们一边看着第一台洗衣机, 一边着急地想着
  43. "是不是该去开第二台洗衣机了呢?" 但又不敢去 (只是打个比方), 最终还是花了10秒的时间才
  44. 把衣服洗完.
  45. PS: 其实 demo2() 是无法运行的, Python 会直接警告你:
  46. RuntimeWarning: coroutine 'demo2.<locals>.washing1' was never awaited
  47. RuntimeWarning: coroutine 'demo2.<locals>.washing2' was never awaited
  48. RuntimeWarning: coroutine 'demo2.<locals>.washing3' was never awaited
  49. """
  50. def demo3():
  51. """
  52. 现在我们吸取了上次的教训, 告诉自己洗衣服的过程是 "可等待的" (awaitable), 在它开始洗衣服
  53. 的时候, 我们可以去弄别的机器.
  54. """
  55. async def washing1():
  56. await sleep(3) # 注意这里加入了 await
  57. print('washer1 finished')
  58. async def washing2():
  59. await sleep(2)
  60. print('washer2 finished')
  61. async def washing3():
  62. await sleep(5)
  63. print('washer3 finished')
  64. washing1()
  65. washing2()
  66. washing3()
  67. """
  68. 尝试运行一下, 我们会发现还是会报错 (报错内容和 demo2 一样). 这里我说一下原因, 以及在
  69. demo4 中会给出一个最终答案:
  70. 1. 第一个问题是, await 后面必须跟一个 awaitable 类型或者具有 __await__ 属性的
  71. 对象. 这个 awaitable, 并不是我们认为 sleep() 是 awaitable 就可以 await 了,
  72. 常见的 awaitable 对象应该是:
  73. await asyncio.sleep(3) # asyncio 库的 sleep() 机制与 time.sleep() 不
  74. # 同, 前者是 "假性睡眠", 后者是会导致线程阻塞的 "真性睡眠"
  75. await an_async_function() # 一个异步的函数, 也是可等待的对象
  76. 以下是不可等待的:
  77. await time.sleep(3)
  78. x = await 'hello' # <class 'str'> doesn't define '__await__'
  79. x = await 3 + 2 # <class 'int'> dosen't define '__await__'
  80. x = await None # ...
  81. x = await a_sync_function() # 普通的函数, 是不可等待的
  82. 2. 第二个问题是, 如果我们要执行异步函数, 不能用这样的调用方法:
  83. washing1()
  84. washing2()
  85. washing3()
  86. 而应该用 asyncio 库中的事件循环机制来启动 (具体见 demo4 讲解).
  87. """
  88. def demo4():
  89. """
  90. 这是最终我们想要的实现.
  91. """
  92. import asyncio # 引入 asyncio 库
  93. async def washing1():
  94. await asyncio.sleep(3) # 使用 asyncio.sleep(), 它返回的是一个可等待的对象
  95. print('washer1 finished')
  96. async def washing2():
  97. await asyncio.sleep(2)
  98. print('washer2 finished')
  99. async def washing3():
  100. await asyncio.sleep(5)
  101. print('washer3 finished')
  102. """
  103. 事件循环机制分为以下几步骤:
  104. 1. 创建一个事件循环
  105. 2. 将异步函数加入事件队列
  106. 3. 执行事件队列, 直到最晚的一个事件被处理完毕后结束
  107. 4. 最后建议用 close() 方法关闭事件循环, 以彻底清理 loop 对象防止误用
  108. """
  109. # 1. 创建一个事件循环
  110. loop = asyncio.get_event_loop()
  111. # 2. 将异步函数加入事件队列
  112. tasks = [
  113. washing1(),
  114. washing2(),
  115. washing3(),
  116. ]
  117. # 3. 执行事件队列, 直到最晚的一个事件被处理完毕后结束
  118. loop.run_until_complete(asyncio.wait(tasks))
  119. """
  120. PS: 如果不满意想要 "多洗几遍", 可以多写几句:
  121. loop.run_until_complete(asyncio.wait(tasks))
  122. loop.run_until_complete(asyncio.wait(tasks))
  123. loop.run_until_complete(asyncio.wait(tasks))
  124. ...
  125. """
  126. # 4. 如果不再使用 loop, 建议养成良好关闭的习惯
  127. # (有点类似于文件读写结束时的 close() 操作)
  128. loop.close()
  129. """
  130. 最终的打印效果:
  131. washer2 finished
  132. washer1 finished
  133. washer3 finished
  134. elapsed time = 5.126561641693115
  135. (毕竟切换线程也要有点耗时的)
  136. 说句题外话, 我看有的博主的加入事件队列是这样写的:
  137. tasks = [
  138. loop.create_task(washing1()),
  139. loop.create_task(washing2()),
  140. loop.create_task(washing3()),
  141. ]
  142. 运行的效果是一样的, 暂不清楚为什么他们这样做.
  143. """
  144. if __name__ == '__main__':
  145. # 为验证是否真的缩短了时间, 我们计个时
  146. start = time()
  147. # demo1() # 需花费10秒
  148. # demo2() # 会报错: RuntimeWarning: coroutine ... was never awaited
  149. # demo3() # 会报错: RuntimeWarning: coroutine ... was never awaited
  150. demo4() # 需花费5秒多一点点
  151. end = time()
  152. print('elapsed time = ' + str(end - start))

其他文章:python异步编程之asyncio(百万并发)

一、asyncio

下面通过举例来对比同步代码和异步代码编写方面的差异,其次看下两者性能上的差距,我们使用sleep(1)模拟耗时1秒的io操作。

 ·同步代码

  1. import time
  2. def hello():
  3. time.sleep(1)
  4. def run():
  5. for i in range(5):
  6. hello()
  7. print('Hello World:%s' % time.time()) # 任何伟大的代码都是从Hello World 开始的!
  8. if __name__ == '__main__':
  9. run()

输出:(间隔约是1s)

  1. Hello World:1527595175.4728756
  2. Hello World:1527595176.473001
  3. Hello World:1527595177.473494
  4. Hello World:1527595178.4739306
  5. Hello World:1527595179.474482

异步高并发代码

  1. import time
  2. import asyncio
  3. loop = asyncio.get_event_loop()
  4. # 定义异步函数
  5. async def hello():
  6. asyncio.sleep(1)
  7. print('Hello World:%s' % time.time())
  8. def run():
  9. for i in range(5):
  10. loop.run_until_complete(hello())
  11. if __name__ =='__main__':
  12. run()

 输出:

  1. Hello World:1527595104.8338501
  2. Hello World:1527595104.8338501
  3. Hello World:1527595104.8338501
  4. Hello World:1527595104.8338501
  5. Hello World:1527595104.8338501
async def 用来定义异步函数,其内部有异步操作。每个线程有一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。=

二、aiohttp

  如果需要并发http请求怎么办呢,通常是用requests,但requests是同步的库,如果想异步的话需要引入aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个session对象,然后用session对象去打开网页。session可以进行多项操作,比如post, get, put, head等。

基本用法:

  1. async with ClientSession() as session:
  2. async with session.get(url) as response:

aiohttp异步实现的例子:

  1. import asyncio
  2. from aiohttp import ClientSession
  3. tasks = []
  4. url = "https://www.baidu.com/{}"
  5. async def hello(url):
  6. async with ClientSession() as session:
  7. async with session.get(url) as response:
  8. response = await response.read()
  9. print(response)
  10. if __name__ == '__main__':
  11. loop = asyncio.get_event_loop()
  12. loop.run_until_complete(hello(url))

首先async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,response.read()等待request响应,是个耗IO操作。然后使用ClientSession类发起http请求。

多链接异步访问

如果我们需要请求多个URL该怎么办呢,同步的做法访问多个URL只需要加个for循环就可以了。但异步的实现方式并没那么容易,在之前的基础上需要将hello()包装在asyncio的Future对象中,然后将Future对象列表作为任务传递给事件循环

  1. import time
  2. import asyncio
  3. from aiohttp import ClientSession
  4. tasks = []
  5. url = "https://www.baidu.com/{}"
  6. async def hello(url):
  7. async with ClientSession() as session:
  8. async with session.get(url) as response:
  9. response = await response.read()
  10. # print(response)
  11. print('Hello World:%s' % time.time())
  12. def run():
  13. for i in range(5):
  14. task = asyncio.ensure_future(hello(url.format(i)))
  15. tasks.append(task)
  16. if __name__ == '__main__':
  17. loop = asyncio.get_event_loop()
  18. run()
  19. loop.run_until_complete(asyncio.wait(tasks))

 输出:

  1. Hello World:1527754874.8915546
  2. Hello World:1527754874.899039
  3. Hello World:1527754874.90004
  4. Hello World:1527754874.9095392
  5. Hello World:1527754874.9190395

收集http响应

好了,上面介绍了访问不同链接的异步实现方式,但是我们只是发出了请求,如果要把响应一一收集到一个列表中,最后保存到本地或者打印出来要怎么实现呢,可通过asyncio.gather(*tasks)将响应全部收集起来,具体通过下面实例来演示。

  1. import time
  2. import asyncio
  3. from aiohttp import ClientSession
  4. tasks = []
  5. url = "https://www.baidu.com/{}"
  6. async def hello(url):
  7. async with ClientSession() as session:
  8. async with session.get(url) as response:
  9. # print(response)
  10. print('Hello World:%s' % time.time())
  11. return await response.read()
  12. def run():
  13. for i in range(5):
  14. task = asyncio.ensure_future(hello(url.format(i)))
  15. tasks.append(task)
  16. result = loop.run_until_complete(asyncio.gather(*tasks))
  17. print(result)
  18. if __name__ == '__main__':
  19. loop = asyncio.get_event_loop()
  20. run()

输出:

  1. Hello World:1527765369.0785167
  2. Hello World:1527765369.0845182
  3. Hello World:1527765369.0910277
  4. Hello World:1527765369.0920424
  5. Hello World:1527765369.097017
  6. [b'<!DOCTYPE html>\r\n<!--STATUS OK-->\r\n<html>\r\n<head>\r\n......

异常解决

假如你的并发达到2000个,程序会报错:ValueError: too many file descriptors in select()。报错的原因字面上看是 Python 调取的 select 对打开的文件有最大数量的限制,这个其实是操作系统的限制,linux打开文件的最大数默认是1024,windows默认是509,超过了这个值,程序就开始报错。这里我们有三种方法解决这个问题:

1.限制并发数量。(一次不要塞那么多任务,或者限制最大并发数量)

2.使用回调的方式

3.修改操作系统打开文件数的最大限制,在系统里有个配置文件可以修改默认值,具体步骤不再说明了。

不修改系统默认配置的话,个人推荐限制并发数的方法,设置并发数为500,处理速度更快。

  1. #coding:utf-8
  2. import time,asyncio,aiohttp
  3. url = 'https://www.baidu.com/'
  4. async def hello(url,semaphore):
  5. async with semaphore:
  6. async with aiohttp.ClientSession() as session:
  7. async with session.get(url) as response:
  8. return await response.read()
  9. async def run():
  10. semaphore = asyncio.Semaphore(500) # 限制并发量为500
  11. to_get = [hello(url.format(),semaphore) for _ in range(1000)] #总共1000任务
  12. await asyncio.wait(to_get)
  13. if __name__ == '__main__':
  14. # now=lambda :time.time()
  15. loop = asyncio.get_event_loop()
  16. loop.run_until_complete(run())
  17. loop.close()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/79987
推荐阅读
相关标签
  

闽ICP备14008679号