当前位置:   article > 正文

python搭建ip池(多线程)

ip池

之前有讲过怎么搭建ip池,但由于单线程的效率太低,于是我们升级改造一下,将单线程变成多线程来搭建ip池,之前的方法可以参考一下:python搭建ip池 (如果会简单的request和提取文字就可以直接不看)

本文将会重点放在多线程的部分。

过程分为两部分:

一、从网站上获取所有的ip信息

1、获取待爬取的url列表

2、对多线程类进行重写

3、多线程访问前面获取的url列表,获取ip信息

4、将爬取的ip信息提取并处理,返回一个列表,方便后续的保存

5、将ip信息保存到本地csv

二、将爬取的ip进行验证

1、读取前面保存的文件

2、创建多线程来验证ip是否可用

3、保存可用的ip

 

 随便找到一个免费的ip代理网站:http://www.66ip.cn/

定义函数列表:

老样子,先附上全部代码(这个代码修改一下保存地址就可以直接使用的),后面再对每个模块进行详解。

  1. # -*- coding: gbk -*- # 防止出现乱码等格式错误
  2. # ip代理网站:http://www.66ip.cn/areaindex_19/1.html
  3. import requests
  4. from fake_useragent import UserAgent
  5. import pandas as pd
  6. from lxml import etree # xpath
  7. import threading # 多线程
  8. # --------------爬取该网站全国ip----------------
  9. # ----先获取一共多少页,然后修改url得到url列表------
  10. def get_url():
  11. url_list = []
  12. url = 'http://www.66ip.cn/index.html'
  13. data_html = requests.get(url)
  14. data_html.encoding = 'gbk'
  15. data_html = data_html.text
  16. html = etree.HTML(data_html)
  17. page = html.xpath('//*[@id="PageList"]/a[12]/text()') # 获取全球代理的页码
  18. for i in range(int(page[0])):
  19. country_url = 'http://www.66ip.cn/{}.html'.format(i+1)
  20. url_list.append(country_url)
  21. for i in range(1,35):
  22. city_url = 'http://www.66ip.cn/areaindex_{}/1.html'.format(i)
  23. url_list.append(city_url)
  24. return url_list
  25. # ---------------爬取该网站城市ip----------------
  26. def get_all_ip(url_list):
  27. headers = {
  28. 'User-Agent': UserAgent().random,
  29. }
  30. test_ip = [] # 用于存放爬取下来的ip
  31. for url in url_list:
  32. try: # 防止有时访问异常抛出错误
  33. data_html = requests.get(url=url, headers=headers)
  34. data_html.encoding = 'gbk'
  35. data_html = data_html.text
  36. html = etree.HTML(data_html)
  37. etree.tostring(html)
  38. response = html.xpath('//div[@align="center"]/table/tr/td/text()') # 获取html含有ip信息的那一行数据
  39. test_ip += dispose_list_ip(response) # 调用下面的处理函数,将不必要的数据筛掉
  40. except:
  41. continue
  42. print("本次获取ip信息的数量:",len(test_ip))
  43. return test_ip
  44. # --------------将爬取的list_ip关键信息进行提取、方便后续保存----------------
  45. def dispose_list_ip(list_ip):
  46. num = int((int(len(list_ip)) / 5) - 1) # 5个一行,计算有几行,其中第一行是标题直接去掉
  47. test_list = []
  48. for i in range(num):
  49. a = i * 5
  50. ip_index = 5 + a # 省去前面的标题,第5个就是ip,往后每加5就是相对应ip
  51. location_index = 6 + a
  52. place_index = 7 + a
  53. items = []
  54. items.append(list_ip[ip_index])
  55. items.append(list_ip[location_index])
  56. items.append((list_ip[place_index]))
  57. test_list.append(items)
  58. return test_list
  59. # -----------将列表的处理结果保存在csv-------------
  60. def save_list_ip(list,file_path):
  61. columns_name=["ip","port","place"]
  62. test=pd.DataFrame(columns=columns_name,data=list) # 去掉索引值,否则会重复
  63. test.to_csv(file_path,mode='a',encoding='utf-8')
  64. print("保存成功")
  65. # ------------读取文件,以df形式返回--------------
  66. def read_ip(file_path):
  67. file = open(file_path,encoding='utf-8')
  68. df = pd.read_csv(file,usecols=[1,2,3]) # 只读取2,3,4,列(把第一列的索引去掉)
  69. df = pd.DataFrame(df)
  70. return df
  71. # ----------------验证ip是否合格-----------------
  72. def verify_ip(ip_list):
  73. verify_ip = []
  74. for ip in ip_list:
  75. ip_port = str(ip[0]) + ":" + str(ip[1]) # 初步处理ip及端口号
  76. headers = {
  77. "User-Agent": UserAgent().random
  78. }
  79. proxies = {
  80. 'http': 'http://' + ip_port,
  81. 'https': 'https://'+ip_port
  82. }
  83. '''http://icanhazip.com访问成功就会返回当前的IP地址'''
  84. try:
  85. p = requests.get('http://icanhazip.com', headers=headers, proxies=proxies, timeout=3)
  86. item = [] # 将可用ip写入csv中方便读取
  87. item.append(ip[0])
  88. item.append(ip[1])
  89. item.append(ip[2])
  90. verify_ip.append(item)
  91. print(ip_port + "验证成功!")
  92. except Exception as e:
  93. print(ip_port,"验证失败")
  94. continue
  95. return verify_ip
  96. # ----------------多线程重写----------------------
  97. class MyThread(threading.Thread):
  98. def __init__(self,func,args):
  99. """
  100. :param func: run方法中的函数名
  101. :param args: func函数所需的参数
  102. """
  103. threading.Thread.__init__(self)
  104. self.func = func
  105. self.args = args
  106. def run(self):
  107. print('当前子线程:{}启动'.format(threading.current_thread().name))
  108. self.result = self.func(self.args)
  109. return self.func
  110. def get_result(self): # 获取返回值
  111. try:
  112. return self.result # 如果子线程不使用join方法,此处可能会报没有self.result的错误
  113. except:
  114. return None
  115. # -----将待处理任务进行平均分割为线程数,方便线程执行----
  116. def split_list(list,thread_num):
  117. list_total = []
  118. num = thread_num # 线程数量
  119. x = len(list) // num # 将参数进行分批(5批)方便传参
  120. count = 1 # 计算这是第几个列表
  121. for i in range(0, len(list), x):
  122. if count < num:
  123. list_total.append(list[i:i + x])
  124. count += 1
  125. else:
  126. list_total.append(list[i:])
  127. break
  128. return list_total
  129. # -----------多线程访问网址获取ip信息---------------
  130. def create_thread_get_ip_list(list,thread_num):
  131. list_total = split_list(list,thread_num) # 调用上面的方法,将任务平均分配给线程
  132. thread_list =[] # 线程池
  133. for url in list_total: # 添加线程
  134. t = MyThread(func=get_all_ip,args=url)
  135. thread_list.append(t)
  136. # thread1 = MyThread(func=get_all_ip,args=list_total[0])
  137. # thread2 = MyThread(func=get_all_ip,args=list_total[1])
  138. for t in thread_list: # 批量启动线程
  139. t.start()
  140. for t in thread_list: # 主线程等待子线程
  141. t.join()
  142. ip=[] # 存放爬取的ip
  143. for t in thread_list: # 将数据存入ip中
  144. ip += t.get_result()
  145. print("总共线程获取ip数量为:",len(ip))
  146. return ip
  147. # ------------创建线程验证ip----------------------
  148. def create_thread_verify_ip(list,thread_num):
  149. list_total = split_list(list, thread_num)
  150. thread_list = [] # 存放线程池
  151. ip = [] # 存放验证成功的ip
  152. for list in list_total:
  153. t = MyThread(func=verify_ip,args=list)
  154. thread_list.append(t)
  155. for t in thread_list:
  156. t.start()
  157. for t in thread_list:
  158. t.join()
  159. for t in thread_list:
  160. ip += t.get_result()
  161. return ip
  162. if __name__ == '__main__':
  163. # ----------# 获取待爬取的全部url---------
  164. url_list = get_url()
  165. print(url_list)
  166. # ----------# 创建多线程爬取--------------
  167. thread_num1 = 100 # 第一个线程数量
  168. test_ip = create_thread_get_ip_list(url_list,thread_num)
  169. # ----------# 保存数据-------------------
  170. test_path = 'test.csv'
  171. save_list_ip(test_ip,test_path)
  172. # -----这里建议先运行上面,结束后再运行下面---------
  173. # ----------# 读取、初步处理数据--------------
  174. df = read_ip(test_path)
  175. print("去重前数据有:",len(df))
  176. df = df.drop_duplicates() # 去除重复数据
  177. print("去重后数据有:",len(df))
  178. ip_list = df.values.tolist() # df转列表(方便等会多线程的时候分配任务)
  179. print(ip_list)
  180. # ----------# 创建多线程验证ip--------------
  181. thread_num2 = 100 # 第二个线程的数量
  182. ip = create_thread_verify_ip(list=ip_list,thread_num=thread_num2)
  183. print("验证失败ip数量:",len(ip_list)-len(ip))
  184. print("可用ip数量:",len(ip))
  185. # 保存
  186. save_path = "verify_ip.csv"
  187. save_list_ip(ip,save_path)
  188. '''
  189. # ----第二次验证(这里可以按自己需求写一个for循环来多验证几次,从而来提高ip池的质量)----
  190. verify_path = 'verify_ip.csv'
  191. df = read_ip(verify_path)
  192. print("去重前数据有:",len(df))
  193. df = df.drop_duplicates() # 去除重复数据
  194. print("去重后数据有:",len(df))
  195. ip_list = df.values.tolist() # df转列表(方便等会多线程的时候分配任务)
  196. print(ip_list)
  197. # ----------# 创建多线程验证ip--------------
  198. thread_num3 = 20 # 第三个线程的数量
  199. ip = create_thread_verify_ip(list=ip_list,thread_num=thread_num3)
  200. print("验证失败ip数量:",len(ip_list)-len(ip))
  201. print("可用ip数量:",len(ip))
  202. # 保存
  203. save_path = "优质ip.csv"
  204. save_list_ip(ip,save_path)
  205. '''

一、从网站上爬取所有的ip信息

1、获取待爬取的url列表

        先获取页码,然后对url进行一个简单的拼接,返回url的列表,(这部分是比较简单的可以直接略过)

  1. # ---------先获取url列表----------
  2. def get_url():
  3. url_list = []
  4. url = 'http://www.66ip.cn/index.html'
  5. data_html = requests.get(url)
  6. data_html.encoding = 'gbk'
  7. data_html = data_html.text
  8. html = etree.HTML(data_html)
  9. page = html.xpath('//*[@id="PageList"]/a[12]/text()') # 获取全球代理的页码
  10. for i in range(int(page[0])):
  11. country_url = 'http://www.66ip.cn/{}.html'.format(i+1)
  12. url_list.append(country_url)
  13. for i in range(1,35): # 因为那个网站只有35个城市
  14. city_url = 'http://www.66ip.cn/areaindex_{}/1.html'.format(i)
  15. url_list.append(city_url)
  16. return url_list

2、-----------------------------------多线程重写----------------------------------

在调用多线程去访问ip之前先对多线程类进行重写,方便后续调用多线程,并获取返回值。

  1. # ----------------多线程重写----------------------
  2. class MyThread(threading.Thread):
  3. def __init__(self,func,args):
  4. """
  5. :param func: run方法中的函数名
  6. :param args: func函数所需的参数
  7. """
  8. threading.Thread.__init__(self)
  9. self.func = func
  10. self.args = args
  11. def run(self):
  12. print('当前子线程:{}启动'.format(threading.current_thread().name))
  13. self.result = self.func(self.args)
  14. return self.func
  15. def get_result(self): # 获取返回值
  16. try:
  17. return self.result # 如果子线程不使用join方法,此处可能会报没有self.result的错误
  18. except:
  19. return None

3、---------------多线程爬取网站,获取ip信息(重点!!!)------------------------

创建多线程需要具备两个前提条件:函数、参数

因此我们先定义一个函数。携带url列表后进行访问、爬取页面的ip数据(可以在for循环里面让他停零点几秒,不然容易被识别成ddos攻击然后有几个url访问不进去):

  1. def get_all_ip(url_list):
  2. headers = {
  3. 'User-Agent': UserAgent().random,
  4. }
  5. test_ip = [] # 用于存放爬取下来的ip
  6. for url in url_list:
  7. try: # 防止有时访问异常抛出错误
  8. data_html = requests.get(url=url, headers=headers)
  9. data_html.encoding = 'gbk'
  10. data_html = data_html.text
  11. html = etree.HTML(data_html)
  12. etree.tostring(html)
  13. response = html.xpath('//div[@align="center"]/table/tr/td/text()') # 获取html含有ip信息的那一行数据
  14. test_ip += dispose_list_ip(response) # 调用下面的处理函数,将不必要的数据筛掉
  15. except:
  16. continue
  17. print("本次获取ip信息的数量:",len(test_ip))
  18. return test_ip

dispose_list_ip函数(用来处理一下返回的信息,将一些不必要的信息给筛掉):

让最后的结果变成这种格式:

  1. # --------------将爬取的list_ip关键信息进行提取、方便后续保存----------------
  2. def dispose_list_ip(list_ip):
  3. num = int((int(len(list_ip)) / 5) - 1) # 5个一行,计算有几行,其中第一行是标题直接去掉
  4. test_list = []
  5. for i in range(num):
  6. a = i * 5
  7. ip_index = 5 + a # 省去前面的标题,第5个就是ip,往后每加5就是相对应ip
  8. location_index = 6 + a
  9. place_index = 7 + a
  10. items = []
  11. items.append(list_ip[ip_index])
  12. items.append(list_ip[location_index])
  13. items.append((list_ip[place_index]))
  14. test_list.append(items)
  15. return test_list

在爬取网站之前我们要先明白,多线程需要函数、参数,我们这里参数只有一个url列表,如果每个线程都用这个url列表的话那就会重复了,因此我们要先将url列表进行一个切割。

将url列表平均切割成与线程数量相等的多个列表:

  1. # -----将待处理任务进行平均分割为线程数,方便线程执行----
  2. def split_list(list,thread_num):
  3. list_total = []
  4. num = thread_num # 线程数量
  5. x = len(list) // num # 将参数进行分批(批数 = 线程数)方便传参
  6. count = 1 # 计算这是第几个列表
  7. for i in range(0, len(list), x):
  8. if count < num:
  9. list_total.append(list[i:i + x])
  10. count += 1
  11. else:
  12. list_total.append(list[i:])
  13. break
  14. return list_total

具体的运行效果可以先参考一下这篇文章:平均切割列表,并分配给多个线程

在切割完毕后,函数有了,参数有了,那么我们就可以开始进行线程的配置了

  1. # -----------多线程访问网址获取ip信息---------------
  2. # thread_num为线程数量(调用的时候自己设置)
  3. # list 为总的url列表
  4. def create_thread_get_ip_list(list,thread_num):
  5. list_total = split_list(list,thread_num) # 调用上面的方法,将任务平均分配给线程(切割列表)
  6. thread_list =[] # 线程池
  7. for url in list_total: # 添加线程
  8. t = MyThread(func=get_all_ip,args=url)
  9. thread_list.append(t) # 等同于下面两句话
  10. # thread1 = MyThread(func=get_all_ip,args=list_total[0])
  11. # thread2 = MyThread(func=get_all_ip,args=list_total[1])
  12. for t in thread_list: # 批量启动线程
  13. t.start()
  14. for t in thread_list: # 主线程等待子线程
  15. t.join()
  16. ip=[] # 存放爬取的ip
  17. for t in thread_list: # 将数据存入ip中
  18. ip += t.get_result()
  19. print("总共线程获取ip数量为:",len(ip))
  20. # print(ip)
  21. return ip

效果:(我这里启动了100个线程,由于url比较多可能会等二十秒左右)

5、将ip信息保存到本地csv

  1. # -----------将列表的处理结果保存在csv-------------
  2. # list:列表数据
  3. #file_path:保存地址、名称 ag:file_path = 'test.csv'
  4. def save_list_ip(list,file_path):
  5. columns_name=["ip","port","place"]
  6. test=pd.DataFrame(columns=columns_name,data=list) # 去掉索引值,否则会重复
  7. test.to_csv(file_path,mode='a',encoding='utf-8')
  8. print("保存成功")

到这里第一步已经完成了,我们可以看看保存在本地的数据:

二、将爬取的ip进行验证

1、读取前面保存的文件

  1. # ------------读取文件,以df形式返回--------------
  2. def read_ip(file_path):
  3. file = open(file_path,encoding='utf-8')
  4. df = pd.read_csv(file,usecols=[1,2,3]) # 只读取2,3,4,列(把第一列的索引去掉)
  5. df = pd.DataFrame(df)
  6. return df

2、-------------------------多线程验证ip是否可用(重点!!!!)---------------------------------

前面讲到,多线程的创建必须具备:函数、参数,二者必不可少

我们先来创建验证ip的函数(传入ip列表,判断是否可用):

有个可以用来验证ip是否可用的网站:http://icanhazip.com访问成功就会返回当前的IP地址

  1. # -----------读取爬取的ip并验证是否合格-----------
  2. def verify_ip(ip_list):
  3. verify_ip = []
  4. for ip in ip_list:
  5. ip_port = str(ip[0]) + ":" + str(ip[1]) # 初步处理ip及端口号
  6. headers = {
  7. "User-Agent": UserAgent().random
  8. }
  9. proxies = {
  10. 'http': 'http://' + ip_port,
  11. 'https': 'https://'+ip_port
  12. }
  13. '''http://icanhazip.com访问成功就会返回当前的IP地址'''
  14. try:
  15. p = requests.get('http://icanhazip.com', headers=headers, proxies=proxies, timeout=3)
  16. item = [] # 将可用ip写入csv中方便读取
  17. item.append(ip[0])
  18. item.append(ip[1])
  19. item.append(ip[2])
  20. verify_ip.append(item)
  21. print(ip_port + "验证成功!")
  22. except Exception as e:
  23. print(ip_port,"验证失败")
  24. continue
  25. return verify_ip

函数有了,我们继续解决参数的问题,前面讲到:列表需要进行切割用来平均分配给各个线程,否者就会出现线程重复访问的情况,这里还是调用前面定义好的split_list()函数

def split_list(list,thread_num):

函数、列表都有了,我们就可以开始配置线程:

  1. # ------------创建线程验证ip----------------------
  2. def create_thread_verify_ip(list,thread_num):
  3. list_total = split_list(list, thread_num)
  4. thread_list = [] # 存放线程池
  5. ip = [] # 存放验证成功的ip
  6. for list in list_total:
  7. t = MyThread(func=verify_ip,args=list)
  8. thread_list.append(t)
  9. for t in thread_list:
  10. t.start()
  11. for t in thread_list:
  12. t.join()
  13. for t in thread_list:
  14. ip += t.get_result()
  15. return ip

3、保存可用的ip

上面返回得到一个可用的ip列表后,我们就可以继续调用之前的保存列表的save_list_ip(list,file_path)函数啦

三、main函数部分:

  1. if __name__ == '__main__':
  2. # ----------# 获取待爬取的全部url---------
  3. url_list = get_url()
  4. print(url_list)
  5. # ----------# 创建多线程爬取--------------
  6. thread_num1 = 100 # 第一个线程数量
  7. test_ip = create_thread_get_ip_list(url_list,thread_num1)
  8. # ----------保存数据-------------------
  9. test_path = 'test.csv'
  10. save_list_ip(test_ip,test_path)
  11. # -----这里建议先运行上面,结束后再运行下面,否则容易弄乱---------
  12. # ----------# 读取、初步处理数据--------------
  13. df = read_ip(test_path)
  14. print("去重前数据有:",len(df))
  15. df = df.drop_duplicates() # 去除重复数据
  16. print("去重后数据有:",len(df))
  17. ip_list = df.values.tolist() # df转列表(方便等会多线程的时候分配任务)
  18. print(ip_list)
  19. # ----------# 创建多线程验证ip--------------
  20. thread_num2 = 100 # 第二个线程的数量
  21. ip = create_thread_verify_ip(list=ip_list,thread_num=thread_num2)
  22. print("验证失败ip数量:",len(ip_list)-len(ip))
  23. print("可用ip数量:",len(ip))
  24. # 保存
  25. save_path = "verify_ip.csv"
  26. save_list_ip(ip,save_path)
  27. '''
  28. # ----第二次验证(这里可以按自己需求写一个for循环来多验证几次,提高ip池的质量)----
  29. verify_path = 'verify_ip.csv'
  30. df = read_ip(verify_path)
  31. print("去重前数据有:",len(df))
  32. df = df.drop_duplicates() # 去除重复数据
  33. print("去重后数据有:",len(df))
  34. ip_list = df.values.tolist() # df转列表(方便等会多线程的时候分配任务)
  35. print(ip_list)
  36. # ----------# 创建多线程验证ip--------------
  37. thread_num3 = 20 # 第三个线程的数量
  38. ip = create_thread_verify_ip(list=ip_list,thread_num=thread_num3)
  39. print("验证失败ip数量:",len(ip_list)-len(ip))
  40. print("可用ip数量:",len(ip))
  41. # 保存
  42. save_path = "优质ip.csv"
  43. save_list_ip(ip,save_path)
  44. '''

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/525628
推荐阅读
相关标签
  

闽ICP备14008679号