Concurrency means several events happen within the same time period: each task runs in short slices and the tasks switch frequently, so they feel as if they were running "at the same time".
Parallelism means several events happen at the same instant, that is, truly simultaneously.
Synchronous: tasks do not run independently; they take turns in a fixed order, and one task must finish before the next one runs.
Asynchronous: tasks run independently of each other.
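To make the distinction concrete, here is a minimal sketch that is not from the original text: fake_io_task is a made-up name that simulates a blocking I/O wait with time.sleep, and the timing comments assume nothing else is loading the machine.

import threading
import time

def fake_io_task(task_id):
    # Simulate a blocking I/O operation such as a network request
    time.sleep(1)
    print('task', task_id, 'done')

# Serial: each task waits for the previous one, so 5 tasks take about 5 seconds
start = time.time()
for i in range(5):
    fake_io_task(i)
print('serial:', time.time() - start)

# Concurrent: the threads overlap their waiting, so 5 tasks take about 1 second
start = time.time()
threads = [threading.Thread(target=fake_io_task, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('concurrent:', time.time() - start)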
A multithreaded crawler executes concurrently: it speeds up crawling by switching rapidly between threads.
We measure the time taken to visit 1000 websites.
Because the run takes quite a while, the output is not shown.
The txt data file (alexa.txt) is here.
import requests
import time

# Read the list of URLs from alexa.txt (one tab-separated record per line)
link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for each in file_list:
        link = each.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

# Visit every URL one after another and time the whole run
start = time.time()
for each in link_list:
    try:
        r = requests.get(each)
        print(r.status_code, each)
    except Exception as e:
        print(e)
end = time.time()
print("Total time for the serial crawler:", end - start)
Create a new thread by calling start_new_thread from the _thread module.
import _thread
import time

# Define a function to be run by each thread
def print_time(threadName, delay):
    count = 0
    while count < 3:
        time.sleep(delay)
        count += 1
        print(threadName, time.ctime())

if __name__ == '__main__':
    try:
        _thread.start_new_thread(print_time, ("Thread-1", 1))
        _thread.start_new_thread(print_time, ("Thread-2", 2))
        print("Main Finished")
    except:
        print("Error: unable to start thread")
    # Keep the main thread alive so the new threads have time to finish
    while 1:
        pass
Result:
As you can see, the main thread finishes first, and the two new threads keep running afterwards.
_thread.start_new_thread(function, args[, kwargs])
Start a new thread and return its identifier. The thread executes the function function with the argument list args (which must be a tuple). The optional kwargs argument specifies a dictionary of keyword arguments. When the function returns, the thread exits silently; when the function terminates with an unhandled exception, a stack trace is printed and then the thread exits (but other threads continue to run).
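As a small illustration of the args and kwargs parameters (the greet function here is hypothetical), the positional arguments go in a tuple and the keyword arguments in a dict:

import _thread
import time

# Hypothetical function used only to illustrate the call signature
def greet(name, punctuation='!'):
    print('Hello, ' + name + punctuation)

# args must be a tuple; kwargs is an optional dict of keyword arguments
_thread.start_new_thread(greet, ('World',), {'punctuation': '?'})

time.sleep(1)  # keep the main thread alive long enough for the new thread to run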
The threading module provides an easier-to-use, higher-level multithreading API on top of _thread.
Create threads with the threading library by inheriting from threading.Thread.
start()
Start the thread's activity.
It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.
run()
The method representing the thread's activity.
join(timeout=None)
Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates, either normally or through an unhandled exception, or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). Because join() always returns None, you must call is_alive() after join() to decide whether a timeout happened: if the thread is still alive, the join() call timed out (see the short sketch after this list).
When the timeout argument is absent or None, the operation blocks until the thread terminates.
A thread can be join()ed many times.
is_alive()
Return whether the thread is alive.
getName()
setName()
Old getter/setter API for the thread's name; reading and assigning the name attribute directly is preferred.
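Before the full example, here is a minimal sketch (not from the original text) of join() with a timeout and is_alive(), using a worker thread that simply sleeps:

import threading
import time

worker = threading.Thread(target=time.sleep, args=(3,))
worker.start()

# Wait at most 1 second; join() always returns None, so check is_alive() afterwards
worker.join(timeout=1)
print('timed out?', worker.is_alive())   # True: the worker is still sleeping

# Without a timeout, join() blocks until the worker finishes
worker.join()
print('still alive?', worker.is_alive())  # False

The next example puts start(), run(), and join() together by subclassing threading.Thread.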
import threading
import time

class myThread(threading.Thread):
    def __init__(self, name, delay):
        threading.Thread.__init__(self)
        self.name = name
        self.delay = delay

    def run(self):
        print("Starting " + self.name)
        print_time(self.name, self.delay)
        print("Exiting " + self.name)

def print_time(threadName, delay):
    counter = 0
    while counter < 3:
        time.sleep(delay)
        print(threadName, time.ctime())
        counter += 1

threads = []

# Create the new threads
thread1 = myThread("Thread-1", 1)
thread2 = myThread("Thread-2", 2)

# Start the new threads
thread1.start()
thread2.start()

# Add the threads to the thread list
threads.append(thread1)
threads.append(thread2)

# Wait for all threads to finish
for t in threads:
    t.join()

print("Exiting Main Thread")
import requests
import time
import threading

# Read the list of URLs from alexa.txt
link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()

class myThread(threading.Thread):
    def __init__(self, name, link_range):
        threading.Thread.__init__(self)
        self.name = name
        self.link_range = link_range

    def run(self):
        print("Starting " + self.name)
        crawler(self.name, self.link_range)
        print("Exiting " + self.name)

def crawler(threadName, link_range):
    # Clamp the upper bound to the list length so the last chunk does not run past the end
    for i in range(link_range[0], min(link_range[1] + 1, len(link_list))):
        try:
            r = requests.get(link_list[i])
            print(threadName, r.status_code, link_list[i])
        except Exception as e:
            print(e)

thread_list = []
link_range_list = [(0, 200), (201, 400), (401, 600), (601, 800), (801, 1000)]

# Create the new threads
for i in range(1, 6):
    thread = myThread("Thread-" + str(i), link_range_list[i - 1])
    thread.start()
    thread_list.append(thread)

# Wait for all threads to finish
for thread in thread_list:
    thread.join()

end = time.time()
print('Total time for the simple multithreaded crawler:', end - start)
print("Exiting Main Thread")
link_range_list = [(0, 200), (201, 400), (401, 600), (601, 800), (801, 1000)] splits the 1000 pages into 5 chunks of roughly 200 pages each, and each chunk is assigned to one of the 5 threads.
The problem: as soon as a thread finishes crawling its 200 pages, it exits. The 5 threads then become 4 and the overall speed drops; by the time only one thread is left, the crawl is effectively single-threaded again.
The fix is to let all threads share a single Queue and keep pulling URLs from it until the queue is empty:
import threading
import requests
import time
import queue as Queue

# Read the list of URLs from alexa.txt
link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()

class myThread(threading.Thread):
    def __init__(self, name, q):
        threading.Thread.__init__(self)
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        while True:
            try:
                crawler(self.name, self.q)
            except:
                # q.get(timeout=2) raises queue.Empty once the queue stays empty,
                # which ends this loop and lets the thread exit
                break
        print("Exiting " + self.name)

def crawler(threadName, q):
    url = q.get(timeout=2)
    try:
        r = requests.get(url, timeout=20)
        print(q.qsize(), threadName, r.status_code, url)
    except Exception as e:
        print(threadName, 'Error: ', e)

threadList = ["Thread-1", "Thread-2", "Thread-3", "Thread-4", "Thread-5"]
workQueue = Queue.Queue(1000)
threads = []

# Create the new threads
for tName in threadList:
    thread = myThread(tName, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue
for url in link_list:
    workQueue.put(url)

# Wait for all threads to finish
for t in threads:
    t.join()

end = time.time()
print('Total time for the Queue-based multithreaded crawler:', end - start)
print("Exiting Main Thread")
With multiprocessing, processes can be managed in much the same way as threads.
from multiprocessing import Process, Queue, cpu_count
import time
import requests

print(cpu_count())

# Read the list of URLs from alexa.txt
link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()

class MyProcess(Process):
    def __init__(self, q):
        Process.__init__(self)
        self.q = q

    def run(self):
        print("Starting ", self.pid)
        while not self.q.empty():
            crawler(self.q)
        print("Exiting ", self.pid)

def crawler(q):
    url = q.get(timeout=2)
    try:
        r = requests.get(url, timeout=20)
        print(q.qsize(), r.status_code, url)
    except Exception as e:
        print(q.qsize(), url, 'Error: ', e)

if __name__ == '__main__':
    ProcessNames = ["Process-1", "Process-2", "Process-3"]
    workQueue = Queue(1000)

    # Fill the queue
    for url in link_list:
        workQueue.put(url)

    # Start all three processes first, then join them, so they run in parallel
    processes = []
    for i in range(0, 3):
        p = MyProcess(workQueue)
        p.daemon = True
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

    end = time.time()
    print('Total time for the Process + Queue multiprocess crawler:', end - start)
    print('Main process Ended!')
p.daemon = True
means that when the parent process ends, the child process is terminated as well.
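A minimal sketch of this behaviour (background_work is a made-up name used only for illustration): the parent exits after one second, the daemon child is killed with it, and the second print in the child never appears.

from multiprocessing import Process
import time

def background_work():
    print('child started')
    time.sleep(10)
    print('child finished')  # never printed: the daemon child is terminated early

if __name__ == '__main__':
    p = Process(target=background_work)
    p.daemon = True          # terminate the child when the parent process exits
    p.start()
    time.sleep(1)
    print('parent exiting')  # the parent exits here and the daemon child is killed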
The previous example creates processes one by one with Process. When many processes are needed this becomes inconvenient, so multiprocessing also provides Pool, a process pool.
from multiprocessing import Pool, Manager
import time
import requests

# Read the list of URLs from alexa.txt
link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()

def crawler(q, index):
    Process_id = 'Process-' + str(index)
    while not q.empty():
        url = q.get(timeout=2)
        try:
            r = requests.get(url, timeout=20)
            print(Process_id, q.qsize(), r.status_code, url)
        except Exception as e:
            print(Process_id, q.qsize(), url, 'Error: ', e)

if __name__ == '__main__':
    # A Manager queue can be shared with the pool's worker processes
    manager = Manager()
    workQueue = manager.Queue(1000)

    # Fill the queue
    for url in link_list:
        workQueue.put(url)

    pool = Pool(processes=3)
    for i in range(4):
        pool.apply_async(crawler, args=(workQueue, i))

    print("Started processes")
    pool.close()
    pool.join()

    end = time.time()
    print('Total time for the Pool + Queue multiprocess crawler:', end - start)
    print('Main process Ended!')
pool.apply_async
is non-blocking: new tasks can be submitted to the pool without waiting for earlier ones to finish.
pool.apply
is blocking: the next task is submitted only after the current one has finished.
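A minimal sketch of the difference (the square function is made up for illustration): apply blocks on each call, while apply_async submits all calls first and collects the results afterwards.

from multiprocessing import Pool
import time

def square(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # Blocking: each apply() waits for its result, so 3 calls take about 3 seconds
        start = time.time()
        results = [pool.apply(square, (i,)) for i in range(3)]
        print('apply:', results, round(time.time() - start, 1), 's')

        # Non-blocking: all tasks are submitted immediately and run in parallel,
        # so 3 calls take about 1 second
        start = time.time()
        async_results = [pool.apply_async(square, (i,)) for i in range(3)]
        print('apply_async:', [r.get() for r in async_results],
              round(time.time() - start, 1), 's')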
Finally, the same crawl can be written with gevent coroutines; monkey.patch_all() turns the blocking network I/O into asynchronous operations:

from gevent import monkey
# Patch blocking I/O (sockets, etc.) as early as possible so it runs asynchronously
monkey.patch_all()

import gevent
from gevent.queue import Queue, Empty
import time
import requests

# Read the list of URLs from alexa.txt
link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()

def crawler(index):
    Process_id = 'Process-' + str(index)
    while not workQueue.empty():
        url = workQueue.get(timeout=2)
        try:
            r = requests.get(url, timeout=20)
            print(Process_id, workQueue.qsize(), r.status_code, url)
        except Exception as e:
            print(Process_id, workQueue.qsize(), url, 'Error: ', e)

def boss():
    for url in link_list:
        workQueue.put_nowait(url)

if __name__ == '__main__':
    workQueue = Queue(1000)

    # Fill the queue first, then spawn the crawler coroutines
    gevent.spawn(boss).join()
    jobs = []
    for i in range(10):
        jobs.append(gevent.spawn(crawler, i))
    gevent.joinall(jobs)

    end = time.time()
    print('Total time for the gevent + Queue coroutine crawler:', end - start)
    print('Main Ended!')