Principle: send multiple requests at once, then suspend and go handle other work; when the remote side responds, the program is notified (this is called a "callback") and comes back to process the response. Third-party components can handle both sending the requests and invoking the callbacks.
I. Concurrency via multithreading/multiprocessing
1. Using multithreading:
################################# Version 1 #################################
from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    res = requests.get(url)  # blocking HTTP GET
    print(url, res)

pool = ThreadPoolExecutor(7)  # at most 7 worker threads
url_list = [
    "http://www.cnblogs.com",
    "http://huaban.com/favorite",
    "http://www.bing.com",
    "http://www.zhihu.com",
    "http://www.sina.com",
    "http://www.baidu.com",
    "http://www.autohome.com.cn"
]
for url in url_list:
    pool.submit(task, url)  # schedule task(url) on a worker thread
pool.shutdown(wait=True)  # wait for all submitted tasks to finish
# Result:
http://www.baidu.com <Response [200]>
http://www.bing.com <Response [200]>
http://www.cnblogs.com <Response [200]>
http://www.sina.com <Response [200]>
http://huaban.com/favorite <Response [200]>
http://www.autohome.com.cn <Response [200]>
http://www.zhihu.com <Response [400]>
################################# Version 2 #################################
# Note: both versions do exactly the same thing; version 2 just reduces coupling
from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    res = requests.get(url)
    return res

def done(future, *args, **kwargs):
    print(future, args, kwargs)
    response = future.result()  # the Response object returned by task()
    print(response)

pool = ThreadPoolExecutor(7)
url_list = [
    "http://www.cnblogs.com",
    "http://huaban.com/favorite",
    "http://www.bing.com",
    "http://www.zhihu.com",
    "http://www.sina.com",
    "http://www.baidu.com",
    "http://www.autohome.com.cn"
]
for url in url_list:
    v = pool.submit(task, url)
    v.add_done_callback(done)
    # done() runs once task() has returned; the Future it receives wraps task()'s return value
pool.shutdown(wait=True)
# Result:
<Future at 0x20cf9f04bc8 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9eec848 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf92edbc8 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9eea508 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9f2af88 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9efab08 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9eec9c8 state=finished returned Response> () {}
<Response [400]>
#######################################################################
# Problem:
Between the moment a request is sent and the moment its response arrives, the thread sits idle
2. Using multiprocessing:
################################# Version 1 #################################
from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    res = requests.get(url)
    print(url, res)

if __name__ == '__main__':  # required when worker processes are spawned
    pool = ProcessPoolExecutor(7)  # at most 7 worker processes
    url_list = [
        "http://www.cnblogs.com",
        "http://huaban.com/favorite",
        "http://www.bing.com",
        "http://www.zhihu.com",
        "http://www.sina.com",
        "http://www.baidu.com",
        "http://www.autohome.com.cn"
    ]
    for url in url_list:
        pool.submit(task, url)  # schedule task(url) on a worker process
    pool.shutdown(wait=True)  # wait for all submitted tasks to finish
# Result:
http://www.baidu.com <Response [200]>
http://www.bing.com <Response [200]>
http://www.cnblogs.com <Response [200]>
http://www.sina.com <Response [200]>
http://huaban.com/favorite <Response [200]>
http://www.autohome.com.cn <Response [200]>
http://www.zhihu.com <Response [400]>
################################# Version 2 #################################
# Note: both versions do exactly the same thing; version 2 just reduces coupling
from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    res = requests.get(url)
    return res

def done(future, *args, **kwargs):
    print(future, args, kwargs)
    response = future.result()  # the Response object returned by task()
    print(response)

if __name__ == '__main__':
    pool = ProcessPoolExecutor(7)
    url_list = [
        "http://www.cnblogs.com",
        "http://huaban.com/favorite",
        "http://www.bing.com",
        "http://www.zhihu.com",
        "http://www.sina.com",
        "http://www.baidu.com",
        "http://www.autohome.com.cn"
    ]
    for url in url_list:
        v = pool.submit(task, url)
        v.add_done_callback(done)
        # done() runs once task() has returned; the Future it receives wraps task()'s return value
    pool.shutdown(wait=True)
# Result:
<Future at 0x20cf9f04bc8 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9eec848 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf92edbc8 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9eea508 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9f2af88 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9efab08 state=finished returned Response> () {}
<Response [200]>
<Future at 0x20cf9eec9c8 state=finished returned Response> () {}
<Response [400]>
#######################################################################
# Problem:
Between the moment a request is sent and the moment its response arrives, the process sits idle; in other words, blocking IO wastes threads/processes
II. Concurrency via coroutines + async IO
Rough order of usage frequency: gevent+requests > Twisted > Tornado > asyncio+aiohttp
1. Using the asyncio module
This module achieves concurrency with coroutines (micro-threads) plus async IO; in effect, a single thread services multiple HTTP requests. In other words, asyncio implements single-threaded concurrent IO through async IO + coroutines.
(1) Basic asyncio example:
import asyncio

@asyncio.coroutine
def task():
    print('before...func1......')
    yield from asyncio.sleep(5)  # must be asyncio.sleep(); time.sleep() would block the event loop
    print('end...func1......')

tasks = [task(), task()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
# Result:
before...func1......
before...func1......
end...func1......
end...func1......
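Note: the generator-based @asyncio.coroutine / yield from style used above is deprecated and was removed entirely in Python 3.11. A minimal sketch of the same example in the modern async/await style:

import asyncio

async def task():
    print('before...func1......')
    await asyncio.sleep(5)  # non-blocking sleep; control returns to the event loop
    print('end...func1......')

async def main():
    await asyncio.gather(task(), task())  # run both coroutines concurrently

asyncio.run(main())  # creates, runs, and closes the event loop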
(2) Adapting asyncio to send HTTP requests:
asyncio itself only supports raw TCP, not HTTP, so the HTTP message has to be built by hand
import asyncio

@asyncio.coroutine
def fetch_async(host, url='/'):
    print(host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)  # open the TCP connection
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)  # hand-built HTTP request
    request_header_content = bytes(request_header_content, encoding='utf-8')
    # Send the TCP payload (by construction, an HTTP request) and read the reply:
    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print(host, url, text)
    writer.close()

tasks = [
    fetch_async('www.cnblogs.com', '/wupeiqi/'),
    fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
# Result:
www.cnblogs.com /wupeiqi/
dig.chouti.com /pic/show?nid=4073644713430508&lid=10273091
www.cnblogs.com /wupeiqi/ b'HTTP/1.1 301 Moved Permanently\r\nDate: Mon, 19 Oct 2020 04:06:51 GMT\r\nContent-Length: 0\r\nConnection: close\r\nLocation: https://www.cnblogs.com/wupeiqi/\r\n\r\n'
dig.chouti.com /pic/show?nid=4073644713430508&lid=10273091 b'HTTP/1.1 301 Moved Permanently\r\nDate: Mon, 19 Oct 2020 04:06:51 GMT\r\nContent-Type: text/html\r\nContent-Length: 278\r\nConnection: close\r\nServer: Tengine\r\nLocation: https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091\r\nX-Via: 1.1 PSjszjsx4bl93:6 (Cdn Cache Server V2.0), 1.1 PS-000-01ywf53:10 (Cdn Cache Server V2.0)\r\nX-Ws-Request-Id: 5f8d10db_PS-000-0188p51_45617-29757\r\n\r\n<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">\r\n<html>\r\n<head><title>301 Moved Permanently</title></head>\r\n<body bgcolor="white">\r\n<h1>301 Moved Permanently</h1>\r\n<p>The requested resource has been assigned a new permanent URI.</p>\r\n<hr/>Powered by Tengine</body>\r\n</html>\r\n'
(3) Using asyncio + aiohttp:
aiohttp is an HTTP framework built on asyncio; it builds the HTTP messages for you
import aiohttp, asyncio

@asyncio.coroutine
def fetch_async(url):
    print(url)
    response = yield from aiohttp.request('GET', url)  # old-style aiohttp API (not available in current versions)
    # data = yield from response.read()
    # print(url, data)
    print(url, response)
    response.close()

tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]
event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
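Note: the yield-from call to aiohttp.request() above is the old aiohttp API. In current aiohttp (3.x), requests go through a ClientSession; a minimal sketch of the same fetches in that style:

import asyncio
import aiohttp

async def fetch_async(session, url):
    async with session.get(url) as response:  # the session manages the connection pool
        data = await response.read()
        print(url, response.status, len(data))

async def main():
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            fetch_async(session, 'http://www.baidu.com/'),
            fetch_async(session, 'http://www.bing.com/'),
        )

asyncio.run(main())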
(4) Using asyncio + requests:
Here the requests module takes care of building the HTTP messages
import asyncio, requests

@asyncio.coroutine
def fetch_async(func, *args):
    print(func, args)
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)  # runs func(*args) in the loop's default executor
    response = yield from future
    print(response.url, response.content)

tasks = [
    fetch_async(requests.get, 'http://www.baidu.com'),
    fetch_async(requests.get, 'http://www.bing.com')
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
# Result:
<function get at 0x00000244A1826B88> ('http://www.baidu.com',)
<function get at 0x00000244A1826B88> ('http://www.bing.com',)
http://www.baidu.com/ b'<!DOCTYPE html>\r\n<!--STATUS OK--><html> <head>...(truncated)
http://cn.bing.com/ b'<!DOCTYPE html><html lang="zh">...(truncated)
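Note on the design: run_in_executor(None, ...) hands the blocking requests.get call to the event loop's default ThreadPoolExecutor, so under the hood this variant still relies on worker threads; it is asyncio-flavored thread concurrency rather than true single-threaded async IO.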
2. Using the gevent module
The principle is the same as asyncio; only the implementation differs.
gevent itself only provides the async IO; the coroutines are implemented internally by the greenlet module.
Also, gevent only supports TCP requests, so the HTTP messages still have to be built separately.
(1) Using gevent + requests:
import gevent
from gevent import monkey
monkey.patch_all()  # patch blocking stdlib calls (sockets included) so they become cooperative/async
import requests  # must be imported after monkey.patch_all(), or errors may occur

def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)  # requests builds and sends the HTTP message
    print(response.url, response.content)

# Send the requests:
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.bing.com/', req_kwargs={}),
])

# Send the requests via a coroutine pool (caps the number of concurrent coroutines, i.e. in-flight requests):
# from gevent.pool import Pool
# pool = Pool(5)  # at most 5 requests in flight at once
# gevent.joinall([
#     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
# ])
# Result:
get https://www.python.org/ {}
get https://www.baidu.com/ {}
get https://www.bing.com/ {}
https://www.baidu.com/ b'<!DOCTYPE html>\r\n...(truncated)
https://cn.bing.com/ b'<!DOCTYPE html><html lang="zh">...(truncated)
https://www.python.org/ b'<!doctype html>\n<!--[if lt IE 7]>...(truncated)
(2) Using the grequests module:
This module simply bundles gevent and requests together
import grequests

request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://example.com'),
    grequests.get('http://httpbin.org/status/500')
]

# Optional exception handler:
# def exception_handler(request, exception):
#     print(request, exception)
#     print("Request failed")

# Send the requests and collect the list of responses:
response_list = grequests.map(request_list)
# response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
# Result:
#<grequests.AsyncRequest object at 0x000001FE3B424F48>...(truncated)
#Request failed
[None, <Response [200]>, <Response [500]>]  # a failed request yields None
3. Using the Twisted framework
Twisted is an event-driven networking engine written in Python; besides issuing concurrent crawler requests, it offers many other capabilities
from twisted.web.client import getPage, defer
from twisted.internet import reactor  # defer can be imported from either module

def one_done(arg):
    print("arg:", arg)

def all_done(arg):  # runs once every request has completed
    print("done")
    reactor.stop()  # without this, reactor.run() would keep polling defer_list forever

@defer.inlineCallbacks
def task(url):
    res = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
    res.addCallback(one_done)  # register the per-request callback
    print(res)
    yield res

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
defer_list = []
for url in url_list:
    defer_list.append(task(url))
    # task() returns without waiting for the response, then the next task() runs;
    # its return value (res) is a Deferred object representing the pending request
print(defer_list)
dlist = defer.DeferredList(defer_list)
dlist.addBoth(all_done)  # all_done() fires once every request has received its response
reactor.run()  # an endless loop that keeps polling defer_list;
               # each time a request is answered, one_done() fires once
# Result:
<Deferred at 0x2262a14cdc8>#res
<Deferred at 0x2262a1555c8>#res
[<Deferred at 0x226291316c8>, <Deferred at 0x2261a39a588>]#defer_list
E:/program/PyCharmProject/多线程实现并发/a.py:13: DeprecationWarning: twisted.web.client.getPage was deprecated in Twisted 16.7.0; please use https://pypi.org/project/treq/ or twisted.web.client.Agent instead
res = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
arg: b'<!DOCTYPE html><!--STATUS OK-->\n\n\n <html>...(truncated)
arg: b'<!DOCTYPE html><html lang="zh">...(truncated)
done
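As the DeprecationWarning in the output says, getPage was deprecated in Twisted 16.7 in favor of treq or twisted.web.client.Agent. A minimal sketch of the same fetches using the treq package (assuming treq is installed):

import treq
from twisted.internet import reactor, defer

@defer.inlineCallbacks
def task(url):
    response = yield treq.get(url)       # send the HTTP request
    body = yield treq.content(response)  # read the full response body
    print(url, response.code, len(body))

def all_done(arg):
    reactor.stop()

dlist = defer.DeferredList([task('http://www.bing.com'), task('http://www.baidu.com')])
dlist.addBoth(all_done)
reactor.run()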
4. Using the Tornado framework
Tornado can also issue concurrent crawler requests, though it is rarely used as a standalone crawler
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado import ioloop

COUNT = 0

def handle_response(response):  # callback invoked for each response
    """
    Handle the response body; the counter tells us when every request is done,
    at which point we call ioloop.IOLoop.current().stop()
    :param response:
    :return:
    """
    global COUNT  # counter
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    if COUNT == 0:
        ioloop.IOLoop.current().stop()  # ends the ioloop.IOLoop.current().start() loop

def func():
    url_list = [
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)

ioloop.IOLoop.current().add_callback(func)  # schedule func() on the loop
ioloop.IOLoop.current().start()  # an endless loop
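Note: the code above targets Tornado 5.x and earlier; Tornado 6 removed the callback argument to fetch(), which now returns a Future to await. A minimal sketch of the same two fetches in that style, assuming Tornado's asyncio-based event loop:

import asyncio
from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient

async def func():
    client = AsyncHTTPClient()
    urls = ['https://www.baidu.com', 'https://www.baidu.com']
    # fetch() returns a Future; gather them so the requests run concurrently
    responses = await asyncio.gather(*[client.fetch(url) for url in urls])
    for response in responses:
        print(response.code, len(response.body))

ioloop.IOLoop.current().run_sync(func)  # run the loop until func() completes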