This post summarizes how to use the multiprocessing library.

A while ago, after the XiCi proxy site went down, many of my previously written multi-process crawler modules stopped working. Recently yy told me that XiCi had simply changed its skin and renamed itself XiLa proxy (西拉代理): the interface is much improved and some tutorials were added to flesh out the content, a rather roundabout way of keeping the service alive.

My earlier multi-process crawler code was honestly not written very well. The idea was to assign each process a batch of roughly the same number of IPs and let every process run independently on the IPs it was given. If the IPs assigned to a process happened to be of poor quality, that process would quickly discover that none of them worked and fall into a zombie state; in the end perhaps only one or two processes were still alive, so the overall efficiency was poor. Besides, out of consideration for a site like XiLa proxy that serves the public good, I am also reluctant to hit it too frequently (such sites are in fact accessed extremely often: every day a large number of crawlers scrape their IP lists, and it is easy to get squeezed out).

The improved multi-process version therefore uses a single queue (Section 3, the Queue module). Every process takes an IP from the queue, and whenever the queue length drops below a threshold, the XiLaProxy module (its code is in Section 7, a complete multi-process crawler project example; it is a class that fetches IPs from the XiLa proxy site) is called to fetch a batch of IPs and refill the queue.

In testing this turns out to be very efficient: in theory every process can stay in a perfectly healthy state. Section 7 again uses pageview boosting as the example (although I recently noticed that CSDN no longer seems to raise your points by inflating pageviews, not even through proxy IPs; has the old rule of 100 reads = 1 point below 10000 reads been retired?).
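Before going through the API module by module, here is a minimal sketch of that queue-based design. It is only an illustration: fetch_proxies() is a made-up placeholder standing in for the XiLaProxy call, and the workers just print the proxy they pull instead of crawling anything. The real version is in Section 7.

import os
import time
import random
from multiprocessing import Process, Queue

def fetch_proxies(batch_size=10):
    # Placeholder: pretend to scrape a batch of proxy IPs from somewhere.
    return ['10.0.0.{}:8080'.format(random.randint(1, 254)) for _ in range(batch_size)]

def feeder(queue, min_size=5, check_interval=1):
    # Refill the queue whenever it gets too short.
    # Note: Queue.qsize() works on Windows/Linux but is not implemented on macOS.
    while True:
        if queue.qsize() < min_size:
            for proxy in fetch_proxies():
                queue.put(proxy)
        time.sleep(check_interval)

def worker(queue):
    # Each worker blocks on the queue and consumes one proxy at a time.
    while True:
        proxy = queue.get()
        print('worker {} got proxy {}'.format(os.getpid(), proxy))
        time.sleep(0.5)

if __name__ == '__main__':
    queue = Queue()
    processes = [Process(target=worker, args=(queue, )) for _ in range(3)]
    processes.append(Process(target=feeder, args=(queue, )))
    for p in processes:
        p.start()
    time.sleep(5)        # let the demo run briefly
    for p in processes:
        p.terminate()    # both the workers and the feeder are infinite loops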
Here are two problems that constantly come up with multiprocessing. First, the target function handed to Process cannot be a local function defined inside another function or method, because such a function cannot be pickled. Second, the code that creates and starts processes must be placed inside if __name__ == '__main__'; otherwise an error is raised, and with a process pool it is guaranteed to fail. The first pitfall is illustrated below:

from multiprocessing import Process

class A:
    def __init__(self):
        pass

    def test(self):
        def f1(x):
            print(x)

        def f2(x):
            print(x)

        p1 = Process(target=f1, args=(1, ))
        p2 = Process(target=f2, args=(2, ))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

if __name__ == '__main__':
    a = A()
    a.test()

Calling test always fails (the OSError is sometimes reported as an access-denied error instead, but the preceding AttributeError never changes):

...
AttributeError: Can't pickle local object 'A.test.<locals>.f1'
...
OSError: [WinError 87] The parameter is incorrect.

The second pitfall: the following code creates and starts processes at module level, outside the main guard:
from multiprocessing import Process

def f1(x):
    print(x)

def f2(x):
    print(x)

p1 = Process(target=f1, args=(1, ))
p2 = Process(target=f2, args=(2, ))
p1.start()
p2.start()
p1.join()
p2.join()
Running it raises:

RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
The fix is simply to move the process-creating code under the main guard:

from multiprocessing import Process

def f1(x):
    print(x)

def f2(x):
    print(x)

if __name__ == '__main__':
    p1 = Process(target=f1, args=(1, ))
    p2 = Process(target=f2, args=(2, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
For the first pitfall, the workaround is to promote f1 and f2 to methods of the class:

from multiprocessing import Process

class A:
    def __init__(self):
        pass

    def f1(self, x):
        print(x)

    def f2(self, x):
        print(x)

    def test(self):
        p1 = Process(target=self.f1, args=(1, ))
        p2 = Process(target=self.f2, args=(2, ))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

if __name__ == '__main__':
    a = A()
    a.test()

Alternatively, f1 and f2 could be moved outside the class, but then they can no longer access the class's attributes and methods, so it is hard to have it both ways; honestly, a class A that suddenly contains two unrelated-looking functions f1 and f2 is a bit painful to look at (a sketch of the module-level alternative follows). You will see the same issue handled in the Section 7 example: with an 8-core CPU running 16 processes, one process is assigned to the crawler that fetches XiLa proxy IPs and the remaining 15 all act as proxy crawlers, which involves two target functions; I chose to write both of them inside the CSDN class, even though it does not look very tidy...
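For reference, here is a hedged sketch of that module-level alternative (this is not how Section 7 does it): the target functions live outside the class so they can be pickled, and whatever state they need, here a made-up prefix attribute, is passed in explicitly as an argument.

from multiprocessing import Process

def f1(x, prefix):
    print(prefix, x)

def f2(x, prefix):
    print(prefix, x)

class A:
    def __init__(self):
        self.prefix = '[A]'   # state the workers need, passed explicitly below

    def test(self):
        p1 = Process(target=f1, args=(1, self.prefix))
        p2 = Process(target=f2, args=(2, self.prefix))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

if __name__ == '__main__':
    A().test()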
The Process class is used to define processes.

- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): create a process with, e.g., process = Process(target=f, args=(1, ));
- process.run(): runs the target directly in the current process, i.e. executes f(1);
- process.start(): the process is made ready and waits to be scheduled;
- process.terminate(): terminates the process (sends SIGTERM);
- process.kill(): kills the process (sends SIGKILL);
- process.close(): closes the Process object and releases its resources;
- process.join(timeout=None): if timeout is None, the call blocks until the process whose join() method was called terminates; if timeout is a positive number, it blocks at most timeout seconds; check process.exitcode afterwards to determine whether the process actually terminated;
- process.name: the process name;
- process.is_alive(): whether the process is alive; a process object is alive from the moment start() returns until the child process terminates;
- process.daemon: the process's daemon flag; defaults to False; must be set before start() is called; when a process exits, it attempts to terminate all of its daemonic child processes;
- process.pid: the process ID;
- process.exitcode: the process's exit code; None if the process has not yet terminated; a value of -N means the process was terminated by signal N;
A comparison example for process.daemon:

import os
import time
from multiprocessing import Process

def child():
    print('module name: ', __name__)
    print('parent process: ', os.getppid())
    print('process id: ', os.getpid())

def parent(name):
    child()
    time.sleep(3)
    print('hello', name)

if __name__ == '__main__':
    process = Process(target=parent, args=('caoyang', ))
    #process.daemon = True
    print('process.daemon: ', process.daemon)
    process.start()
    process.join(1)
    print('name: ', process.name)
    print('is_alive: ', process.is_alive())
    print('exitcode: ', process.exitcode)

With process.daemon = True left commented out, the output is:
process.daemon: False
module name: __mp_main__
parent process: 8896
process id: 12480
name: Process-1
is_alive: True
exitcode: None
hello caoyang
If process.daemon = True is uncommented, the output becomes:

process.daemon: True
module name: __mp_main__
parent process: 11720
process id: 7836
name: Process-1
is_alive: False
exitcode: 0

Note that process.exitcode changes from None to 0, and because the child process is terminated when the main process exits, print('hello', name) never gets a chance to produce output.
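Before moving on to Pool, a small added sketch (not from the original post) contrasting run() and start() from the list above: run() just calls the target in the current process, while start() actually spawns a new process, which is why the printed pids differ.

import os
from multiprocessing import Process

def f(tag):
    print(tag, 'running in pid', os.getpid())

if __name__ == '__main__':
    print('main pid:', os.getpid())
    p1 = Process(target=f, args=('run()  ->', ))
    p1.run()                      # executes f in the main process (same pid)
    p2 = Process(target=f, args=('start()->', ))
    p2.start()                    # executes f in a new child process
    p2.join()
    print('child exitcode:', p2.exitcode)   # 0 on normal exit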
The Pool class manages a pool of worker processes.

- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes: the total number of processes, i.e. the maximum capacity of the pool; defaults to os.cpu_count();
- initializer: an initializer; the default None is fine;
- maxtasksperchild: the number of tasks a worker process may complete before exiting; the default None means the pool keeps working indefinitely, taking whatever comes;
- context: the context used when starting the worker processes; there is no need to set it;
- pool = Pool(processes=16) creates a pool;
- pool.apply(f, args=(1, )): blocking, i.e. the tasks submitted to the pool are simply executed one after another;
- pool.apply_async(f, args=(1, )): non-blocking, i.e. tasks are executed in parallel, effectively in batches;
- pool.map(f, range(500)): executes f(0) through f(499); blocking, i.e. one at a time;
- pool.map_async(f, range(500)): executes f(0) through f(499); non-blocking, i.e. in parallel;
- pool.close() or pool.terminate():
  - close(): prevents any further tasks from being submitted to the pool; once all submitted tasks are finished, the worker processes exit;
  - terminate(): stops the worker processes immediately without completing outstanding work; when the pool object is garbage collected, terminate() is called immediately;
- pool.join(): waits for the worker processes to exit; it must be called after pool.close() or pool.terminate();

An example that submits 500 tasks with apply_async:

import os
import time
from multiprocessing import Process, Pool

def f(n):
    print(n)
    time.sleep(3)

if __name__ == "__main__":
    pool = Pool(processes=16)
    for i in range(500):
        pool.apply_async(f, args=(i, ))
    pool.close()
    pool.join()
The same 500 tasks submitted with map_async instead:

import os
import time
from multiprocessing import Process, Pool

def f(n):
    print(n)
    time.sleep(3)

if __name__ == "__main__":
    pool = Pool(processes=16)
    pool.map_async(f, range(500))
    pool.close()
    pool.join()
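The examples above only print inside the workers. As a supplement (not from the original post), here is a small sketch of how return values can be collected: pool.map() returns the results as an ordered list, and apply_async() returns an AsyncResult whose .get() blocks until the value is ready.

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(processes=4) as pool:                     # the context manager terminates the pool on exit
        results = pool.map(square, range(10))           # blocking, ordered results
        print(results)                                  # [0, 1, 4, ..., 81]
        async_result = pool.apply_async(square, (12, )) # non-blocking
        print(async_result.get(timeout=10))             # 144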
The Queue class provides a queue for storing data shared between processes.

- class multiprocessing.Queue(maxsize): maxsize is the maximum length of the queue; queue = Queue(100) creates a multi-process queue;
- queue.put(obj, block=True, timeout=None): adds obj to the queue;
  - block defaults to True, i.e. putting into the queue blocks by default, so items go in one at a time;
  - timeout is the maximum blocking time; the default None means unbounded, i.e. each put waits until the previous item has been placed into the queue;
  - with non-blocking puts (block=False) you have to guard against the error raised when the queue is full;
- queue.get(block=True, timeout=None): reads data from the queue, also blocking by default; the parameters have the same meaning as for put;
- queue.qsize(): returns the current queue length;

An example of put and get:

import os
import time
import random
from multiprocessing import Process, Queue

def write(q, urls):
    print('Process({}) is writing...'.format(os.getpid()))
    for url in urls:
        q.put(url)
        print('Put {} to queue...'.format(url), q.qsize())

def read(q):
    print('Process({}) is reading...'.format(os.getpid()))
    while True:
        url = q.get(True)
        print('Get {} from queue.'.format(url), q.qsize())

if __name__ == '__main__':
    q = Queue()
    writer1 = Process(target=write, args=(q, ['url1', 'url2', 'url3']))
    writer2 = Process(target=write, args=(q, ['url4', 'url5', 'url6']))
    reader = Process(target=read, args=(q, ))
    writer1.start()
    writer2.start()
    reader.start()
    writer1.join()
    writer2.join()
    reader.terminate()
Because read() is written as an infinite loop, the reader process has to be forcibly stopped with terminate(). The output:

Process(9776) is writing...
Put url1 to queue...
Process(13196) is writing...
Put url4 to queue...
Process(13892) is reading...
Get url1 from queue.
Get url4 from queue.
Put url5 to queue...
Get url5 from queue.
Put url6 to queue...
Get url6 from queue.
Put url2 to queue...
Get url2 from queue.
Put url3 to queue...
Get url3 from queue.
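As a supplement to the note above about non-blocking puts needing to guard against a full queue, here is a small added sketch (not from the original post) showing the exceptions involved: with block=False, put() raises queue.Full on a full queue and get() raises queue.Empty on an empty one.

import time
from multiprocessing import Queue
from queue import Full, Empty   # the exception classes come from the standard queue module

if __name__ == '__main__':
    q = Queue(maxsize=2)
    for url in ['url1', 'url2', 'url3']:
        try:
            q.put(url, block=False)           # non-blocking put
        except Full:
            print('queue is full, dropping', url)
    time.sleep(0.5)  # give the feeder thread time to flush items into the pipe
    while True:
        try:
            print('got', q.get(block=False))  # non-blocking get
        except Empty:
            print('queue is empty')
            break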
Pipe is used for communication between two processes, which sit at the two ends of the pipe.
from multiprocessing import Process, Pipe

def send(pipe):
    pipe.send([1, 2, 3, 4, 5])
    pipe.close()

if __name__ == '__main__':
    con1, con2 = Pipe()
    sender = Process(target=send, args=(con1, ))
    sender.start()
    print("con2 got: {}".format(con2.recv()))
    con2.close()
A second example where the two processes talk in both directions:

from multiprocessing import Process, Pipe

def talk(pipe):
    pipe.send({'name': 'Bob', 'spam': 42})
    reply = pipe.recv()
    print('talker got: {}'.format(reply))

if __name__ == '__main__':
    parentEnd, childEnd = Pipe()
    child = Process(target=talk, args=(childEnd, ))
    child.start()
    print('parent got: {}'.format(parentEnd.recv()))
    parentEnd.send({x * 2 for x in 'spam'})
    child.join()
    print('parent exit.')
parent got: {'name': 'Bob', 'spam': 42}
talker got: {'aa', 'pp', 'mm', 'ss'}
parent exit.
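A small additional sketch (added, not from the original post): Pipe() is bidirectional by default, but Pipe(duplex=False) gives a one-way pipe where the first end can only recv() and the second end can only send().

from multiprocessing import Process, Pipe

def produce(conn):
    for i in range(3):
        conn.send(i)
    conn.close()

if __name__ == '__main__':
    recv_end, send_end = Pipe(duplex=False)   # receive-only end, send-only end
    producer = Process(target=produce, args=(send_end, ))
    producer.start()
    for _ in range(3):
        print('received:', recv_end.recv())
    producer.join()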
Lock is essentially do-it-yourself blocking: you add blocking wherever you want it.

The examples below share a variable v created with multiprocessing.Value. First, without any lock, two processes increment v concurrently:

import multiprocessing
import time

def job(v, num):
    for i in range(5):
        time.sleep(0.1)
        v.value += num
        print(v.value, end=",")

def multicore():
    v = multiprocessing.Value('i', 0)
    p1 = multiprocessing.Process(target=job, args=(v, 1))
    p2 = multiprocessing.Process(target=job, args=(v, 3))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

The two processes interleave, which shows in the output:

3, 7, 11, 15, 19, 4, 8, 12, 16, 20,

With a lock added (an RLock here; the commented-out line shows the plain Lock variant), one process finishes all of its increments before the other starts:

import multiprocessing
import time

# lock = multiprocessing.Lock()
lock = multiprocessing.RLock()

def job(v, num, lock):
    lock.acquire()
    for _ in range(5):
        time.sleep(0.1)
        v.value += num
        print(v.value, end=", ")
    lock.release()

def multicore():
    v = multiprocessing.Value('i', 0)
    p1 = multiprocessing.Process(target=job, args=(v, 1, lock))
    p2 = multiprocessing.Process(target=job, args=(v, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

1, 2, 3, 4, 5, 8, 11, 14, 17, 20,
The difference between Lock and RLock: a Lock has to be released before it can be acquired again, so code of the following form does not work and deadlocks:

lock.acquire()
lock.acquire() # deadlock
lock.release()
lock.release()

An RLock, by contrast, can be acquired several times in a row and then released the same number of times; as long as the acquire and release counts match, code of the form above is fine.
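A runnable check of that claim (this snippet is added for illustration): the same process can acquire an RLock repeatedly, whereas doing the same with a plain Lock would block forever on the second acquire().

from multiprocessing import RLock

if __name__ == '__main__':
    rlock = RLock()
    rlock.acquire()
    rlock.acquire()     # fine: RLock is re-entrant for the owning process/thread
    print('acquired twice')
    rlock.release()
    rlock.release()     # release the same number of times
    print('released twice')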
Manager mainly wraps more types of process-shared data; the most commonly used are list and dict, and in practice those two are about all you need. Note that the Manager object must be instantiated inside the main guard, otherwise a RuntimeError is raised. Here is a simple example using dict; for list you can copy the same pattern:

import time
from multiprocessing import Process, Manager

def f(mdict, key, value):
    mdict[key] = value

if __name__ == '__main__':
    manager = Manager()
    mdict = manager.dict()
    processes = [Process(target=f, args=(mdict, i, i**2)) for i in range(10)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print('Results: ')
    for key, value in dict(mdict).items():
        print("{}: {}".format(key, value))
Results:
1: 1
0: 0
2: 4
3: 9
4: 16
5: 25
6: 36
7: 49
9: 81
8: 64
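The text above says the list case just mirrors the dict pattern; for completeness, here is a small added sketch using manager.list():

from multiprocessing import Process, Manager

def f(mlist, value):
    mlist.append(value ** 2)

if __name__ == '__main__':
    manager = Manager()
    mlist = manager.list()
    processes = [Process(target=f, args=(mlist, i)) for i in range(10)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(sorted(mlist[:]))   # the order of appends is nondeterministic, so sort a plain copy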
Finally, the complete multi-process crawler project example referenced throughout (Section 7) consists of two files.

proxy.py:
# -*- coding: UTF-8 -*-
# @author: caoyang
# @email: caoyang@163.sufe.edu.cn

import math
import pandas
import requests

from bs4 import BeautifulSoup

class XiLaProxy(object):
    """
    http://www.xiladaili.com/
    """
    def __init__(self) -> None:
        self.index_url = 'http://www.xiladaili.com'
        self.total_proxy_per_page = 50  # There are exactly 50 proxies on one page. Please modify the number if the page source changes in the future.
        self.proxy_list_urls = {
            'gaoni': self.index_url + '/gaoni',
            'https': self.index_url + '/https',
            'http': self.index_url + '/http',
        }
        self.headers = {  # Request headers.
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0',
        }

    def get_proxy_table(self, proxy_type: str='https', total: int=50, export_path: str=None) -> pandas.DataFrame:
        """
        Request for the proxy table on the xiladaili website.
        You can call ```table_df.values[:, 0].tolist()``` to get the proxy list.
        :param proxy_type: range from values {'gaoni', 'https', 'http'}.
        :param total: total number of IPs needed.
        :param export_path: export the IP table to a CSV file.
        :return table_df: proxy table as a ```pandas.DataFrame```.
        """
        assert proxy_type in self.proxy_list_urls
        total_page = math.ceil(total / self.total_proxy_per_page)
        proxy_list_url = self.proxy_list_urls.get(proxy_type)

        def _get_proxy_table(url, table_dict):
            # Fetch one listing page and append its rows to table_dict.
            while True:
                try:
                    response = requests.get(url, headers=self.headers, timeout=30)
                    break
                except Exception as e:
                    print('Fail to connect {} ...'.format(url))
                    print(e)
                    continue
            html = response.text
            soup = BeautifulSoup(html, 'lxml')
            table = soup.find('table', class_='fl-table')
            if table_dict is None:  # First page: build the columns from the table header.
                table_dict = {}
                for th in table.find('thead').find_all('th'):
                    table_dict[str(th.string)] = []
            for tr in table.find('tbody').find_all('tr'):
                for td, column in zip(tr.find_all('td'), table_dict):
                    table_dict[column].append(str(td.string))
            return table_dict

        table_dict = None
        for page in range(1, total_page + 1):
            print('Fetch proxies on page {}'.format(page))
            if page == 1:
                table_dict = _get_proxy_table(proxy_list_url, table_dict)
            else:
                table_dict = _get_proxy_table(proxy_list_url + '/{}'.format(page), table_dict)
            print(' - There are total {} proxies.'.format(len(table_dict[list(table_dict.keys())[0]])))
        table_df = pandas.DataFrame(table_dict, columns=list(table_dict.keys()))
        if export_path is not None:
            table_df.to_csv(export_path, header=True, index=False, sep='\t')
        return table_df

if __name__ == '__main__':
    xila = XiLaProxy()
    #gaoni_df = xila.get_proxy_table(proxy_type='gaoni', export_path='gaoni.csv')
    https_df = xila.get_proxy_table(proxy_type='https', export_path='https.csv')
    #http_df = xila.get_proxy_table(proxy_type='http', export_path='http.csv')
    print(https_df.values[:, 0].tolist())
csdn.py:

# -*- coding: UTF-8 -*-
# @author: caoyang
# @email: caoyang@163.sufe.edu.cn

import os
import sys
sys.path.append(os.path.dirname(os.getcwd()))

import re
import time
import requests

from functools import wraps
from bs4 import BeautifulSoup
from multiprocessing import Process, Pool, Queue, Lock

from proxy import XiLaProxy

class CSDN(object):

    def __init__(self) -> None:
        self.headers = {  # Request headers.
            'Host': 'blog.csdn.net',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': 'https://blog.csdn.net/baidu_39633459',
            'Connection': 'keep-alive',
            # 'Cookie': 'Your Cookies',  # Note 2020-12-17: requests work without a cookie these days; add your own if you want.
            'Upgrade-Insecure-Requests': '1',
            'Cache-Control': 'max-age=0',
            'TE': 'Trailers',
        }
        self.article_ids = [  # You can add more article IDs yourself; only one article is listed here.
            '110294311',
        ]
        assert self.article_ids  # It should not be an empty list.
        self.article_url = 'https://blog.csdn.net/baidu_39633459/article/details/{}'.format  # You can change the url to your own; this one is not mine either... I borrowed a friend's blog for testing.

    def read_article(self, proxy: str=None, start_article_id: int=0, end_article_id: int=-1,
                     with_cookie: bool=True, early_stop: bool=True, early_stop_point: int=95,
                     max_read_count: int=10000, regular_interval: int=75, reset_interval: int=300) -> None:
        """
        A simple crawler used to increase the pageview of several articles.
        :param proxy: IP address of the proxy server, default None means localhost.
        :param start_article_id: Start index in ```self.article_ids```.
        :param end_article_id: End index in ```self.article_ids```.
        :param with_cookie: Whether to use a cookie in the crawler.
        :param early_stop: Whether to stop before a whole hundred (counts for bonus).
        :param early_stop_point: Early stop at ```pageview % 100 == early_stop_point```, default 95.
        :param max_read_count: Required upper bound of pageview, default 10000.
        :param regular_interval: Sleep interval in seconds between two successive requests, default 75.
        :param reset_interval: Sleep interval in seconds when a request exception occurs, default 300.
        """
        headers = self.headers.copy()
        if not with_cookie:
            headers.pop('Cookie', None)
        if early_stop:
            early_stop_point = early_stop_point % 100
        article_ids = self.article_ids[start_article_id:] if end_article_id == -1 else self.article_ids[start_article_id:end_article_id + 1]
        compiler = re.compile(r'\d+')  # A regular expression compiler used to parse digital data.
        previous_read_counts = [None] * len(article_ids)  # A list used to save the current pageview of each article.
        count = 0
        while True:
            if not article_ids:  # If it is an empty list, then break the loop and return.
                break
            try:
                start_time = time.time()
                count += 1
                print('Pid: {} - Count: {} - '.format(os.getpid(), str(count).zfill(3)), end='')
                for i, article_id in enumerate(article_ids):
                    if proxy is None:
                        response = requests.get(self.article_url(article_id), headers=headers, timeout=30)
                    else:
                        try:
                            response = requests.get(self.article_url(article_id), headers=headers, proxies={'https': 'https://{}'.format(proxy)}, timeout=30)
                        except:
                            print('Proxy {} cannot be used ...'.format(proxy))
                            return False
                    html = response.text
                    soup = BeautifulSoup(html, 'lxml')
                    span = soup.find('span', class_='read-count')
                    span_string = str(span.string)
                    print(span_string, end='')
                    read_count = int(compiler.findall(span_string)[0])
                    if previous_read_counts[i] is None:
                        print('(..)', end='\t')
                        previous_read_counts[i] = read_count
                    else:
                        read_increment = read_count - previous_read_counts[i]
                        if read_increment == 0:
                            print('(!!)', end='\t')
                        elif read_increment == 1:
                            print('(..)', end='\t')
                        elif read_increment > 1:
                            print('(+{})'.format(read_increment), end='\t')
                        else:
                            print('(??)', end='\t')
                        previous_read_counts[i] = read_count
                    div = soup.find('div', id='asideProfile')
                    dls = div.find_all('dl', class_='text-center')
                    for dl in dls:  # Parse blog profile data such as 'like', 'favorite', 'followers'.
                        try:
                            print(int(dl.attrs['title']), end=',')
                        except:
                            continue
                if proxy is not None:
                    print(proxy, end=',')
                print(time.strftime('%Y-%m-%d %H:%M:%S'))
                if early_stop:  # Do early stopping.
                    index = -1
                    for article_id, previous_read_count in zip(article_ids[:], previous_read_counts[:]):
                        index += 1
                        if previous_read_count % 100 >= early_stop_point or previous_read_count >= max_read_count:
                            previous_read_counts.pop(index)
                            article_ids.pop(index)
                            index -= 1
                end_time = time.time()
                consumed_time = end_time - start_time
                if consumed_time < regular_interval:
                    time.sleep(regular_interval - consumed_time)
            except Exception as exception:
                print(exception)
                time.sleep(reset_interval)

    def read_article_with_proxies(self, total_processes=15, start_article_id: int=0, end_article_id: int=-1,
                                  with_cookie: bool=True, early_stop: bool=True, early_stop_point: int=95,
                                  max_read_count: int=10000, regular_interval: int=75, reset_interval: int=300) -> None:
        """
        A multiprocessing crawler used to increase the pageview of several articles.
        :param total_processes: total number of worker processes.
        :param start_article_id: Start index in ```self.article_ids```.
        :param end_article_id: End index in ```self.article_ids```.
        :param with_cookie: Whether to use a cookie in the crawler.
        :param early_stop: Whether to stop before a whole hundred (counts for bonus).
        :param early_stop_point: Early stop at ```pageview % 100 == early_stop_point```, default 95.
        :param max_read_count: Required upper bound of pageview, default 10000.
        :param regular_interval: Sleep interval in seconds between two successive requests, default 75.
        :param reset_interval: Sleep interval in seconds when a request exception occurs, default 300.
        """
        xila = XiLaProxy()
        queue = Queue()
        proxy_table = xila.get_proxy_table(proxy_type='https', total=50, export_path=None)
        proxies = proxy_table.values[:, 0].tolist()
        print('Total {} proxies, list as below: '.format(len(proxies)))
        print(proxies)
        for proxy in proxies:
            queue.put(proxy)
        params = {
            'queue': queue,
            'start_article_id': start_article_id,
            'end_article_id': end_article_id,
            'with_cookie': with_cookie,
            'early_stop': early_stop,
            'early_stop_point': early_stop_point,
            'max_read_count': max_read_count,
            'regular_interval': regular_interval,
            'reset_interval': reset_interval,
        }
        processes = [Process(target=self._read_article_with_proxies, kwargs=params) for i in range(total_processes)]
        processes.append(Process(target=self._put_proxies_queue, args=(queue, total_processes, 30, 50, )))
        for process in processes:
            process.start()
        for process in processes:  # Note: the worker targets loop forever, so these joins only return if the processes are stopped externally.
            process.join()
        time.sleep(1000)
        for process in processes:
            process.terminate()

    def _put_proxies_queue(self, queue: Queue, min_queue_size: int=15, check_interval: int=30, batch_size: int=50) -> None:
        """
        Target function for the multiprocessing script ```read_article_with_proxies()```.
        """
        xila = XiLaProxy()
        while True:
            _queue_size = queue.qsize()
            if _queue_size < min_queue_size:
                print('Queue size is {}, which is lower than {} ...'.format(_queue_size, min_queue_size))
                print(' - Fetching a batch of proxies ...')
                _proxy_table = xila.get_proxy_table(proxy_type='https', total=batch_size, export_path=None)
                _proxies = _proxy_table.values[:, 0].tolist()
                print(' - Successfully fetched a batch of {} proxies.'.format(len(_proxies)))
                for _proxy in _proxies:
                    queue.put(_proxy)
                print('Queue size is refreshed to {}.'.format(queue.qsize()))
            time.sleep(check_interval)

    def _read_article_with_proxies(self, queue: Queue, start_article_id: int=0, end_article_id: int=-1,
                                   with_cookie: bool=True, early_stop: bool=True, early_stop_point: int=95,
                                   max_read_count: int=10000, regular_interval: int=75, reset_interval: int=300) -> None:
        """
        Target function for the multiprocessing script ```read_article_with_proxies()```.
        """
        while True:
            _proxy = queue.get()
            self.read_article(
                proxy=_proxy,
                start_article_id=start_article_id,
                end_article_id=end_article_id,
                with_cookie=with_cookie,
                early_stop=early_stop,
                early_stop_point=early_stop_point,
                max_read_count=max_read_count,
                regular_interval=regular_interval,
                reset_interval=reset_interval,
            )

if __name__ == '__main__':
    csdn = CSDN()
    #csdn.read_article(early_stop_point=60, early_stop=True)
    #https_df = xila.get_proxy_table(proxy_type='https', export_path='https.csv')
    csdn.read_article_with_proxies(total_processes=15, early_stop=False)