Researchers have published a large number of papers on the pneumonia caused by the novel coronavirus, and the COVID-19 dataset provided by Kaggle is considered the most comprehensive collection of the related literature. The dataset provides each paper's URL and abstract.
This project crawls the web (HTML) versions of the COVID-19 papers (papers available only as PDF were skipped), but the local IP was blocked after a little over a thousand papers had been fetched. The analysis therefore works with the titles and abstracts.
The raw data and the processed files are available at the project repository.
The data looks like this:
Result: because of the rate limiting, only a little over a thousand papers could be crawled, and the local IP address has been blacklisted.
1. Count which types of websites appear among the given URLs
import pandas as pd
import numpy as np

data = pd.read_csv("F:\\metadata.csv")
data_url = data['url'].values

data_all = [''] * 10000   # concatenated URLs per host
data_med = ['0'] * 10000  # hosts seen so far
j = -1
data_pdf = []             # URLs that point directly to a PDF
count = 0                 # papers without a URL
for i in data_url:
    if i is np.nan:
        count = count + 1
    else:
        if 'pdf' in i:
            data_pdf.append(i)
        else:
            flag = i.split('/')
            if flag[2] not in data_med:      # flag[2] is the host part of the URL
                j = j + 1
                data_med[j] = flag[2]
            flag_index = data_med.index(flag[2])
            data_all[flag_index] += i + ','
# the first 5 entries are the important ones (0, 2, 3, 4)
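As an aside, the same per-host tally can be produced more compactly with collections.Counter; this is only an illustrative alternative, not the code that generated the counts below:

from collections import Counter
from urllib.parse import urlparse

# count how many non-PDF paper URLs each host contributes
hosts = [urlparse(u).netloc for u in data_url if isinstance(u, str) and 'pdf' not in u]
print(Counter(hosts).most_common(5))   # the five busiest hosts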
经统计data_all与data_med情况可得以下情况
2. Download the two web-hosted paper collections.
result1=[]
result2=[]
a = data_all[0].split(',')
b = data_all[2].split(',')
import requests

def get(url):
    # fetch one page synchronously; a concurrent variant is sketched after this block
    return requests.get(url).text

i = 0
for url in a:            # a holds the URLs of the first collection
    i = i + 1
    if i > 1181:         # resume from where the previous run was interrupted
        print(i)
        alldata_a = get(url)
        with open("F:\\alldata_a\\" + str(i), "w", encoding='utf-8') as f:
            f.write(alldata_a)

for url in b:            # b holds the URLs of the second collection
    i = i + 1
    print(i)
    alldata_b = get(url)
    with open("F:\\alldata_b\\" + str(i), "w", encoding='utf-8') as f:
        f.write(alldata_b)
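The original snippet tried to combine asyncio with the blocking requests library, which does not actually run the downloads concurrently. If concurrency is wanted, one option is to push the blocking calls into a thread pool; this is only a sketch under that assumption, with no throttling or error handling:

import asyncio
import requests

async def fetch(url, index, out_dir):
    loop = asyncio.get_event_loop()
    # run the blocking requests call in the default thread pool
    resp = await loop.run_in_executor(None, requests.get, url)
    with open(out_dir + str(index), "w", encoding="utf-8") as f:
        f.write(resp.text)

async def download_all(urls, out_dir):
    tasks = [fetch(url, i, out_dir) for i, url in enumerate(urls, start=1)]
    await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(download_all(a, "F:\\alldata_a\\"))
loop.run_until_complete(download_all(b, "F:\\alldata_b\\"))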
3. Strip HTML tags with regular expressions
import os
import re

dirna = r"F:\COVID19_Project\COVID19_Project\original_data"
dirna2 = "D:\\COV19_data\\"
for dirpath, dirnames, filenames in os.walk(dirna):
    for filename in filenames:
        print(filename)
        pa = os.path.join(dirpath, filename)
        with open(pa, 'r') as file:
            data = file.readlines()
        # regex matching
        data1 = re.findall('(?<=<p id="__p).*', data[0])     # locate the article body
        str1 = re.compile('(?<=\>).*?(?=\<)')
        result2 = ''.join(str1.findall(data1[0]))            # strip the HTML tags
        re_final = re.sub('\\\\n.+', " ", result2)           # remove the literal "\n..." remnants
        with open(dirna2 + filename, 'w') as f2:             # write the filtered content back out
            f2.write(re_final)
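To see what the two expressions above do, here is a toy run on an invented snippet (the HTML fragment is made up purely for illustration):

import re

sample = '<p id="__p1">Results of <i>in vitro</i> assays</p>'
body = re.findall('(?<=<p id="__p).*', sample)[0]         # everything after the opening <p id="__p marker
text = ''.join(re.findall('(?<=\>).*?(?=\<)', body))      # keep only the text between tags
print(text)                                               # -> Results of in vitro assays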
1. Medical terms from Zhihu and other blogs, copied and pasted into medical.txt.
2. Crawl a medical vocabulary website made up of a dozen or so sub-pages:
import requests
from lxml import html

content = []
for i in range(45686, 45809):
    url = "https://www.qeto.com/article_" + str(i) + '/'
    header = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36"
    }
    page = requests.Session().get(url, headers=header)
    tree = html.fromstring(page.text)
    result = tree.xpath('//div[@itemprop="text"]/text()')
    content.extend(result)
About the expression passed to tree.xpath:
Open the site being crawled and press F12.
As shown in the figure, click the element picker in the upper-right corner and then click the content you want; the matching element is highlighted in the code panel on the right. Following that structure gives //div[@itemprop="text"].
Appending /text() extracts the text itself, which is then saved into medical2.txt, as sketched below.
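A small sketch of that last step, writing the crawled content list from the loop above to disk, one entry per line:

# persist the crawled vocabulary entries for the later filtering step
with open("medical2.txt", "w", encoding="utf-8") as f:
    for line in content:
        f.write(line.strip() + "\n")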
3. Use regular expressions to filter out phonetic notation, Chinese glosses and other irrelevant tokens, extracting the medical words into a text file as preparation for word segmentation.
(A cluster platform is used here in order to practice Spark.)
4. Upload the collected medical words to HDFS.
Using the HDFS shell:
hdfs dfs -mkdir /ori_medicaldata
hdfs dfs -put medical.txt /ori_medicaldata
hdfs dfs -put medical2.txt /ori_medicaldata
Initialize the Spark session:
from pyspark import SparkConf
from pyspark.sql import SparkSession

def _create_spark_session(self):
    conf = SparkConf()  # create the Spark config object
    config = (
        ("spark.app.name", self.SPARK_APP_NAME),  # name of the Spark app; a random name is generated if none is given
        # ("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY),  # memory per executor, default 2g
        ("spark.master", self.SPARK_URL),  # address of the Spark master
        # ("spark.executor.cores", self.SPARK_EXECUTOR_CORES),  # CPU cores per executor, default 1
        # ("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES)
    )
    conf.setAll(config)
    # create the Spark session from the config object
    if self.ENABLE_HIVE_SUPPORT:
        return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    else:
        return SparkSession.builder.config(conf=conf).getOrCreate()
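This method presumably lives in a small base class inside kaggle_cov.py, which the next step imports as SparkSessionBase; a minimal sketch of that module (the default attribute values are assumptions):

# kaggle_cov.py -- hypothetical layout of the imported base class
from pyspark import SparkConf
from pyspark.sql import SparkSession

class SparkSessionBase(object):
    SPARK_APP_NAME = None          # overridden by each subclass
    SPARK_URL = "yarn"
    ENABLE_HIVE_SUPPORT = False

    def _create_spark_session(self):
        ...                        # body exactly as shown above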
Run the job:
import os
import sys

# if this file is run directly for testing, adjust the path first to avoid import problems later
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
# with several Python versions installed, not pinning one is likely to cause errors
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON

from kaggle_cov import SparkSessionBase

class OriginArticleData(SparkSessionBase):
    SPARK_APP_NAME = "mergeArticle"
    SPARK_URL = "yarn"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark = self._create_spark_session()

oa = OriginArticleData()
sc = oa.spark.sparkContext   # the SparkSession exposes the SparkContext

import re
medical_word = "/ori_medicaldata"   # HDFS directory holding medical.txt and medical2.txt from step 4
medical_data = sc.textFile(medical_word)
# keep only purely alphabetic tokens of length 4 or more
medical_data = medical_data.flatMap(lambda x: re.findall('[a-zA-Z][a-zA-Z]{3,}', x))
medical_data.saveAsTextFile("/medicalword")
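A quick sanity check, assuming the job above completed, is to read back a few of the saved words:

print(sc.textFile("/medicalword").take(10))   # inspect the first few extracted medical words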
Use the abstracts from the dataset provided on Kaggle.
# document loading stage
medicalword="/COVID19_data/medicalword"
metadata="/COVID19_data/metadata.csv"
stopwords="/COVID19_data/stopwords.txt"
stopwords_data=sc.textFile(stopwords)
medicalword_data=sc.textFile(medicalword)
df = oa.spark.read.csv(metadata, header=True, escape='"')
#header=True means the first row is treated as the header; without it, the select below cannot access columns by name
import pyspark.sql.functions as F
needdata = df.select(df['cord_uid'],df['title'],F.concat_ws(",",df['abstract'],df['title']).alias("sentence"))
#F.concat_ws joins the two columns with a comma
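To confirm the merge, the combined sentence column can be previewed (purely an optional inspection step):

needdata.show(2, truncate=False)   # print two rows with the full merged text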
def get_stopwords_list():
    stopwords_list = stopwords_data.flatMap(lambda x: x.split(" ")).distinct().collect()
    return stopwords_list

def get_medicalword_list():
    medicalword_list = medicalword_data.flatMap(lambda x: x.split(" ")).distinct().collect()
    return medicalword_list

stopwords_list = get_stopwords_list()
medicalword_list = get_medicalword_list()
def segmentation(partition):
    import jieba

    def cut_sentence(sentence):
        seg_list = jieba.lcut(sentence)
        seg_list = [i for i in seg_list if i not in stopwords_list]
        filtered_words_list = []
        for seg in seg_list:
            if seg in medicalword_list:     # keep only tokens from the medical vocabulary
                filtered_words_list.append(seg)
        return filtered_words_list

    for row in partition:
        sentence = row.sentence
        words = cut_sentence(sentence)
        yield row.cord_uid, words

# mapPartitions returns an iterator over each partition
words_df = needdata.rdd.mapPartitions(segmentation).toDF(["cord_uid", "words"])
rdd = words_df.select('words').na.drop().rdd
count = (rdd.flatMap(lambda x: x)            # Row -> the list of words it holds
            .flatMap(lambda x: x)            # flatten the word lists into individual words
            .map(lambda x: (x, 1))
            .reduceByKey(lambda x, y: x + y)
            .map(lambda x: (x[1], x[0]))     # swap to (count, word) so sortByKey sorts by count
            .sortByKey(ascending=False)
            .map(lambda x: (x[1], x[0]))     # swap back to (word, count)
            .collect())
This gives the following result:
from pyspark.ml.feature import CountVectorizer
# vocabulary size and the minimum number of documents a term must appear in
cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200, minDF=1.0)
#CountVectorizer selects terms by corpus frequency from high to low; the maximum vocabulary size is set by the vocabSize hyperparameter, and minDF specifies the minimum number of distinct documents a term must appear in.
# fit the term-frequency model
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://192.168.19.137:9000/COVID19_data/CV.model") # the fit takes a long time on the VM, so save the model in case the VM crashes and the job has to be rerun
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://192.168.19.137:9000/COVID19_data/CV.model") # load the saved CountVectorizerModel back from HDFS
cv_result = cv_model.transform(words_df)
# the transform returns an encoded vector whose length equals the vocabulary size, carrying the count of each word in the document
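Before the IDF step, the fitted vocabulary and the encoded vectors can be inspected (an optional check):

print(cv_model.vocabulary[:10])   # the ten most frequent terms kept in the vocabulary
cv_result.select("cord_uid", "countFeatures").show(2, truncate=False)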
from pyspark.ml.feature import IDF
idf = IDF(inputCol="countFeatures", outputCol="idfFeatures")   # column names match the CV output above and the idfFeatures used below
idf_model = idf.fit(cv_result)
idf_model.write().overwrite().save("hdfs://192.168.19.137:9000/COVID19_data/IDF.model")   # save the IDF model as well, since it is slow to recompute
tfidf_result = idf_model.transform(cv_result)
def func(partition):
    TOPK = 20
    for row in partition:
        # pair each vocabulary index with its TF-IDF weight and sort in descending order
        _ = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
        _ = sorted(_, key=lambda x: x[1], reverse=True)
        result = _[:TOPK]
        for word_index, tfidf in result:
            yield row.cord_uid, int(word_index), round(float(tfidf), 4)

_keywordsByTFIDF = tfidf_result.rdd.mapPartitions(func).toDF(["cord_uid", "index", "tfidf"])
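The index column refers to positions in cv_model.vocabulary, so the human-readable keyword for each row can be recovered with a small mapping (a sketch, assuming the 200-word vocabulary fits comfortably in driver memory):

vocab = cv_model.vocabulary   # list of words; the position equals the index in the feature vector
keywords = _keywordsByTFIDF.rdd.map(
    lambda row: (row["cord_uid"], vocab[row["index"]], row["tfidf"])
).toDF(["cord_uid", "keyword", "tfidf"])
keywords.show(10, truncate=False)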