当前位置:   article > 正文

Boss直聘数据爬取并根据关键词和城市分析职位需求工资水平-数据爬取、预处理及入库_boss招聘数据分析案例

boss招聘数据分析案例

数据爬取、预处理及入库

目标设定

输入:职位关键词keyword和城市city_name,如key_word=Python开发,city_name=北京

输出:以输入参数为名的文档,如collections_name=Python开发_北京

爬取路径分析

通过网络抓包分析,页面URL路径为:https://www.zhipin.com/job_detail/?query=PYTHON&city=101010100&page=1

该路径也有其他变种,如city_id由query参数变为path参数,效果一样

关键参数

query:职位关键词

city:城市ID

page:当前页码

注意到参数中的城市ID为内部ID,需要建立城市名称与ID的映射

经过抓包找到了这个映射文件路径,https://www.zhipin.com/wapi/zpCommon/data/city.json

该json分为三部分:热点城市列表、当前所处城市,全国全量省份及城市列表,我们需要的只是全国城市列表,这里需要做下数据预处理

在这里插入图片描述

城市ID与名称映射文件预生成

通过上述路径下载的city.json重命名为china.json,然后进行分析处理生成真正的city.json,代码如下:

# -*- coding:utf-8 -*-
import json
import os

# 读取json文件
with open(os.path.join(os.path.dirname(__file__),'china.json'),'r',encoding='utf-8') as  f:
    china_list=json.loads(f.read())['zpData']

province_dict,city_dict={},{}

for province in china_list['cityList']:

    province_dict[province['name']]=province['code']

    for city in province['subLevelModelList']:
        city_dict[city['name']]=city['code']

# 生成省份列表,非必要
with open(os.path.join(os.path.dirname(__file__),'province.json'),'w',encoding='utf-8') as  f:
    f.write(str(province_dict).replace('\'','"'))

with open(os.path.join(os.path.dirname(__file__),'city.json'),'w',encoding='utf-8') as  f:
    f.write(str(city_dict).replace('\'','"'))

print('处理完成!')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

生成的city.json如下:

在这里插入图片描述

踩坑

接下是重要的数据爬取阶段,也是踩坑阶段。

初步方案是使用scrapy对路径 https://www.zhipin.com/job_detail/?query=PYTHON&city=101010100&page=1 进行请求,然后解析处理入库,但后来发现Boss直聘已经使用了Ajax动态加载和cookie限制技术,经过艰难的代码解读和技术尝试,最后决定放弃这个方案。

最终选定的方案是使用selenium模拟访问,在进行实操时发现官网也使用了手动验证码的反扒技术,如此严密多重的反扒技术对于数据驱动的招聘企业也是可以理解的,最后只能退而求其次,采用代码自动爬取为主,手动拖动验证辅助的方式进行数据爬取。

在这里插入图片描述

爬取、预处理及入库

默认模拟静默访问爬取,如果需要进行手动验证,则打开浏览器进行手动验证。

代码实现
# -*- coding:utf-8 -*-
"""
根据关键词和城市爬取BOSS直聘招聘信息
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pymongo import MongoClient

import time
import os
import json
import logging
logging.getLogger().setLevel(logging.INFO)


def get_jobs_info(url):
    '''
    根据URL获取响应招聘职位信息
    '''
    job_list = []

    browser.get(url)

    try:
        # 等待查询结果加载完毕
        wait = WebDriverWait(browser, 20)
        wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, 'div.job-primary')))
    except:
        return job_list

    job_list_elements = browser.find_elements_by_class_name('job-primary')

    for job_elements in job_list_elements:
        job = {}
        job['job-name'] = job_elements.find_element_by_class_name(
            'job-name').text
        job['job-area'] = job_elements.find_element_by_class_name(
            'job-area').text
        job['company-name'] = job_elements.find_element_by_class_name(
            'company-text').find_element_by_class_name('name').text

        try:
            salary_info = job_elements.find_element_by_class_name(
                'red').text.replace('K', '')
            salary_list = salary_info.split('·')

            job['salary-extra'] = salary_list[1] if len(
                salary_list) == 2 else ''
            job['salary-min'], job['salary-max'] =map(int, salary_list[0].split('-'))

        except Exception as e:
            logging.error(e)

        try:
            company_info = job_elements.find_element_by_class_name('company-text').find_element_by_css_selector(
                'p').get_attribute('innerHTML').replace('<em class="vline"></em>', ' ').split()
            job['company-industry'] = company_info[0]
            job['company-ipo'] = company_info[1]
            job['company-size'] = company_info[2]
        except Exception as e:
            logging.error(e)

        job['skill-tags'] = []
        for element in job_elements.find_elements_by_class_name('tag-item'):
            if len(element.text.strip()) > 0:
                job['skill-tags'].append(element.text)

        job_list.append(job)

    return job_list


def mongo_db(db_name, collections_name):
    '''
    获取数据库文档实例
    '''
    MONGO_HOST = '***.***.***.***'
    MONGO_PORT = 000
    MONGO_USER = '***'
    MONGO_PASSWORD = '***'

    try:
        client = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
        db = client.admin.authenticate(MONGO_USER, MONGO_PASSWORD)
        logging.info('登陆成功!')
    except Exception as e:
        logging.error(e)

    return client[db_name][collections_name]


if __name__ == '__main__':

    # 创建城市ID与名称字典
    with open(os.path.join(os.path.dirname(__file__), 'city.json'), 'r', encoding='utf-8') as f:
        city_dict = json.loads(f.read())

    # 输入关键词和城市ID
    keyword, city_name = 'Python开发', '北京'
    # 查询对应城市ID
    city_id = city_dict[city_name]
    # 最大翻页数
    page_num, max_page_num = 1, 15
    # 基础URL模板
    base_url = 'https://www.zhipin.com/job_detail/?query={keyword}&city={city_id}&page={page_num}'
    # 存入MongoDB
    db_name = 'job_bosszp'
    collections_name = '{0}_{1}'.format(keyword, city_name)
    # 停歇时间
    idle_time = 5

    # 禁用测试软件提示
    option = webdriver.ChromeOptions()
    option.set_headless(True)
    option.add_experimental_option('excludeSwitches', ['enable-automation'])

    # 建立文档,若已存在则清除
    collection = mongo_db(db_name, collections_name)

    if collection.find().count():
        collection.drop()
        logging.info('文档存在,已被清除并覆盖!')

    while page_num <= max_page_num:

        browser = webdriver.Chrome(options=option)

        logging.info('浏览器已启动。')

        logging.info('正在爬取第{0}页...'.format(page_num))

        job_list = get_jobs_info(base_url.format(
            keyword=keyword, city_id=city_id, page_num=page_num))

        if len(job_list) == 0:
            logging.error('爬取出现问题,正在重试,需要手动验证!')
            option.set_headless(False)
            continue
        else:
            page_num = page_num+1
            option.set_headless(True)

        logging.info('第{0}页爬取完成'.format(page_num))

        logging.info('正在存取第{0}页...'.format(page_num))

        for job in job_list:
            collection.insert_one(job)

        logging.info('第{0}页存取完成!'.format(page_num))

        time.sleep(idle_time)

        browser.quit()

    logging.info('全部存取完成!')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160

数据预览

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/517008
推荐阅读
相关标签
  

闽ICP备14008679号