Input: a job keyword and a city name, e.g. keyword=Python开发, city_name=北京
Output: a MongoDB collection named after the input parameters, e.g. collections_name=Python开发_北京
Network packet capture shows that the listing page URL has the form: https://www.zhipin.com/job_detail/?query=PYTHON&city=101010100&page=1
There are variants of this path (for example, the city ID moved from a query parameter to a path parameter), but they behave the same.
Key parameters (a minimal sketch of assembling them into the URL follows this list):
query: the job keyword
city: the internal city ID
page: the current page number
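As a quick illustration, the request URL can be built from these parameters like so (the same template is reused in the full crawler below; the browser URL-encodes the keyword):

base_url = 'https://www.zhipin.com/job_detail/?query={keyword}&city={city_id}&page={page_num}'
url = base_url.format(keyword='Python开发', city_id='101010100', page_num=1)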
Note that the city parameter is an internal ID, so we need to build a mapping from city names to IDs.
Packet capture turned up the mapping file at: https://www.zhipin.com/wapi/zpCommon/data/city.json
The JSON has three parts: a hot-city list, the city you are currently located in, and the full nationwide list of provinces and cities. We only need the nationwide city list, so a little preprocessing is required.
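For orientation, the top level of the downloaded file looks roughly like this. The cityList / name / code / subLevelModelList fields are exactly what the preprocessing script below reads; the field names for the hot-city list and the located city are assumptions, since we never touch those branches:

{
    "zpData": {
        "hotCityList": [...],   # assumed field name: the hot-city list
        "locationCity": {...},  # assumed field name: the city you are in
        "cityList": [           # the nationwide list we actually use
            {"name": "北京", "code": 101010100,
             "subLevelModelList": [{"name": "北京", "code": 101010100}, ...]},
            ...
        ]
    }
}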
Download the file at the URL above, rename it china.json, then process it to generate the real city.json. The code is as follows:
# -*- coding:utf-8 -*-
import json
import os

# Read the raw JSON downloaded from the city API
with open(os.path.join(os.path.dirname(__file__), 'china.json'),
          'r', encoding='utf-8') as f:
    china_list = json.loads(f.read())['zpData']

# Collect province and city name -> code mappings
province_dict, city_dict = {}, {}
for province in china_list['cityList']:
    province_dict[province['name']] = province['code']
    for city in province['subLevelModelList']:
        city_dict[city['name']] = city['code']

# Write the province list (not strictly needed)
with open(os.path.join(os.path.dirname(__file__), 'province.json'),
          'w', encoding='utf-8') as f:
    json.dump(province_dict, f, ensure_ascii=False)

# Write the city mapping the crawler will use
with open(os.path.join(os.path.dirname(__file__), 'city.json'),
          'w', encoding='utf-8') as f:
    json.dump(city_dict, f, ensure_ascii=False)

print('Done!')
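Running this script once produces province.json (optional) and city.json next to it; the crawler below loads city.json at startup to translate the input city name into its internal ID.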
The generated city.json is a flat map from city name to city ID.
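A minimal illustrative excerpt (only the 北京 code is confirmed by the URL captured above; the 上海 entry is an assumption, and whether codes come out as numbers or strings follows the API response):

{"北京": 101010100, "上海": 101020100, ...}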
Next comes the important data-crawling stage, which is also where the pitfalls are.
The initial plan was to request https://www.zhipin.com/job_detail/?query=PYTHON&city=101010100&page=1 with Scrapy and then parse and store the results, but it turned out that BOSS直聘 loads the listings via Ajax and enforces cookie restrictions. After an arduous round of code reading and experimentation, that plan was abandoned.
The plan finally settled on uses Selenium to simulate browsing. During implementation it emerged that the site also deploys a manual CAPTCHA as an anti-scraping measure; such tight, layered defenses are understandable for a data-driven recruitment company. The compromise is automated crawling as the primary mode, assisted by manual drag-to-verify when needed.
Crawling runs headless by default; when manual verification is required, a visible browser is opened so the check can be completed by hand.
# -*- coding:utf-8 -*-
"""
Crawl BOSS直聘 job listings for a given keyword and city.
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from pymongo import MongoClient
import time
import os
import json
import logging

logging.getLogger().setLevel(logging.INFO)


def get_jobs_info(browser, url):
    '''Fetch the given result page and parse its job listings.'''
    job_list = []
    browser.get(url)
    try:
        # Wait for the Ajax-loaded result cards to appear
        wait = WebDriverWait(browser, 20)
        wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, 'div.job-primary')))
    except TimeoutException:
        # No results rendered -- most likely the CAPTCHA page
        return job_list
    for job_elements in browser.find_elements(By.CLASS_NAME, 'job-primary'):
        job = {}
        job['job-name'] = job_elements.find_element(
            By.CLASS_NAME, 'job-name').text
        job['job-area'] = job_elements.find_element(
            By.CLASS_NAME, 'job-area').text
        job['company-name'] = job_elements.find_element(
            By.CLASS_NAME, 'company-text').find_element(By.CLASS_NAME, 'name').text
        try:
            # Salary text looks like '15-25K·13薪'; the part after '·' is the bonus
            salary_info = job_elements.find_element(
                By.CLASS_NAME, 'red').text.replace('K', '')
            salary_list = salary_info.split('·')
            job['salary-extra'] = salary_list[1] if len(salary_list) == 2 else ''
            job['salary-min'], job['salary-max'] = map(
                int, salary_list[0].split('-'))
        except Exception as e:
            logging.error(e)
        try:
            # Company line: industry / funding stage / size, separated by <em class="vline">
            company_info = job_elements.find_element(
                By.CLASS_NAME, 'company-text').find_element(
                By.CSS_SELECTOR, 'p').get_attribute('innerHTML').replace(
                '<em class="vline"></em>', ' ').split()
            job['company-industry'] = company_info[0]
            job['company-ipo'] = company_info[1]
            job['company-size'] = company_info[2]
        except Exception as e:
            logging.error(e)
        job['skill-tags'] = [element.text for element in job_elements.find_elements(
            By.CLASS_NAME, 'tag-item') if element.text.strip()]
        job_list.append(job)
    return job_list


def mongo_db(db_name, collections_name):
    '''Return a handle to the target MongoDB collection.'''
    MONGO_HOST = '***.***.***.***'  # redacted
    MONGO_PORT = 000                # redacted
    MONGO_USER = '***'              # redacted
    MONGO_PASSWORD = '***'          # redacted
    try:
        client = MongoClient(host=MONGO_HOST, port=MONGO_PORT,
                             username=MONGO_USER, password=MONGO_PASSWORD)
        logging.info('Connected to MongoDB!')
    except Exception as e:
        logging.error(e)
    return client[db_name][collections_name]


def build_options(headless):
    '''Build Chrome options; hide the "automated test software" banner.'''
    option = webdriver.ChromeOptions()
    if headless:
        option.add_argument('--headless')
    option.add_experimental_option('excludeSwitches', ['enable-automation'])
    return option


if __name__ == '__main__':
    # Load the city-name -> city-ID mapping produced in the preprocessing step
    with open(os.path.join(os.path.dirname(__file__), 'city.json'),
              'r', encoding='utf-8') as f:
        city_dict = json.loads(f.read())
    # Input keyword and city name
    keyword, city_name = 'Python开发', '北京'
    # Look up the internal city ID
    city_id = city_dict[city_name]
    # Current page and maximum number of pages
    page_num, max_page_num = 1, 15
    # URL template
    base_url = 'https://www.zhipin.com/job_detail/?query={keyword}&city={city_id}&page={page_num}'
    # MongoDB target
    db_name = 'job_bosszp'
    collections_name = '{0}_{1}'.format(keyword, city_name)
    # Pause between pages
    idle_time = 5
    # Start headless; switch to a visible browser when manual verification is needed
    headless = True
    # Get the collection; if it already holds data, drop it and start over
    collection = mongo_db(db_name, collections_name)
    if collection.count_documents({}):
        collection.drop()
        logging.info('Collection already existed; dropped and overwritten!')
    while page_num <= max_page_num:
        browser = webdriver.Chrome(options=build_options(headless))
        logging.info('Browser started.')
        logging.info('Crawling page {0}...'.format(page_num))
        job_list = get_jobs_info(browser, base_url.format(
            keyword=keyword, city_id=city_id, page_num=page_num))
        browser.quit()
        if len(job_list) == 0:
            # Blocked: retry the same page with a visible browser for manual verification
            logging.error('Crawl failed; retrying -- manual verification needed!')
            headless = False
            continue
        headless = True
        logging.info('Page {0} crawled.'.format(page_num))
        logging.info('Saving page {0}...'.format(page_num))
        for job in job_list:
            collection.insert_one(job)
        logging.info('Page {0} saved!'.format(page_num))
        page_num = page_num + 1
        time.sleep(idle_time)
    logging.info('All pages saved!')
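For reference, each document stored in the Python开发_北京 collection has the shape below; the field names come straight from the parsing code, while the values are invented for illustration:

{
    'job-name': 'Python开发工程师',
    'job-area': '北京·朝阳区',
    'company-name': '某某科技',
    'salary-min': 15,        # unit: K (thousand RMB per month); the 'K' is stripped during parsing
    'salary-max': 25,
    'salary-extra': '13薪',
    'company-industry': '互联网',
    'company-ipo': 'A轮',
    'company-size': '100-499人',
    'skill-tags': ['Python', 'Django', 'MySQL']
}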