First we need to set up a simple Flask skeleton. Flask is used because the project has to expose some API endpoints for external callers. Intent recognition is only one module of the project, so with several people working on different modules, having each module call the others through APIs is a sensible arrangement: as long as the output format is agreed on, the modules can call each other freely. Think of it as the simplest possible Spring Cloud.
The directory structure is as follows:
--app
    --main
        --__init__.py
        --view.py
        --error.py
    --dao
    --model
    --service
    --log
    --trainModel
    --utils
    --repositories
--manage.py
--config.py
A fairly simple project layout.
Since the focus here is intent recognition, I won't go into Flask itself; this project only uses the most basic Flask features, and even the blueprint mechanism can be ignored.
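Just for orientation, here is a minimal sketch of the kind of endpoint view.py could expose. The /intent route, the blueprint name and the response fields are illustrative assumptions, not the project's actual interface.

from flask import Blueprint, request, jsonify

main = Blueprint("main", __name__)


@main.route("/intent", methods=["POST"])
def recognize_intent():
    # The only contract that matters is the agreed JSON format,
    # so that other modules can call this endpoint without knowing its internals.
    query = request.get_json().get("query", "")
    # ... dictionary lookup / model prediction would go here ...
    return jsonify({"query": query, "intent": "food", "confidence": 0.9})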
Collecting the relevant data is, of course, the most important part of any machine learning project, and also one of the hardest.
Word lists and corpora are extremely scarce for Chinese, so a crawler has to fill the gap. For intent recognition we first need a categorized lexicon, that is, proper nouns annotated with their class, such as {肖申克的救赎: 电影} ("The Shawshank Redemption: movie"). Searching these indicator terms with Elasticsearch keeps accuracy high and is much faster than model prediction.
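To make this concrete, here is a small sketch of how such a labeled term could be stored and looked up in Elasticsearch. The name/parentName fields mirror the ones queried by the Elastic utility class further down in this post; the movie index name and the localhost address are assumptions for illustration.

from elasticsearch import Elasticsearch

es = Elasticsearch("localhost:9200")

# one labeled indicator term: the name and its category
doc = {"name": "肖申克的救赎", "parentName": "电影"}
es.index(index="movie", doc_type="text", body=doc)

# a phrase match keeps the lookup exact enough to behave like a dictionary hit
query = {"query": {"match_phrase": {"name": "肖申克的救赎"}}}
print(es.search(index="movie", doc_type="text", body=query)["hits"]["hits"])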
Input-method lexicons are currently one of the better sources of word lists; lexicons such as Baidu IME's can be used as well. Since I knew little about crawling at the time and only half understood how to decode Sogou's .scel files, I borrowed from an open-source Sogou lexicon crawler on GitHub, reused its decoder scel2Text.py, and read up on how to crawl Sogou. Here is the crawling code:
from bs4 import BeautifulSoup
from urllib.request import urlopen, urlretrieve
import sys


def callbackfunc(blocknum, blocksize, totalsize):
    """Download progress callback.
    @blocknum:  number of blocks downloaded so far
    @blocksize: size of one block
    @totalsize: size of the remote file
    """
    percent = 100.0 * blocknum * blocksize / totalsize
    if percent > 100:
        percent = 100
    sys.stdout.write("\r%6.2f%%" % percent)
    sys.stdout.flush()


if __name__ == '__main__':
    BaseUrl = "http://pinyin.sogou.com"
    homePageUrl = "https://pinyin.sogou.com/dict/"
    html = urlopen(url=homePageUrl).read()
    soup = BeautifulSoup(html, "html.parser")
    soup = soup.find(id="dict_category_show").find_all("div", class_="dict_category_list")
    fc = 0
    sc = 0
    tc = 0
    for ii in soup:
        # fc += 1
        firstClass = ii.find(class_='dict_category_list_title').find('a').contents[0]
        print("Level 1 :" + firstClass)
        if firstClass != "生活":       # only the "生活" (Life) category
            continue
        for k in ii.find(class_='catewords').find_all('a'):
            secondclass = k.contents[0]
            if secondclass != "饮食":  # only the "饮食" (Food & drink) sub-category
                continue
            secondUrl = BaseUrl + "%s" % (k['href'])
            print(" " * 4 + "Level 2 :" + secondclass)  # + " " * 8 + secondUrl
            sc += 1
            soup2 = BeautifulSoup(urlopen(secondUrl).read(), "html.parser")
            totalpagenum = soup2.find(id='dict_page_list').find('ul').find_all('span')[-2].a.contents[0]
            for pageind in range(1, int(totalpagenum) + 1):
                soup2 = BeautifulSoup(
                    urlopen("%s/default/%d" % (secondUrl.replace("?rf=dictindex", ""), pageind)).read(),
                    "html.parser")
                for kk in soup2.find_all('div', class_='dict_detail_block'):
                    thirdclass = kk.find(class_='detail_title').find('a').contents[0]
                    thirdUrl = kk.find(class_='dict_dl_btn').a['href']
                    print(" " * 8 + "Level 3 :" + thirdclass + " " * 10 + "Downloading.....")
                    tc += 1
                    urlretrieve(url=thirdUrl,
                                filename="saveScel/%s-%s.scel" % (secondclass, thirdclass),
                                reporthook=callbackfunc)
    print("Total :%d, %d, %d" % (fc, sc, tc))
Parts of this are borrowed from that project, so bear with me.
The crawl produces .scel files; running them through the decoder scel2Text.py yields plain txt files, roughly like this:
饮食-中国小吃.txt
艾蒿饽饽
安多面片
安康窝窝面
八宝酿枇杷
八宝山药泥
八宝甜粽
八宝银耳羹
八宝油糕
八卦鱼肚
吧啦饼
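Once decoded, these word lists have to end up somewhere queryable. Below is a rough sketch of how they could be loaded into MongoDB with the category taken from the file name; the saveText directory and the food collection name are assumptions, while the machine_learning database and the name/parentName fields follow the utility classes shown later.

import os
from pymongo import MongoClient

client = MongoClient(host="localhost", port=27017)
collection = client["machine_learning"]["food"]    # assumed collection name

for file_name in os.listdir("saveText"):            # assumed output directory of scel2Text.py
    category = os.path.splitext(file_name)[0]       # e.g. 饮食-中国小吃
    with open(os.path.join("saveText", file_name), encoding="utf-8") as f:
        docs = [{"name": line.strip(), "parentName": category}
                for line in f if line.strip()]
    if docs:
        collection.insert_many(docs)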
The Sogou lexicons say very little about shops and restaurants, while our project focuses on a Meituan-style intent recognition API, so most of the categories we need are simply missing. The resources above were not enough, and I went looking through several other sites.
Meituan and Ele.me are too hard to scrape: their token validation and user-behavior detection are too good, so I didn't dare touch them.
Just as I was getting stuck, one site caught my eye: maigoo.com (买购网). A quick look showed that its data is fairly easy to crawl, and its food-related categories fit our project's taxonomy well. A classmate studying network security then recommended the selenium package, which turned out to be very handy: it drives a real browser, so the crawl looks exactly like normal browsing, and the API is easy to pick up. Studying the site plus writing and debugging the code took about two hours. Here is my code:
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException
import time


class WebSpider:
    chrome_path = "path to chromedriver"
    browser = webdriver.Chrome(executable_path=chrome_path)
    url = "https://www.maigoo.com/brand/list_1321.html?dynamic=1&catid=1322"
    savePath = "path of the output file"
    index = 0
    total = 0

    @classmethod
    def save_target_content(cls):
        print("another 10 clicks completed")
        content_list = []
        input_first = cls.browser.find_elements_by_class_name("brandbang")
        print("outer layer count:", len(input_first))
        for i in range(cls.index, len(input_first)):
            # parse the second layer
            input_second = input_first[i].find_elements_by_class_name("info")
            cls.total += len(input_second)
            for content in input_second:
                # grab the shop name
                targetContent = content.find_element_by_tag_name("a").text
                content_list.append(targetContent)
        print(content_list)
        cls.index = len(input_first) - 1
        print("saving...")
        with open(cls.savePath, "a+", encoding="utf-8") as saveFile:
            for data in content_list:
                saveFile.write(data + "\n")
        content_list.clear()
        print("saved...")

    @classmethod
    def get_category_names(cls):
        cls.browser.get(cls.url)
        click_times = 0
        while True:
            # keep clicking the "load more" button
            try:
                clickElement = cls.browser.find_element_by_class_name("blockmorebox"). \
                    find_element_by_class_name("morebtn")
                click_text = clickElement.find_element_by_tag_name("span").text
                if click_text == "已经到底了":   # the button text means "reached the bottom"
                    # save whatever is left
                    print("reached the end")
                    cls.save_target_content()
                    click_times = 0
                    print("total items:", cls.total)
                    break
                sleepTime = 0.5
                time.sleep(sleepTime)
                clickElement.click()
            except StaleElementReferenceException:
                clickElement = cls.browser.find_element_by_class_name("blockmorebox"). \
                    find_element_by_class_name("morebtn")
                clickElement.click()
            click_times += 1
            if click_times == 10:
                # update the index and reset the click counter
                cls.save_target_content()
                click_times = 0


if __name__ == '__main__':
    WebSpider.get_category_names()
I'm still fairly green, so I didn't manage to make it loop over every category automatically.
The rough idea: keep clicking the "load more" button; every 10 clicks, save all the shop names collected so far, record the current number of brandbang divs as the resume index, and then start the next round.
Awkwardly, our project also needs a movie category, so I had to crawl Douban as well. With selenium that is straightforward: inspect Douban's page structure and the movie titles come down easily.
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException
import time


class DoubanSpider:
    chrome_path = "path to chromedriver.exe"
    browser = webdriver.Chrome(executable_path=chrome_path)
    url = "https://movie.douban.com/explore#!type=movie&tag={}&sort=recommend"
    save_path = "path of the output file"
    index = 0

    @classmethod
    def get_movie_titles(cls, movie_classify):
        cls.url = cls.url.format(movie_classify)
        cls.browser.get(url=cls.url)
        movie_title_list = []
        click_time = 0
        while True:
            try:
                click_element = cls.browser.find_element_by_class_name("more")
                # pause one second so we don't click too fast
                time.sleep(1)
                click_element.click()
                click_time += 1
            except StaleElementReferenceException:
                continue
            # after every 5 clicks, collect and save the movie titles
            if click_time == 5:
                target_divs = cls.browser.find_elements_by_class_name("cover-wp")
                for i in range(cls.index, len(target_divs)):
                    movie_title = target_divs[i].find_element_by_tag_name("img").get_attribute("alt")
                    movie_title_list.append(movie_title)
                cls.index = len(target_divs) - 1
                print("movie titles this round:", movie_title_list)
                print("saving....")
                with open(cls.save_path, "a+", encoding="utf-8") as target_file:
                    for mov_title in movie_title_list:
                        target_file.write(mov_title + "\n")
                movie_title_list.clear()
                print("saved...")
                click_time = 0


if __name__ == '__main__':
    DoubanSpider.get_movie_titles("欧美")
get_movie_titles takes the movie category as its argument; open the site itself and the code is easy to follow.
Working with MongoDB, Elasticsearch, jieba and so on naturally requires a set of utility classes, and this step matters: good utilities cut down a lot of the project's later complexity. Here are some of the utility classes I wrote, in the hope that they help (and, of course, so that I can come back and copy-paste them myself, haha).
MongoDB connection utility class
MongoConn.py
from pymongo import MongoClient
import json
from utils.Log import log

with open("./cfg.json") as file:
    jsonFile = json.load(file)

mongoHost = jsonFile["mongoHost"]
mongoPort = jsonFile["mongoPort"]
username = jsonFile["username"]
password = jsonFile["password"]


class MongoConn:
    def __init__(self, database, collection):
        self.host = mongoHost
        self.port = mongoPort
        self.database = database
        self.collection = collection
        self.client = MongoClient(host=self.host, port=self.port,
                                  username=username if username != "" else None,
                                  password=password if password != "" else None)
        log.logger.debug("mongo connected")

    def __enter__(self):
        if self.database and self.collection:
            return self.client[self.database][self.collection]

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()
Usage is as simple as:
with MongoConn(database, collection) as conn:
    conn.find()
and it is ready to use; the connection is closed automatically when the block exits.
Elasticsearch connection class
Elastic.py
from elasticsearch import Elasticsearch, helpers
from utils.MongoConn import MongoConn
import json
from utils.Log import log
import time

with open("./cfg.json") as file:
    jsonFile = json.load(file)


class Elastic:
    esHost = jsonFile["elasticHost"]
    esPort = jsonFile["elasticPort"]
    elastic = Elasticsearch(":".join([esHost, esPort]), timeout=3000)
    log.logger.info("elastic connected")

    # fuzzy multi-field query, sorted by score
    query = {
        "sort": [
            {"_score": {"order": "desc"}}
        ],
        "query": {
            "multi_match": {
                "query": "",
                "fields": ["name", "parentName"],
                "type": "phrase",
                "tie_breaker": 0.3
            }
        }
    }

    # exact phrase query on either field
    precise_query = {
        "query": {
            "bool": {
                "should": [
                    {"match_phrase": {"name": ""}},
                    {"match_phrase": {"parentName": ""}}
                ]
            }
        }
    }

    @classmethod
    def search(cls, words, index):
        word_str = " ".join(words) if isinstance(words, list) else words
        cls.query["query"]["multi_match"]["query"] = word_str
        return cls.elastic.search(index=index, doc_type="text", body=cls.query)

    @classmethod
    def precise_search(cls, words, index):
        """
        Precise search mode
        :param words: search terms
        :param index: index to search
        :return:
        """
        cls.precise_query["query"]["bool"]["should"][0]["match_phrase"]["name"] = words
        cls.precise_query["query"]["bool"]["should"][1]["match_phrase"]["parentName"] = words
        return cls.elastic.search(index=index, doc_type="text", body=cls.precise_query)

    @classmethod
    def get_search_result(cls, words, index):
        """
        Return the list of hits for a query
        :param words:
        :param index:
        :return:
        """
        return cls.search(words=words, index=index)["hits"]["hits"]

    @classmethod
    def add_document(cls, index, body):
        try:
            cls.elastic.index(index=index, body=body, doc_type="_doc")
        except Exception:
            log.logger.error("elastic add failed")

    @classmethod
    def delete_document(cls, index, id_value):
        """
        Delete a document by id
        :param index:
        :param id_value:
        """
        try:
            cls.elastic.delete(index=index, doc_type="_doc", id=id_value)
        except Exception:
            log.logger.error("elastic delete failed")

    @classmethod
    def delete_by_query(cls, index, **condition):
        delete_query = {"query": {"match": condition}}
        cls.elastic.delete_by_query(index=index, body=delete_query, doc_type="_doc")

    @classmethod
    def update_by_query(cls, index, **condition):
        update_query = {"query": {"match": condition}}
        cls.elastic.update_by_query(index=index, body=update_query, doc_type="_doc")

    @classmethod
    def create_index(cls, collection, fields=None):
        """Bulk-load a MongoDB collection into an index of the same name."""
        assert isinstance(collection, str)
        with MongoConn("machine_learning", collection) as col:
            if fields:
                ret = col.find(filter=fields)
            else:
                ret = col.find()
            actions = []
            for doc in ret:
                doc.pop("_id", None)  # ObjectId is not JSON serializable
                actions.append({
                    "_index": collection.lower(),
                    "_type": "text",
                    "_source": doc
                })
            start = time.time()
            helpers.bulk(cls.elastic, actions)
            spend = time.time() - start
            log.logger.info("create spend time:{}".format(spend))
Utility class for importing MongoDB collections into Elasticsearch
from bson import ObjectId
from utils.MongoConn import MongoConn
import json
import time
from elasticsearch import Elasticsearch, helpers

with open("./cfg.json") as file:
    jsonFile = json.load(file)


class ElasticExport:
    esHost = jsonFile["elasticHost"]
    esPort = jsonFile["elasticPort"]
    elastic = Elasticsearch(":".join([esHost, esPort]), timeout=3000)

    @classmethod
    def export_mongo_to_elastic(cls):
        """
        Export the data in the MongoDB database into Elasticsearch.
        @Author: joker
        """
        database = "machine_learning"
        collections = ["amusement", "corpus", "division_labels", "food", "hotel", "movie"]
        actions = []
        # export every collection
        for collectionName in collections:
            with MongoConn(database=database, collection=collectionName) as conn:
                ret = conn.find()
                if ret is None:
                    continue
                for idx, item in enumerate(ret):
                    content = {
                        "_index": collectionName,
                        "_type": "text",
                        "_id": idx,
                        "_source": {
                            key: value for key, value in item.items()
                            if type(value) is not ObjectId
                        }
                    }
                    print(content)
                    actions.append(content)
        start = time.time()
        helpers.bulk(cls.elastic, actions)
        end = time.time()
        print("elapsed:", end - start)


if __name__ == '__main__':
    ElasticExport.export_mongo_to_elastic()
I can say without exaggeration that Elasticsearch as a search engine is both fast and accurate, and its scoring mechanism tells you how well the current search matches, which is very useful for evaluating the overall model.
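As a small illustration of how that score can drive the intent decision, the sketch below segments a query with jieba, looks it up through the Elastic class above, and falls back to the trained model only when the top score is low. The 10.0 threshold, the food index, and the assumption that Elastic.py sits under utils/ like the other helpers are all illustrative.

import jieba
from utils.Elastic import Elastic


def dictionary_intent(sentence, index="food", threshold=10.0):
    # segment the query and run it through the fuzzy multi_match search
    words = [w for w in jieba.cut(sentence) if w.strip()]
    hits = Elastic.get_search_result(words=words, index=index)
    if hits and hits[0]["_score"] >= threshold:
        # confident dictionary hit: use the label stored with the term
        return hits[0]["_source"].get("parentName")
    return None  # caller falls back to the trained model


print(dictionary_intent("我想吃八宝甜粽"))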
I hope the above is of some help.