赞
踩
from multiprocessing.dummy import Pool as ThreadPool

# multiprocessing.dummy exposes the multiprocessing.Pool API backed by threads.
# job_worker and result_cursor are not defined in this snippet; they must exist
# before this point.
pool = ThreadPool(20)
try:
    # map() blocks until job_worker has been applied to every item.
    pool.map(job_worker, result_cursor)
finally:
    # Shut the worker threads down even if a job raised.
    pool.close()
    pool.join()
"""Concurrency with a thread pool.

Requests run concurrently, but between sending a request and receiving its
response the worker thread just sits idle.

Two ways to consume the results:
    - handle the response directly inside the task;
    - return it from the task and process it in a done-callback.
"""

########### Style 1: handle the response inside the task ###########
# from concurrent.futures import ThreadPoolExecutor
# import requests
#
# def task(url):
#     response = requests.get(url)
#     print(url, response)
#     # parse the page (e.g. with a regular expression) here
#
# pool = ThreadPoolExecutor(7)
# url_list = [...]  # same URLs as in style 2 below
# for url in url_list:
#     pool.submit(task, url)
# pool.shutdown(wait=True)

########### Style 2: process the response in a done-callback ###########
from concurrent.futures import ThreadPoolExecutor
import requests
import time

# requests applies no timeout by default; without one a stalled server can
# block a worker thread (and therefore pool shutdown) forever.
REQUEST_TIMEOUT = 10  # seconds


def task(url, timeout=REQUEST_TIMEOUT):
    """Download a page.

    :param url: URL to GET
    :param timeout: seconds to wait for the server (passed to requests.get)
    :return: the requests.Response
    """
    return requests.get(url, timeout=timeout)


def done(future, *args, **kwargs):
    """Done-callback: print the status code and raw body of a finished download.

    concurrent.futures only logs exceptions raised inside a callback, so a
    failed request is reported here explicitly instead.
    """
    try:
        response = future.result()
    except requests.RequestException as exc:
        print('request failed:', exc)
        return
    print(response.status_code, response.content)


url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

if __name__ == '__main__':
    # Leaving the with-block calls pool.shutdown(wait=True).
    with ThreadPoolExecutor(7) as pool:
        for url in url_list:
            pool.submit(task, url).add_done_callback(done)
"""Concurrency with a process pool.

Requests run concurrently, but between sending a request and receiving its
response the worker process just sits idle.

Two ways to consume the results:
    - handle the response directly inside the task;
    - return it from the task and process it in a done-callback.
"""

########### Style 1: handle the response inside the task ###########
# from concurrent.futures import ProcessPoolExecutor
# import requests
#
# def task(url):
#     response = requests.get(url)
#     print(url, response)
#     # parse the page (e.g. with a regular expression) here
#
# if __name__ == '__main__':
#     pool = ProcessPoolExecutor(7)
#     url_list = [...]  # same URLs as in style 2 below
#     for url in url_list:
#         pool.submit(task, url)
#     pool.shutdown(wait=True)

########### Style 2: process the response in a done-callback ###########
from concurrent.futures import ProcessPoolExecutor
import requests
import time

# requests applies no timeout by default; without one a stalled server can
# block a worker process (and therefore pool shutdown) forever.
REQUEST_TIMEOUT = 10  # seconds


def task(url, timeout=REQUEST_TIMEOUT):
    """Download a page in a worker process.

    :param url: URL to GET
    :param timeout: seconds to wait for the server (passed to requests.get)
    :return: the requests.Response (pickled back to the parent process)
    """
    return requests.get(url, timeout=timeout)


def done(future, *args, **kwargs):
    """Done-callback (runs in the parent process): print status code and raw body.

    concurrent.futures only logs exceptions raised inside a callback, so a
    failed request is reported here explicitly instead.
    """
    try:
        response = future.result()
    except requests.RequestException as exc:
        print('request failed:', exc)
        return
    print(response.status_code, response.content)


url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

# The guard is required for ProcessPoolExecutor: under the "spawn" start method
# (Windows, and macOS since Python 3.8) every worker re-imports this module, and
# unguarded pool code would try to start a new pool inside each child.
if __name__ == '__main__':
    # Leaving the with-block calls pool.shutdown(wait=True).
    with ProcessPoolExecutor(7) as pool:
        for url in url_list:
            pool.submit(task, url).add_done_callback(done)
# 协程本身只负责切换,无法决定何时切回来;需要异步IO在IO就绪时通过回调切回。
# 角色:使用者
#   - 多线程
#   - 多进程
#   - 协程(微线程) + 异步IO => 1个线程发送N个Http请求
#       - asyncio
#           - 示例1:asyncio.sleep(5)
#           - 示例2:自己封装Http数据包
#           - 示例3:asyncio+aiohttp
#                 aiohttp模块:封装Http数据包
#                 pip3 install aiohttp
#           - 示例4:asyncio+requests
#                 requests模块:封装Http数据包
#                 pip3 install requests
#       - gevent(内部异步IO+切换),即 greenlet+异步IO
#           pip3 install greenlet
#           pip3 install gevent
#           - 示例1:gevent+requests
#           - 示例2:gevent(协程池,限制同时发出的最大请求数)+requests
#           - 示例3:gevent+requests => grequests
#                 pip3 install grequests
#       - Twisted
#           pip3 install twisted
#       - Tornado
#           pip3 install tornado
# =====> 个人推荐顺序:gevent > Twisted > Tornado > asyncio
import asyncio


async def func1(delay=5):
    """Print a marker, wait *delay* seconds without blocking the loop, print another.

    ``@asyncio.coroutine`` / ``yield from`` generator coroutines were removed in
    Python 3.11; native ``async def`` / ``await`` is the replacement.

    :param delay: seconds to sleep (5 in the original demo)
    """
    print('before...func1......')
    # await hands control back to the event loop, so other coroutines run meanwhile.
    await asyncio.sleep(delay)
    print('end...func1......')


async def _main():
    # Both sleeps overlap, so the demo takes about 5 s in total rather than 10 s.
    await asyncio.gather(func1(), func1())


if __name__ == '__main__':
    # asyncio.run() creates, runs and closes the event loop (replacing the
    # get_event_loop() / run_until_complete() / close() sequence).
    asyncio.run(_main())
import asyncio


async def wget(host, port=80):
    """Send a minimal HTTP/1.0 GET for "/" to *host* and print the response headers.

    Only the status line and headers are read; the body is ignored and the
    connection is closed.

    :param host: host name to connect to (also sent as the Host header)
    :param port: TCP port, 80 by default
    :return: the printed header lines (decoded, trailing whitespace stripped)
    """
    print('wget %s...' % host)
    reader, writer = await asyncio.open_connection(host, port)
    header_lines = []
    try:
        header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
        writer.write(header.encode('utf-8'))
        await writer.drain()
        while True:
            line = await reader.readline()
            # A blank line ends the header block. b'' means the server closed the
            # connection early; without that check this loop would spin forever.
            if line in (b'', b'\r\n', b'\n'):
                break
            text = line.decode('utf-8', errors='replace').rstrip()
            print('%s header > %s' % (host, text))
            header_lines.append(text)
    finally:
        # Ignore the body, close the socket
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            pass  # peer already reset the connection; nothing left to clean up
    return header_lines


async def _main():
    hosts = ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']
    # asyncio.wait() no longer accepts bare coroutines (Python 3.11); gather()
    # schedules them. return_exceptions=True lets every host finish even if one fails.
    results = await asyncio.gather(*(wget(host) for host in hosts), return_exceptions=True)
    for host, result in zip(hosts, results):
        if isinstance(result, Exception):
            print('%s failed: %r' % (host, result))


if __name__ == '__main__':
    asyncio.run(_main())
import aiohttp
import asyncio


async def _get_and_print(session, url):
    """GET *url* with *session* and print the response object."""
    # The async-with block releases the connection back to the session's pool
    # (the old code called response.close() by hand).
    async with session.get(url) as response:
        print(url, response)


async def fetch_async(url, session=None):
    """Fetch *url* with aiohttp and print the response summary.

    ``yield from aiohttp.request(...)`` under ``@asyncio.coroutine`` no longer
    works (the decorator was removed in Python 3.11, and aiohttp 3 requests are
    async context managers).

    :param url: URL to GET
    :param session: optional shared aiohttp.ClientSession; when omitted a
        temporary session is created and closed for this one request
    """
    print(url)
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            await _get_and_print(own_session, url)
    else:
        await _get_and_print(session, url)


async def _main():
    urls = ['http://www.baidu.com/', 'http://www.chouti.com/']
    # One session shared by all requests, as aiohttp recommends.
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_async(url, session) for url in urls))


if __name__ == '__main__':
    results = asyncio.run(_main())
import asyncio


async def fetch_async(func, *args):
    """Run the blocking callable ``func(*args)`` in the loop's default thread pool.

    This lets blocking libraries such as requests be awaited without stalling
    the event loop. The result must expose ``.url`` and ``.content`` (as a
    requests.Response does); both are printed.

    :param func: blocking callable, e.g. requests.get
    :param args: positional arguments forwarded to *func*
    :return: whatever *func* returned (previously discarded)
    """
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, func, *args)
    print(response.url, response.content)
    return response


async def _main():
    # Only the demo needs requests; fetch_async works with any blocking callable.
    import requests
    return await asyncio.gather(
        fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
        fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
    )


if __name__ == '__main__':
    results = asyncio.run(_main())
# gevent's monkey-patching must run before anything else imports socket/ssl.
# requests pulls in urllib3 and ssl, so patching after `import requests` (as
# before) makes gevent warn "Monkey-patching ssl after ssl has already been
# imported" and can lead to errors such as RecursionError.
from gevent import monkey
monkey.patch_all()

import gevent
import requests


def fetch_async(method, url, req_kwargs=None):
    """Send one HTTP request with requests (cooperative under gevent) and print the result.

    :param method: HTTP method, e.g. 'get'
    :param url: target URL
    :param req_kwargs: extra keyword arguments for requests.request (default: none)
    """
    req_kwargs = {} if req_kwargs is None else req_kwargs
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)


if __name__ == '__main__':
    # ##### Send the requests #####
    gevent.joinall([
        gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
    ])

    # ##### Send the requests with a greenlet pool capping concurrency #####
    # Pool(size) runs at most `size` greenlets at once; Pool(None) means no limit.
    # from gevent.pool import Pool
    # pool = Pool(2)
    # gevent.joinall([
    #     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    #     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    #     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
    # ])
import grequests

# Each (url, extra keyword arguments) pair becomes an unsent grequests request;
# nothing goes over the network until grequests.map() is called on the list.
# The cases appear chosen to exercise failure paths: a 0.001 s timeout against a
# 1 s server delay, a presumably unresolvable host, and an HTTP 500 response.
_REQUEST_SPECS = [
    ('http://httpbin.org/delay/1', {'timeout': 0.001}),
    ('http://fakedomain/', {}),
    ('http://httpbin.org/status/500', {}),
]
request_list = [grequests.get(url, **extra) for url, extra in _REQUEST_SPECS]

# ##### Send the requests and collect the responses #####
# response_list = grequests.map(request_list)
# print(response_list)

# ##### Same, with an exception handler for failed requests #####
# def exception_handler(request, exception):
#     print(request, exception)
#     print("Request failed")
# response_list = grequests.map(request_list, exception_handler=exception_handler)
# print(response_list)
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, BrowserLikeRedirectAgent, readBody

# getPage() has been deprecated since Twisted 16.7.0; Agent is the supported
# HTTP client API. BrowserLikeRedirectAgent keeps getPage's old default of
# following redirects.
agent = BrowserLikeRedirectAgent(Agent(reactor))


def one_done(arg):
    """Per-request callback: print the downloaded body."""
    print('----------------------------------------------- %s' % arg)


def all_done(arg):
    """Fired once every request has finished (success or failure): stop the reactor."""
    print('done===========================================')
    reactor.stop()


@defer.inlineCallbacks
def task(url):
    """Start an HTTP GET for *url* and return a Deferred immediately.

    The Deferred fires after the response body has been read and passed to
    one_done; on failure one_done is skipped and the Deferred errbacks.
    """
    response = yield agent.request(b'GET', bytes(url, encoding='utf8'))
    body = yield readBody(response)
    one_done(body)


if __name__ == '__main__':
    url_list = [
        'http://www.cnblogs.com',
        'http://www.cnblogs.com',
        'http://www.cnblogs.com',
        'http://www.cnblogs.com',
    ]
    # Each task() call has already sent its request and holds a pending Deferred.
    defer_list = [task(url) for url in url_list]
    # DeferredList counts completions and fires once all have finished.
    # consumeErrors=True stops failed requests from also being logged as
    # "Unhandled error in Deferred".
    d = defer.DeferredList(defer_list, consumeErrors=True)
    d.addBoth(all_done)
    # Runs the event loop until all_done() calls reactor.stop().
    reactor.run()
import functools

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado.httpclient import HTTPResponse
from tornado import ioloop

COUNT = 0  # fetches still outstanding; the IOLoop is stopped when it reaches 0


def handle_response(response):
    """Print one response body (or its error) and stop the IOLoop after the last one."""
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    # Same idea as Twisted's DeferredList: count completions, stop after the last.
    if COUNT == 0:
        ioloop.IOLoop.current().stop()


def _deliver(request, future):
    """Unwrap a finished fetch Future into an HTTPResponse for handle_response.

    fetch(..., raise_error=False) only suppresses HTTP status errors; network
    failures (DNS, refused connection, timeout) still raise. They are wrapped as
    an HTTPResponse with code 599 and .error set, as Tornado's old callback API did.
    """
    try:
        response = future.result()
    except Exception as exc:
        response = HTTPResponse(request, 599, error=exc)
    handle_response(response)


def func():
    """Start all fetches; each result is delivered to handle_response."""
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    io_loop = ioloop.IOLoop.current()
    if COUNT == 0:
        # Nothing to fetch, so handle_response would never stop the loop.
        io_loop.stop()
        return
    http_client = AsyncHTTPClient()
    for url in url_list:
        print(url)
        request = HTTPRequest(url)
        # Tornado 6 removed fetch()'s callback argument (the second positional
        # parameter is now raise_error), so completion goes through the Future.
        future = http_client.fetch(request, raise_error=False)
        io_loop.add_future(future, functools.partial(_deliver, request))


if __name__ == '__main__':
    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start()  # runs until handle_response() stops it
角色:NB开发者(自己实现异步IO)

1. socket 客户端、服务端
   默认情况下 connect / recv 都会阻塞。
   sk.setblocking(0)(或 False)之后,该 socket 的所有操作(包括连接和接收)都不再阻塞;
   无数据(连接尚未建立、数据尚未返回)时会立即抛出 BlockingIOError。

2. IO多路复用:就是用一个 while 循环同时监听多个 socket 对象
   客户端:
       for sk in [socket对象1, socket对象2, socket对象3]:
           try:
               sk.connect(...)
           except BlockingIOError:
               # 非阻塞 connect 会立即抛出 BlockingIOError,连接在后台继续;
               # 因此每个 connect 必须单独 try,否则第一个抛错后后面的都不会执行
               pass
       while True:
           # r(可读)、w(可写)、e(异常)
           r, w, e = select.select([socket对象1, socket对象2, socket对象3], [socket对象1, socket对象2, socket对象3], [], 0.05)
           r = [socket对象1,]  # 表示有人给我发送数据
               xx = socket对象1.recv()
           w = [socket对象1,]  # 表示我已经和别人创建连接成功:
               socket对象1.send(b"GET /index HTTP/1.0\r\nHost: baidu.com\r\n\r\n")  # send 需要 bytes

3. select 可以监听任意对象,只要它有 fileno 方法并返回一个文件描述符:
       class Foo:
           def __init__(self, sk):
               self.sk = sk  # 内部封装一个已存在的 socket(不能在 fileno 里临时新建,临时 socket 被回收后描述符即失效)
           def fileno(self):
               return self.sk.fileno()
       r, w, e = select.select([socket对象?, 对象?, 对象?, Foo(sk)], [], [])
       # 对象必须有 fileno 方法,并返回一个文件描述符
   ========
   a. select 内部会调用 对象.fileno()
   b. Foo() 内部封装 socket,返回它的文件描述符

IO多路复用: 就是 while 循环监听多个 socket 对象
异步IO: 非阻塞的 socket + IO多路复用
import select
import socket


class HttpRequest:
    """One in-flight request: its non-blocking socket, target host and result callback.

    select.select() accepts any object with a fileno() method, so instances are
    passed to select() directly.
    """

    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback
        # Response bytes received so far; a response can span many readable events.
        self.recv_buffer = bytearray()

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    """Minimal parse of a raw HTTP response into headers and body.

    header_dict maps each header name to the raw text after its first ':'
    (leading whitespace is kept). Lines without ':' (normally just the status
    line) are skipped.
    """

    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        self.initialize()

    def initialize(self):
        # partition() rather than split(..., 1): a truncated response with no
        # blank line yields an empty body instead of raising ValueError.
        headers, _, body = self.recv_data.partition(b'\r\n\r\n')
        self.body = body
        for h in headers.split(b'\r\n'):
            # errors='replace': one non-UTF-8 header byte must not abort the parse.
            h_str = h.decode('utf-8', errors='replace')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    """Send many HTTP/1.0 GET requests from a single thread using non-blocking sockets + select()."""

    def __init__(self):
        self.conn = []        # requests still waiting for (the rest of) their response
        self.connection = []  # requests whose connect() has not been confirmed yet

    def add_request(self, host, callback, port=80):
        """Start a non-blocking connect to *host* and register *callback* for its response.

        :param host: server host name (also used as the Host header)
        :param callback: called with an HttpResponse once the server closes the connection
        :param port: TCP port, 80 by default
        """
        sk = socket.socket()
        sk.setblocking(0)
        try:
            # Returns immediately; completion is reported by select() marking the
            # socket writable. (Host-name resolution itself still blocks.)
            sk.connect((host, port))
        except BlockingIOError:
            pass
        except OSError:
            sk.close()
            raise
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def _abort(self, request, err):
        """Give up on a request whose connect() failed: close it and stop tracking it."""
        print(request.host, 'connect failed, errno', err)
        request.socket.close()
        self.conn.remove(request)
        if request in self.connection:
            self.connection.remove(request)

    @staticmethod
    def _drain(request):
        """Read everything currently available on *request*'s socket into its buffer.

        :return: True once the response is complete (peer closed the connection
            or the connection broke), False if more data may still arrive.
        """
        while True:
            try:
                chunk = request.socket.recv(8096)
            except BlockingIOError:
                return False  # nothing more for now; wait for the next readable event
            except OSError:
                return True   # e.g. connection reset: deliver whatever arrived so far
            if not chunk:
                # EOF. A non-blocking recv keeps returning b'' after EOF, so it must
                # end the loop (the old code spun here forever).
                return True
            request.recv_buffer += chunk

    def run(self):
        """Drive all added requests until each has been answered or has failed to connect."""
        while self.conn:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                # Writable means the connect attempt finished; SO_ERROR says whether it succeeded.
                err = w.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                if err:
                    self._abort(w, err)
                    continue
                print(w.host, '连接成功...')
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.sendall(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for e in elist:
                # On Windows a failed non-blocking connect is reported in the exceptional set.
                if e in self.connection:
                    self._abort(e, e.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR))
            for r in rlist:
                if r not in self.conn:
                    continue  # aborted earlier in this iteration
                if not self._drain(r):
                    continue  # response not finished yet
                response = HttpResponse(bytes(r.recv_buffer))
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)


def f1(response):
    """Demo callback (stands in for saving to a file): print the parsed headers."""
    print('保存到文件', response.header_dict)


def f2(response):
    """Demo callback (stands in for saving to a database): print the parsed headers."""
    print('保存到数据库', response.header_dict)


if __name__ == '__main__':
    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},
        {'host': 'cn.bing.com', 'callback': f2},
        {'host': 'www.cnblogs.com', 'callback': f2},
    ]
    req = AsyncRequest()
    for item in url_list:
        req.add_request(item['host'], item['callback'])
    req.run()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。