赞
踩
仅学习参考,切勿使用其他用途
import re import schedule import time from urllib.request import urlopen class Spider: def __init__(self): # 初始化代码... pass # self.start_schedule() # 需要爬的网址 url = 'https://www.**.com/game/' # 可以匹配文档中任何一个位置 # 贪婪匹配,因为没有? # \s 空白符 # \S 非空白符 # [\s\S]任意字符 # [\s\S]* 0个到任意多个字符 # [\s\S]*? 0个字符,匹配任何字符前的位置 # ([\s\S]*?) 加括号就可以排除 <div></div> 标签, 只获取里面的信息 # ------------------------------ # root_pattern = r'<div class="w-video-module-videolist w-video-module-videolist-withtags">(.*?)</div>' root_pattern = '<div class="w-video-module-videolist w-video-module-videolist-withtags">([\s\S]*?)</div>' # 正则:获取主播名称 name_pattern = '<span class="intro">([\s\S]*?)</span>' # 正则:获取标题 title_pattern = '<span class="title">([\s\S]*?)</span>' # 正则:获取视频浏览量 watched_pattern = '<i>([\s\S]*?)</i>' # 定义一个私有方法, 读取URL里面内容 def __fetch_content(self): try: # 实例里面读取类变量 response = urlopen(Spider.url) # 读取 url 内容 htmls = response.read() # 设置字符串编码 UTF-8 htmls = str(htmls, encoding='utf-8') # print(htmls) return htmls except Exception as e: print("Error decoding the response:", e) # 定义一个私有方法 # 1. 分析html文本, 通过正则表达式获取 <div class="w-video-module-videolist w-video-module-videolist-withtags"> 标签里的内容 # 2. 去除多余的 '/n' 字符 # 3. for 循环解析 # 3.1. 获取到的内容, 使用正则表达式获取 <span class="intro"> 标签里的内容 # 3.2. 获取到的内容, 使用正则表达式获取 <span class="title"> 标签里的内容 # 3.3. 获取到的内容, 使用正则表达式获取 <i> 标签里的内容 # 4. 通过for循环解析得到的数据, 定义键值对 # 4.1 存放到字典里面 (类似Java的集合) def __analysis(self, htmls): # 定义字典 anchors = [] # 使用正则表达式转换成需要获取的内容 root_html = re.findall(Spider.root_pattern, htmls) # 使用正则表达式去除每个元素中带有 \n 的 root_html = [re.sub('\n', '', item) for item in root_html] for html in root_html: name = re.findall(Spider.name_pattern, html) title = re.findall(Spider.title_pattern, html) watched = re.findall(Spider.watched_pattern, html) # 定义字典的键值对 anchor = { 'name': name, 'title': title, 'watched': watched } # 添加到字典里面 anchors.append(anchor) # print(anchors) return anchors # 定义一个私有方法: 用于组装List数据 def __refine(self, anchors): # 这个函数的作用是对传入的 anchors 列表进行处理,将每个字典元素中的 'name'、'title' 和 'watched' 键对应的值组合成一个新的字典, # 并将这些新的字典对象存储在 refined_data 列表中 # 这是通过使用列表推导式和 zip() 函数实现的,zip() 函数将三个列表中对应位置的元素打包成一个元组,然后通过列表推导式将每个元组中的元素取出来,组合成一个新的字典对象 # 最后,函数返回处理后的 refined_data 列表 # # refined_data = [{ # 'name': name, # 'title': title, # 'watched': watched # } for name, title, watched in zip(anchors[0]['name'], anchors[0]['title'], anchors[0]['watched'])] # # print(refined_data) # return refined_data # 使用了 lambda 函数来创建一个匿名函数,该函数接受一个元组 x 作为参数,并返回一个包含 'name'、'title' 和 'watched' 键的字典 # 然后,我们使用 map 函数将这个 lambda 函数应用于 zip(anchors[0]['name'], anchors[0]['title'], anchors[0]['watched']) 返回的元组序列中的每个元组, # 最终得到处理后的字典对象列表 refined_data = list(map(lambda x: {'name': x[0], 'title': x[1], 'watched': x[2]}, zip(anchors[0]['name'], anchors[0]['title'], anchors[0]['watched']))) # print(refined_data) return refined_data # 定义一个私有方法: # 排序规则: 包含“万”表示的字符串转换为数字, 并且转换成整型(int) def __sort_seed(self, anchor): # 从anchor字典中获取"watched"键对应的值,然后通过"正则表达式"找到其中的数字部分并转换为浮点数 r = re.findall('[1-9]\d*.?', anchor["watched"]) watched = float(r[0]) if '万' in anchor["watched"]: # 如果值中包含"万"这个字符,就将数字乘以10000 watched = watched * 10000 return watched # 定义一个私有方法: 排序函数 def __soft(self, anchors): # 根据观看数量倒序排序 anchors = sorted(anchors, key=self.__sort_seed, reverse=True) return anchors # 定义一个私有方法: 展示数据,将已经排序好的数据打印出来 def __show(self, anchors): # 不带序号 # for a in anchors: # print(a['name'] + '---' + a['title'] + '---' + str(a['watched'])) # 带序号 print("---------------------[王者荣耀]---------------------") print("----------------" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "----------------") print("---------------------------------------------------") for a in range(0, len(anchors)): print("Seq.", a + 1, ": ", "Name: ", anchors[a]['name'], ", Title: ", anchors[a]['title'], ", Watched: ", anchors[a]['watched'] ) # 定义一个公有方法: 入口方法 def go(self): # 获取HTML内容 htmls = self.__fetch_content() # 分析HTML内容 anchors = self.__analysis(htmls) # 组装List数据 anchors = self.__refine(anchors) # 排序 anchors = self.__soft(anchors) # 展现数据 self.__show(anchors) # 设置定时任务 def start_schedule(self): schedule.every(30).seconds.do(lambda: self.go()) # 循环执行定时任务 while True: schedule.run_pending() time.sleep(1) # 创建类的实例并开始定时任务 spider = Spider() # 调用入口方法 spider.go()
仅学习参考,切勿使用其他用途
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。