赞
踩
功能:实现数据抓取,并定时发送邮件。
新建 work.py 文件:
#!/usr/bin/python3
# pip3 install requests pandas lxml xlsxwriter openpyxl -i https://pypi.tuna.tsinghua.edu.cn/simple
"""Scrape the CEB bank points-mall (xyk.cebbank.com/jfmall), write each item to a
daily CSV, convert the CSV to XLSX, then mail the spreadsheet out.

Two daemon threads cooperate: one fetches pages and pushes CSV rows into
``gQueue``; the other drains the queue into the CSV file.
"""
import re
import requests
import random, time
from lxml import html
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
import threading
from queue import Queue
#from func_timeout import FunctionTimedOut, func_timeout
import pandas as pd

# Fetch thread -> writer thread hand-off of finished CSV rows.
gQueue = Queue()

# FIX: the original dict carried "Host: blog.csdn.net", which is the wrong host
# for xyk.cebbank.com and can make the server reject every request.  The Host
# header is omitted so `requests` supplies the correct one automatically.
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate",
    "Accept-Language": "en-US,en;q=0.5",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0",
}


class MyLog:
    """Thin wrapper around ``logging`` appending to /var/log/my_log/message.log."""

    def __init__(self):
        import logging, os
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)  # master switch for log levels
        logdir = "/var/log/my_log"
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        logfile = logdir + '/message' + '.log'
        fh = logging.FileHandler(logfile, mode='a')
        fh.setLevel(logging.INFO)
        formatter = logging.Formatter(
            "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def info(self, mes):
        self.logger.info(mes)

    def debug(self, mes):
        self.logger.debug(mes)

    def warning(self, mes):
        self.logger.warning(mes)

    def error(self, mes):
        self.logger.error(mes)


log = MyLog()
log.info('started')


def get_page_count():
    """Fetch the first search page and log the "next page" pager text (debug helper)."""
    url = "https://xyk.cebbank.com/jfmall/search?keywords="
    # FIX: headers must be a keyword argument -- the second positional argument
    # of requests.get() is `params`, so the headers were silently never sent.
    res = requests.get(url, headers=headers, timeout=10)
    # FIX: the attribute is `encoding`; `res.decoding = ...` was a no-op typo.
    res.encoding = 'gbk'
    log.info(res.text)
    # FIX: '(' and ')' in "void(0)" are regex metacharacters and must be
    # escaped, otherwise this pattern can never match the pager link.
    result = re.compile(r'<a href="javascript:void\(0\)" class="next" rel="next">(.*?)</a>', re.S)
    page = re.findall(result, res.text)
    log.info(page)


def get_urls(page):
    """Return the product-detail URLs found on search-result page ``page``.

    Side effect: rebuilds the global ``dict_score`` mapping URL -> points
    price, for later lookup by get_data().
    """
    global dict_score
    requests.DEFAULT_RETRIES = 15  # raise connection retry count
    url0 = "https://xyk.cebbank.com/jfmall/search?keywords=&pageNo=" + str(page)
    # FIX: headers passed by keyword (was positional -> interpreted as params).
    req0 = requests.get(url0, headers=headers, timeout=60)
    req0.encoding = 'gbk'  # FIX: was `decoding`, a no-op typo
    result1 = re.compile('<div class="main-item-list-title">.*?<a href="(.*?)" class="text-hover-black js-filter-title" rel="noopener noreferrer"', re.S)
    result2 = re.compile('<span class="text-color-red text-font-size-18 text-font-weight-bold">(.*?)</span>', re.S)
    url = re.findall(result1, req0.text)
    jifen = re.findall(result2, req0.text)
    urls = ["https://xyk.cebbank.com" + u1 for u1 in url]
    # FIX: zip() stops at the shorter list, so a mismatch between the two regex
    # result lists no longer raises IndexError as the manual index loop did.
    dict_score = dict(zip(urls, jifen))
    return urls


def deal_size_color(data):
    """Split the scraped spec list into [color, size].

    ``data`` holds zero, one or two strings; a string containing '色' is taken
    to be the color.  '无' marks a missing field.  Commas are replaced with
    semicolons (CSV safety) and double quotes stripped from the size.
    NOTE(review): lists longer than 2 fall through and return ['', ''] --
    behavior kept as-is, confirm against real pages.
    """
    color = ''
    size = ''
    if len(data) == 0:
        color, size = '无', '无'
    if len(data) == 1:
        if '色' in data[0]:
            color = data[0]
            size = '无'
        else:
            size = data[0]
            color = '无'
    if len(data) == 2:
        if '色' in data[0]:
            color = data[0]
            size = data[1]
        else:
            size = data[0]
            if '色' in data[1]:
                color = data[1]
            else:
                color = '无'
    if ',' in color:
        color = color.replace(',', ';')
    if ',' in size:
        size = size.replace(',', ';')
    if '"' in size:
        size = size.replace('"', '')
    return [color, size]


def get_data(url):
    """Fetch one product page and return its CSV row (or None on any failure)."""
    try:
        global dict_score
        requests.DEFAULT_RETRIES = 15  # raise connection retry count
        # FIX: headers passed by keyword (was positional -> interpreted as params).
        res = requests.get(url, headers=headers, timeout=60)
        res.encoding = 'utf-8'
        t1 = html.fromstring(res.text)
        name = t1.xpath('//div[@class="product-detail-content-title js-itemId"]/text()')[0].strip()
        duihuan = t1.xpath('//div[@class="text-color-red text-font-weight-bold"]/text()')[0].strip()
        score = dict_score[url]
        size_col = t1.xpath('//span[contains(@class,"meta-title")]/text()')
        sc = deal_size_color(data=size_col)
        size = sc[1].strip()
        color = sc[0].strip()
        get_style = t1.xpath('//span[@class="exchangeWay"]/text()')[0].strip()
        categorys = t1.xpath('//a[@class="js-category-select"]/text()')
        tt = [i.strip() for i in categorys if i.strip()]
        category = tt[3]
        gongying = t1.xpath('//div[@class="real-information"]/span/text()')
        shop = gongying[1]
        shop_call = gongying[3]
        shop_time = gongying[5]
        content = str(name) + ',' + str(score) + ',' + str(color) + ',' + str(size) + ',' \
            + str(get_style) + ',' + str(category) + ',' + str(duihuan) + ',' \
            + str(shop) + ',' + str(shop_call) + ',' + str(shop_time) + ',' + str(url) + '\n'
        return content
    except Exception as e:
        log.info(e)
        log.info("##################this url is a no response: %s" % url)


def get_data_all(pages):
    """Walk pages 1..pages, queue one CSV row per product (None rows are skipped downstream)."""
    pages = int(pages)
    for page in range(1, pages + 1):
        log.info('正在获取第%s页商品...' % page)
        urls_one = get_urls(page)
        log.info("该页所有商品URL: %s" % urls_one)
        log.info("正在全力工作中......")
        if not urls_one:
            continue
        try:
            for i in urls_one:
                content = get_data(i)
                gQueue.put(content)
                time.sleep(3.4)  # polite delay between product requests
        except Exception as e:
            log.info(e)
            continue
    log.info('成功取到所有数据.')


def save_data():
    """Writer thread: drain ``gQueue`` forever, appending rows to today's CSV."""
    count = 0
    title = '商品名,兑换积分,商品规格,商品颜色,购买方式,分类,兑换,供货商名,供货商电话,供货商工作时间,商品链接\n'
    times = time.time()
    local_time = time.localtime(times)
    today = time.strftime("%Y-%m-%d", local_time)
    file_name = '/root/py/work-' + today + '.csv'
    with open(file_name, 'w', encoding='gbk') as file:
        file.write(title)
    while True:
        time.sleep(0.1)
        if not gQueue.empty():
            msg = gQueue.get()
            if msg is None:  # get_data() returns None for failed URLs
                continue
            count = count + 1
            today_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
            log.info('%s###第%s条###%s' % (today_time, count, msg))
            with open(file_name, 'a', encoding='gbk') as file:
                file.write(msg)


def send_mail():
    """E-mail today's XLSX as an attachment via QQ-mail SMTP over SSL.

    Credentials: `password` is a QQ-mail authorization code (mailbox settings ->
    account -> enable POP3/SMTP service).
    """
    from_addr = '......@qq.com'
    password = '......'
    to_addr_qq = '......@qq.com'
    to_addr = '......@nooce.cn'
    smtp_server = 'smtp.qq.com'
    html_msg = """ <p>csv文件</p> """
    msg = MIMEMultipart()
    msg['From'] = Header('Q')
    msg['To'] = Header('珍')
    times = time.time()
    local_time = time.localtime(times)
    today = time.strftime("%Y-%m-%d", local_time)
    today_title = time.strftime("%Y年%m月%d日,", local_time)
    subject = today_title + '来自Q sir的邮件'
    msg['Subject'] = Header(subject, 'utf-8')
    msg.attach(MIMEText(html_msg, 'html', 'utf-8'))
    # Attach today's spreadsheet; the filename shown in the mail is arbitrary.
    context = '/root/py/work-' + today + '.xlsx'
    att1 = MIMEText(open(context, 'rb').read(), 'base64', 'utf-8')
    att1["Content-Type"] = 'application/octet-stream'
    att1["Content-Disposition"] = 'attachment; filename="zhuzhu-2022.xlsx"'
    msg.attach(att1)
    # FIX: initialize before try -- if SMTP_SSL() itself raised, the finally
    # clause referenced an unbound name and masked the real error.
    smtpobj = None
    try:
        smtpobj = smtplib.SMTP_SSL(smtp_server)
        smtpobj.connect(smtp_server, 465)
        smtpobj.login(from_addr, password)
        smtpobj.sendmail(from_addr, to_addr, msg.as_string())
        smtpobj.sendmail(from_addr, to_addr_qq, msg.as_string())
        log.info("给小可爱的邮件已经成功发送!")
    except smtplib.SMTPException:
        log.info("无法发送邮件哦")
    finally:
        if smtpobj is not None:
            smtpobj.quit()


def csv_excel():
    """Convert today's CSV to XLSX with auto-sized columns. Return 0 on success, -1 on failure."""
    try:
        times = time.time()
        local_time = time.localtime(times)
        today = time.strftime("%Y-%m-%d", local_time)
        xlsFilepath = '/root/py/work-' + today + '.xlsx'
        csv_path = '/root/py/work-' + today + '.csv'
        my_dataframe = pd.read_csv(csv_path, low_memory=False, encoding='gbk')
        writer = pd.ExcelWriter(xlsFilepath, engine='xlsxwriter')
        my_dataframe.to_excel(writer, startrow=1, sheet_name='Sheet1', index=False)
        worksheet = writer.sheets['Sheet1']
        # Width of each column = max(len of longest value, len of header) + 2.
        for i, col in enumerate(my_dataframe.columns):
            column_len = my_dataframe[col].astype(str).str.len().max()
            column_len = max(column_len, len(col)) + 2
            worksheet.set_column(i, i, column_len)
        writer.save()  # NOTE(review): deprecated in pandas>=1.5; use writer.close() there
        log.info("csv to excel success")
        return 0
    except Exception as e:
        log.info('csv to excel failed,reason is%s' % e)
        return -1


def main():
    """Start fetch + writer threads, then convert and mail once fetching ends."""
    pages = 15
    t1 = threading.Thread(target=get_data_all, args=(pages,))
    t2 = threading.Thread(target=save_data)
    # FIX: setDaemon() is deprecated; assign the attribute instead.
    t1.daemon = True
    t2.daemon = True
    t1.start()
    t2.start()
    while True:
        time.sleep(10)
        # FIX: isAlive() was removed in Python 3.9; the method is is_alive().
        if t1.is_alive():
            continue
        exc = csv_excel()
        if exc != 0:
            return
        log.info("####准备发送邮件啦####")
        time.sleep(25)  # give the writer thread time to drain the queue
        send_mail()
        log.info("####任务结束####")
        break


if __name__ == '__main__':
    main()
# 编辑定时任务:
crontab -e
# 每天 09:05 运行;每周一 13:30 再运行一次。
# (crontab 条目末尾的 & 是多余的,cron 本身就在后台执行任务。)
05 09 * * * /usr/bin/python3 /root/py/work.py
30 13 * * 1 /usr/bin/python3 /root/py/work.py
#!/usr/bin/python3
"""Async rewrite of the points-mall scraper.

Collects all product URLs synchronously (httpx), fetches the detail pages
concurrently with asyncio + httpx.AsyncClient, parses the responses into a
daily CSV, converts it to a sorted XLSX, and optionally mails it.
"""
import re
import httpx
import asyncio, aiohttp
import random, time
from lxml import html
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from queue import Queue
import time, os
import pandas as pd
from func_timeout import FunctionTimedOut, func_timeout

# Raw responses are queued here by get_data() and drained by save_csv().
gQueue = Queue()
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate",
    "Accept-Language": "en-US,en;q=0.5",
    "Connection": "keep-alive",
    "Host": "xyk.cebbank.com",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
}
dict_score = {}    # product URL -> points price (filled by get_urls)
url_res_dict = {}  # response object -> product URL (filled by get_data)
# FIX: the original created asyncio.Semaphore(500) *inside each coroutine*,
# which limits nothing.  One shared semaphore (created lazily, inside the
# running loop) actually caps the number of concurrent requests.
_sem = None


def send_mail():
    """E-mail today's XLSX via QQ-mail SMTP over SSL.

    `password` is a QQ-mail authorization code, not the account password.
    """
    from_addr = '@qq.com'
    password = ''
    to_addr_qq = '@qq.com'
    to_addr = '@nooce.cn'
    smtp_server = 'smtp.qq.com'
    html_msg = """ <p>csv文件</p> """
    msg = MIMEMultipart()
    msg['From'] = Header('Q')
    msg['To'] = Header('珍')
    times = time.time()
    local_time = time.localtime(times)
    today = time.strftime("%Y-%m-%d", local_time)
    today_title = time.strftime("%Y-%m-%d:", local_time)
    subject = today_title + '来自Q sir的邮件'
    msg['Subject'] = Header(subject, 'utf-8')
    msg.attach(MIMEText(html_msg, 'html', 'utf-8'))
    # Attach today's spreadsheet; the filename shown in the mail is arbitrary.
    context = 'work-' + today + '.xlsx'
    att1 = MIMEText(open(context, 'rb').read(), 'base64', 'utf-8')
    att1["Content-Type"] = 'application/octet-stream'
    att1["Content-Disposition"] = 'attachment; filename="zhuzhu-2022.xlsx"'
    msg.attach(att1)
    # FIX: initialize before try -- if SMTP_SSL() itself raised, the finally
    # clause referenced an unbound name and masked the real error.
    smtpobj = None
    try:
        smtpobj = smtplib.SMTP_SSL(smtp_server)
        smtpobj.connect(smtp_server, 465)
        smtpobj.login(from_addr, password)
        smtpobj.sendmail(from_addr, to_addr, msg.as_string())
        smtpobj.sendmail(from_addr, to_addr_qq, msg.as_string())
        print("给小可爱的邮件已经成功发送!")
    except smtplib.SMTPException:
        print("无法发送邮件哦")
    finally:
        if smtpobj is not None:
            smtpobj.quit()


def get_urls(page):
    """Return product URLs on search page ``page`` (empty list on a 403 page).

    Side effect: extends the global ``dict_score`` with URL -> points price.
    """
    global dict_score
    url0 = "https://xyk.cebbank.com/jfmall/search?keywords=&pageNo=" + str(page)
    with httpx.Client() as client:
        req0 = client.get(url0, headers=headers, timeout=30)
    req0.encoding = 'gbk'  # FIX: was `decoding`, a no-op typo
    if 'This is 403 error page' in req0.text:
        print('have a 403 error,function not use')
        # FIX: was `return` (None) -- the caller does urls_all.extend(...),
        # which crashed with TypeError on None.
        return []
    result1 = re.compile('<div class="main-item-list-title">.*?<a href="(.*?)" class="text-hover-black js-filter-title" rel="noopener noreferrer"', re.S)
    result2 = re.compile('<span class="text-color-red text-font-size-18 text-font-weight-bold">(.*?)</span>', re.S)
    url = re.findall(result1, req0.text)
    jifen = re.findall(result2, req0.text)
    urls = ["https://xyk.cebbank.com" + u1 for u1 in url]
    # FIX: zip() stops at the shorter list, so mismatched regex result lists
    # can no longer raise IndexError as the manual index loop did.
    dict_score.update(zip(urls, jifen))
    return urls


def deal_size_color(data):
    """Split the scraped spec list into [color, size].

    ``data`` holds zero, one or two strings; a string containing '色' is taken
    to be the color.  '无' marks a missing field.  Commas become semicolons
    (CSV safety) and double quotes are stripped from the size.
    NOTE(review): lists longer than 2 fall through and return ['', ''] --
    behavior kept as-is, confirm against real pages.
    """
    color = ''
    size = ''
    if len(data) == 0:
        color, size = '无', '无'
    if len(data) == 1:
        if '色' in data[0]:
            color = data[0]
            size = '无'
        else:
            size = data[0]
            color = '无'
    if len(data) == 2:
        if '色' in data[0]:
            color = data[0]
            size = data[1]
        else:
            size = data[0]
            if '色' in data[1]:
                color = data[1]
            else:
                color = '无'
    if ',' in color:
        color = color.replace(',', ';')
    if ',' in size:
        size = size.replace(',', ';')
    if '"' in size:
        size = size.replace('"', '')
    return [color, size]


async def get_data(url):
    """Fetch one product page concurrently; queue the response for save_csv().

    Timeouts/errors are skipped with a message (the row is simply dropped).
    """
    global gQueue
    global dict_score
    global count
    global url_res_dict
    global _sem
    count += 1
    if _sem is None:
        # Created inside the running loop; caps concurrent in-flight requests.
        _sem = asyncio.Semaphore(500)
    try:
        async with _sem:
            async with httpx.AsyncClient() as client:
                res = await client.get(url, headers=headers, timeout=20)
                url_res_dict[res] = url
                gQueue.put(res)
    except Exception:
        print('超时数据自动跳过.')


def save_csv():
    """Drain the response queue, parse each page, append rows to today's CSV."""
    global url_res_dict, dict_score
    while not gQueue.empty():
        try:
            res = gQueue.get()
            url = url_res_dict[res]
            res.encoding = 'utf-8'
            t1 = html.fromstring(res.text)
            name = t1.xpath('//div[@class="product-detail-content-title js-itemId"]/text()')[0].strip()
            duihuan = t1.xpath('//div[@class="text-color-red text-font-weight-bold"]/text()')[0].strip()
            score = dict_score[url]
            size_col = t1.xpath('//span[contains(@class,"meta-title")]/text()')
            sc = deal_size_color(data=size_col)
            size = sc[1].strip()
            color = sc[0].strip()
            get_style = t1.xpath('//span[@class="exchangeWay"]/text()')[0].strip()
            categorys = t1.xpath('//a[@class="js-category-select"]/text()')
            tt = [i.strip() for i in categorys if i.strip()]
            category = tt[3]
            gongying = t1.xpath('//div[@class="real-information"]/span/text()')
            shop = gongying[1]
            shop_call = gongying[3]
            shop_time = gongying[5]
            content = str(name) + ',' + str(score) + ',' + str(color) + ',' + str(size) + ',' \
                + str(get_style) + ',' + str(category) + ',' + str(duihuan) + ',' \
                + str(shop) + ',' + str(shop_call) + ',' + str(shop_time) + ',' + str(url) + '\n'
            local_time = time.localtime(time.time())
            today = time.strftime("%Y-%m-%d", local_time)
            file_name = 'work-' + today + '.csv'
            with open(file_name, 'a', encoding='gbk') as file:
                file.write(content)
        except Exception as e:
            print(e)


def csv_excel():
    """Convert today's CSV to a points-sorted XLSX with auto-sized columns.

    Returns 0 on success (and removes the CSV), -1 on failure.
    """
    print("开始转换成excel数据")
    try:
        times = time.time()
        local_time = time.localtime(times)
        today = time.strftime("%Y-%m-%d", local_time)
        xlsFilepath = 'work-' + today + '.xlsx'
        csv_path = 'work-' + today + '.csv'
        my_dataframe = pd.read_csv(csv_path, low_memory=False, encoding='gbk')
        if not len(my_dataframe):
            print("not data")
            return -1
        my_dataframe = my_dataframe.sort_values(by='兑换积分')
        writer = pd.ExcelWriter(xlsFilepath, engine='xlsxwriter')
        my_dataframe.to_excel(writer, startrow=1, sheet_name='Sheet1', index=False)
        worksheet = writer.sheets['Sheet1']
        # Width of each column = max(len of longest value, len of header) + 2.
        for i, col in enumerate(my_dataframe.columns):
            column_len = my_dataframe[col].astype(str).str.len().max()
            column_len = max(column_len, len(col)) + 2
            worksheet.set_column(i, i, column_len)
        writer.save()  # NOTE(review): deprecated in pandas>=1.5; use writer.close() there
        print("转换成excel表格成功。")
    except Exception as e:
        print('转换成excel表格失败,原因 is%s' % e)
        return -1
    else:
        # Success: the CSV is an intermediate artifact, remove it.
        if os.path.exists(csv_path):
            os.remove(csv_path)
        return 0


def get_tasks():
    """Prompt for page count / mail choice, collect all product URLs.

    Returns (urls_all, mail).  Input falls back to 10 pages / no mail after a
    15 s timeout or a non-integer page count.
    """
    try:
        pages = func_timeout(15, lambda: input('请输入需要的数据总页数,默认每页20条数据(要输入整数哦):'))
        mail = func_timeout(15, lambda: input('是否发送邮件,请输入"yes" or "no":'))
    except FunctionTimedOut:
        pages = 10
        mail = 'no'
        print('输入超时,默认获取10页数据,不发邮件哦')
    # FIX: a non-integer answer used to crash with an unhandled ValueError.
    try:
        pages = int(pages)
    except ValueError:
        pages = 10
        print('输入不是整数,默认获取10页数据')
    print("开始获取数据了哦,默认按照兑换积分升序排序。")
    global count
    count = 0
    title = '商品名,兑换积分,商品规格,商品颜色,购买方式,分类,兑换,供货商名,供货商电话,供货商工作时间,商品链接\n'
    times = time.time()
    local_time = time.localtime(times)
    today1 = time.strftime("%Y-%m-%d", local_time)
    file_name = 'work-' + today1 + '.csv'
    with open(file_name, 'w', encoding='gbk') as file:
        file.write(title)
    urls_all = []
    for page in range(1, pages + 1):
        print('正在获取第%s页商品...' % page)
        time.sleep(0.5)  # polite delay between listing pages
        urls_all.extend(get_urls(page))
    print("所有商品URL: %s" % len(urls_all))
    return (urls_all, mail)


if __name__ == '__main__':
    result = get_tasks()
    urls_all, mail = result
    # Windows select() caps concurrency around 509 sockets, so fetch in
    # batches of 500.  FIX: the old (start, end)-pair construction appended an
    # empty slice when len(urls) was an exact multiple of 500, and
    # asyncio.wait([]) raises ValueError.
    BATCH = 500
    batches = [urls_all[off:off + BATCH] for off in range(0, len(urls_all), BATCH)]
    if len(batches) > 1:
        print(f"由于数据条数为:{len(urls_all)}条,需要分{len(batches)}次运行")
    start = time.time()
    loop = asyncio.get_event_loop()
    print("开始进行并发请求中...")
    for batch in batches:
        tasks = [loop.create_task(get_data(u)) for u in batch]
        loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    save_csv()
    end = time.time()
    print(f"执行完成,共耗时: {end - start}秒")
    csv_excel()
    if mail == 'yes' or mail == 'y':
        send_mail()
        print("邮件发送成功。")
    else:
        print("用户取消发送邮件邮件。")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。