赞
踩
很多地方需要用到地理区域这块数据,但官方数据源是一个网页,需要自行解析提取。
2016
年我用 Java
实现了一版,感觉使用起来不是很方便,后来又在 2020
年 用 Python
又实现了一次,代码量明显少了很大一部分。
地理区域的数据结构比较经典,完全是树形数据结构,随着树的深度增大,数据增长在10倍左右。
在实现上,使用了 Python
的 pymysql
的类库。
在数据存储上,利用了数据库,在数据库选择上,本着简单易用的原则选择了 MySQL
,数据库表设计如下:
CREATE TABLE areas (
code VARCHAR(30) DEFAULT NULL,
name VARCHAR(100) DEFAULT NULL,
lv INT(11) DEFAULT NULL,
sup_code VARCHAR(30) DEFAULT NULL,
flag VARCHAR(6) DEFAULT NULL,
url VARCHAR(60) DEFAULT NULL
) ;
DbUtil.py
插件 | 说明 |
---|---|
pymysql | |
BeautifulSoup | |
通过 Python Interpreter
安装 或者 pip
安装都可以,视实际情况自行选择
在 AreaMa.py
中,依次执行 init_province()
、 init_city()
、 init_county()
、 init_town()
、 init_village()
。
"""Crawl the administrative-division data published by China's National
Bureau of Statistics (stats.gov.cn) and persist it as a tree:

    province -> city -> county -> town -> village

Each level is scraped from the children pages of the previous level and
bulk-inserted into the ``areas`` MySQL table via ``DbUtil.UsingMysql``.

Bug fix: the ``if __name__ == '__main__'`` guard originally appeared
*before* most function definitions, so running the script raised
``NameError`` (e.g. ``init_town`` was not yet defined when ``main()``
executed).  The guard now sits at the end of the module.
"""
import urllib.request

from bs4 import BeautifulSoup

from sip.Area import Area
from util.DbUtil import *

# Browser-like headers so the site serves the crawler normally.
G_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36'}

# Entry page listing every province (2021 edition of the dataset).
G_URL = r'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/2021/index.html'

# Directory part of G_URL; child-page URLs are built relative to it.
G_URL_PREFIX = G_URL[0:G_URL.rindex('/') + 1]

# Maps a tree level to the CSS class of the <tr> rows that hold it.
G_LV_DICT = {1: 'provincetr', 2: 'citytr', 3: 'countytr', 4: 'towntr', 5: 'villagetr'}


def get_by_lv(lv):
    """Return every stored row ``(code, name, lv, sup_code, url)`` at level *lv*.

    Uses ``UsingMysql`` as a context manager, so rows come back as dicts
    (DictCursor) keyed by column name.
    """
    with UsingMysql(commit=True) as um:
        um.cursor.execute('select code,name,lv,sup_code,url from areas where lv=%s', lv)
        return um.cursor.fetchall()


def DataToJson(data):
    """Convert plain-tuple DB rows ``(code, name, lv, sup_code, url)`` into dicts.

    Used for rows produced by the paginated query path, whose cursor
    returns tuples rather than dicts.
    """
    jsonData = []
    for row in data:
        jsonData.append({
            'code': row[0],
            'name': row[1],
            'lv': row[2],
            'sup_code': row[3],
            'url': row[4],
        })
    return jsonData


def batch_insert(_list):
    """Bulk-insert a list of ``(code, name, lv, sup_code, url)`` tuples."""
    with UsingMysql(commit=True) as um:
        um.cursor.executemany("""
        INSERT INTO areas (code,name,lv,sup_code,url)
        VALUES (%s,%s,%s,%s,%s)""", _list)


def splicing(paramsList):
    """Flatten ``Area`` objects into tuples and persist them in one batch."""
    _list = [(el.code, el.name, el.lv, el.sup_code, el.url) for el in paramsList]
    batch_insert(_list)


def get_to_url(url, code, lv):
    """Build the absolute child-page URL for level *lv*.

    Levels 4 and 5 live in nested sub-directories named after the first
    2 (and next 2) digits of the parent's code.  Returns None for an
    unknown level.
    """
    urs = {
        2: G_URL_PREFIX + url,
        3: G_URL_PREFIX + url,
        4: G_URL_PREFIX + code[0:2] + '/' + url,
        5: G_URL_PREFIX + code[0:2] + '/' + code[2:4] + '/' + url,
    }
    return urs.get(lv, None)


def commons(_list, lv):
    """Core scraping routine shared by levels 2-5.

    For each parent row in *_list* (dicts with code/name/lv/sup_code/url),
    fetch its child page, parse the <tr class=...> rows for level *lv*,
    and batch-insert the resulting ``Area`` objects after every page so
    memory stays bounded.
    """
    all_area = []
    for lp in _list:
        _url = lp['url']
        # Parents without a child page (no link) are leaves; skip them.
        if _url is None or len(_url) == 0:
            continue
        req = urllib.request.Request(url=get_to_url(_url, lp['code'], lv), headers=G_HEADERS)
        res = urllib.request.urlopen(req)
        html = res.read()
        # Site pages are GB-encoded, hence the explicit from_encoding.
        soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
        all_tr = soup.find_all('tr', attrs={'class': G_LV_DICT[lv]}, limit=30)
        for row in all_tr:
            if lv == 5:
                # Village rows use plain <td>s (no links):
                # td[0] = code, td[2] = name.
                a_ary = row.find_all('td')
                all_area.append(Area(None, a_ary[2].get_text(), a_ary[0].get_text(), lp['code'], lv))
            else:
                a_ary = row.find_all('a')
                if len(a_ary):
                    # Linked rows: a[0] = code (+ href to children), a[1] = name.
                    all_area.append(
                        Area(a_ary[0].get('href'), a_ary[1].get_text(), a_ary[0].get_text(), lp['code'], lv))
                else:
                    # Unlinked leaf rows still carry code/name in <td>s.
                    aa_ary = row.find_all('td')
                    all_area.append(Area(None, aa_ary[1].get_text(), aa_ary[0].get_text(), lp['code'], lv))
        # Flush after each parent page, then reset the buffer.
        splicing(all_area)
        all_area = []


def init_province():
    """Level 1: provinces / municipalities / autonomous regions.

    The entry page has a different layout (one <a> per province inside
    ``provincetr`` rows), so it is parsed here rather than in commons().
    The province code is derived from the first two chars of the href.
    """
    all_area = []
    req = urllib.request.Request(url=G_URL, headers=G_HEADERS)
    res = urllib.request.urlopen(req)
    html = res.read()
    soup = BeautifulSoup(html, 'html.parser', from_encoding='gb2312')
    all_tr = soup.find_all('tr', attrs={'class': 'provincetr'}, limit=10)
    for row in all_tr:
        for r_td in row.find_all('a'):
            ars = Area(r_td.get('href'), r_td.get_text(), r_td.get('href')[0:2], '0', 1)
            all_area.append(ars)
    splicing(all_area)


def init_city():
    """Level 2: prefecture-level cities (children of the provinces)."""
    _list = get_by_lv(1)
    commons(_list, 2)


def init_county():
    """Level 3: counties / districts (children of the cities)."""
    _list = get_by_lv(2)
    commons(_list, 3)


def get_town(paramsList):
    """Scrape level 4 (towns / sub-districts) for one page of counties."""
    return commons(paramsList, 4)


def do_init_town(lv):
    """Page through the level-*lv* parents and scrape their towns."""
    pag = UsingMysql(numPerPage=20)
    sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
    for ret in pag.queryForList(sql, lv):
        get_town(DataToJson(ret))


def init_town():
    """Level 4: towns are numerous, so the county parents are paged."""
    do_init_town(3)


def get_village(paramsList):
    """Scrape level 5 (villages / communities) for one page of towns."""
    return commons(paramsList, 5)


def do_init_village(lv):
    """Page through the level-*lv* parents and scrape their villages."""
    pag = UsingMysql(numPerPage=20)
    sql = r'select code,name,lv,sup_code,url from areas where lv=%s'
    for ret in pag.queryForList(sql, lv):
        get_village(DataToJson(ret))


def init_village():
    """Level 5: villages are numerous, so the town parents are paged."""
    do_init_village(4)


def main():
    """Run the crawl level by level.

    Levels must be initialised in order (1 -> 5) because each level
    reads its parents from the DB.  Uncomment the step you want to run.
    """
    # Level 1: provinces / municipalities
    # init_province()
    # Level 2: prefecture-level cities
    # init_city()
    # Level 3: counties / districts
    # init_county()
    # Level 4: towns / sub-districts
    init_town()
    # Level 5: villages / communities
    # init_village()


if __name__ == '__main__':
    main()
名称 | 说明 |
---|---|
name | 名称 |
code | 编码 |
url | 地址 |
sup_code | 上级编码 |
lv | 层级 |
class Area: "Area class 行政区域实例" def __init__(self, url, name, code, sup_code, lv): self.name = name self.code = code self.url = url self.sup_code = sup_code self.lv = lv def __str__(self) -> str: return 'URL:%s\t NAME:%s\t Code:%s\t SUPER_CODE:%s\t LV:%s\t' % ( self.url, self.name, self.code, self.sup_code, self.lv)
"""pymysql helpers: a connection factory plus ``UsingMysql``, a
``with``-style wrapper that also offers simple LIMIT/OFFSET pagination.

Bug fixes relative to the original:
* ``__calTotalPages`` returned a *float* (e.g. ``40 / 20 == 2.0``)
  whenever the row count divided evenly by the page size, making
  ``range(totalPageNum)`` in ``queryForList`` raise TypeError.
* ``__enter__`` assigned ``conn.autocommit = False``, shadowing
  pymysql's ``autocommit()`` *method* with a bool instead of calling it.
"""
import math

import pymysql


def get_connection():
    """Open a new connection to the local ``area`` database.

    NOTE(review): credentials are hard-coded; move them to config/env
    before deploying.
    """
    host = '127.0.0.1'
    port = 13306
    db = 'area'
    user = 'root'
    password = 'xxx'
    conn = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
    return conn


def initClientEncode(conn):
    """Set the client connection encoding to utf8 and return a cursor."""
    curs = conn.cursor()
    curs.execute("SET NAMES utf8")
    conn.commit()
    return curs


class UsingMysql(object):
    """Context-managed MySQL access with optional paginated queries."""

    def __init__(self, commit=True, log_label=' In total', numPerPage=20):
        """
        :param commit: commit the transaction on exit (False eases unit tests)
        :param log_label: custom log text
        :param numPerPage: page size used by queryForList
        """
        self._commit = commit
        self._log_label = log_label
        self.numPerPage = numPerPage

    def queryForList(self, sql, param=None):
        """Yield the result set of *sql* one page (tuple of rows) at a time."""
        totalPageNum = self.__calTotalPages(sql, param)
        for pageIndex in range(totalPageNum):
            yield self.__queryEachPage(sql, pageIndex, param)

    def __createPaginaionQuerySql(self, sql, currentPageIndex):
        """Wrap the caller's query and slice it with LIMIT offset,count.

        NOTE(review): *sql* is interpolated with ``%`` here, so it must
        come from trusted code, never from user input.
        """
        startIndex = self.__calStartIndex(currentPageIndex)
        qSql = r'select * from (%s) total_table limit %s,%s' % (sql, startIndex, self.numPerPage)
        return qSql

    def __queryEachPage(self, sql, currentPageIndex, param=None):
        """Run one page of the wrapped query on a fresh connection."""
        curs = initClientEncode(get_connection())
        qSql = self.__createPaginaionQuerySql(sql, currentPageIndex)
        if param is None:
            curs.execute(qSql)
        else:
            curs.execute(qSql, param)
        result = curs.fetchall()
        curs.close()
        return result

    def __calTotalRowsNum(self, sql, param=None):
        """Count the total number of rows *sql* would return."""
        tSql = r'select count(*) from (%s) total_table' % sql
        curs = initClientEncode(get_connection())
        if param is None:
            curs.execute(tSql)
        else:
            curs.execute(tSql, param)
        result = curs.fetchone()
        curs.close()
        return int(result[0]) if result is not None else 0

    def __calTotalPages(self, sql, param):
        """Total number of pages, always as an int.

        ceil on every path fixes the original's float return value when
        the row count divided evenly by the page size.
        """
        totalRowsNum = self.__calTotalRowsNum(sql, param)
        return math.ceil(totalRowsNum / self.numPerPage)

    def __calStartIndex(self, currentPageIndex):
        """Row offset of the first row on the given (0-based) page."""
        return currentPageIndex * self.numPerPage

    def __calLastIndex(self, totalRows, totalPages, currentPageIndex):
        """Index just past the page's last row (currently unused by callers)."""
        lastIndex = 0
        if totalRows < self.numPerPage:
            lastIndex = totalRows
        elif ((totalRows % self.numPerPage == 0)
              or (totalRows % self.numPerPage != 0 and currentPageIndex < totalPages)):
            lastIndex = currentPageIndex * self.numPerPage
        elif totalRows % self.numPerPage != 0 and currentPageIndex == totalPages:
            # Last (partial) page.
            lastIndex = totalRows
        return lastIndex

    def __enter__(self):
        """Acquire a connection and a DictCursor on entry."""
        conn = get_connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        # Call the method instead of overwriting it with a bool.
        conn.autocommit(False)
        self._conn = conn
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        # NOTE(review): this commits even when the with-block raised;
        # consider checking exc_info before committing.
        if self._commit:
            self._conn.commit()
        # Always release the cursor and connection.
        self._cursor.close()
        self._conn.close()

    @property
    def cursor(self):
        # Cursor opened by __enter__; valid only inside the with-block.
        return self._cursor
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。