赞
踩
首先,导入需要的库
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2022/3/30 21:51
-
- #导包
- import warnings
- from pandas import DataFrame, concat
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import jieba
- plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
- plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
- warnings.filterwarnings('ignore') #忽视警告
然后定义导入数据的函数。
- def get_data(data_file): #定义一个读取文件的函数
- """
- 解析文本数据
- :param data_file: 数据文件
- :return: 分词结果,标记
- """
- target = []
- data = []
- with open(data_file, 'r', encoding='utf-8') as f: #打开文件
- for line in f.readlines(): #按行读取
- line = line.strip().split("\t") #滤除行首行尾空格,以\t作为分隔符,对这行进行分解
- if len(line) == 1:
- continue
- target.append(int(line[0])) #\t之前的是标签
- data.append(line[1]) #\t之后的是文本数据
- # data = list(map(jieba.lcut, data))
- # data = [" ".join(d) for d in data]
- return data, target #返回文本和标签列表
-
- # 导入数据
- print('正在导入数据!')
- DATAFILE = "data.txt"
- data, target = get_data(DATAFILE) #读取数据
- data = DataFrame(data) #将文本转换化成表格形式
- target = DataFrame(target) #将标签转化成表格形式
- df = concat([data, target], axis=1) #拼接为一个表格
- df.columns = ['text', 'label']
- print('数据导入完成!显示前5行数据:')
- print(df.head(5))
对数据进行预处理。预处理内容包括用jieba库将句子分词处理成单词,然后再将单词转换为词向量,进而将每句话转换成了一个向量,便于输入网络进行训练。
- # 数据预处理
- print('开始数据预处理')
- print('1、按标签打乱训练数据!')
- df = df.sample(frac=1) # 将正面文本数据与负面文本数据进行打乱
- print('数据已按标签打乱!显示前5行:')
- print(df.head(5)) #显示前5行
-
- # 分词处理
- import jieba
- word_cut = lambda x: jieba.lcut(x)
- print('2、分词处理!')
- df['words'] = df["text"].apply(word_cut) #分词处理
- print('分词已完成!显示前5行:')
- print(df.head())
-
- # 去除停用词
- with open("hit_stopwords.txt", "r", encoding='utf-8') as f:
- stopwords = f.readlines()
-
- stopwords_list = []
- for each in stopwords:
- stopwords_list.append(each.strip('\n'))
-
- # 添加自定义停用词
- stopwords_list += ["…", "也", ".", "都", "是", "而", "了"," "]
-
- def remove_stopwords(ls): # 去除停用词
- return [word for word in ls if word not in stopwords_list]
-
- print('3、去除停用词处理!')
- df['去除停用词后的数据'] = df["words"].apply(lambda x: remove_stopwords(x))
- print('去除停用词完成!显示前5行:')
- print(df.head(5))
-
- # 词词向量处理
- from gensim.models.word2vec import Word2Vec
- x = df["去除停用词后的数据"] #处理对象是去除停用词后的数据
- # 训练 Word2Vec 浅层神经网络模型
- w2v = Word2Vec(vector_size=300, # 是指特征向量的维度,默认为100。
- min_count=10) # 可以对字典做截断. 词频少于min_count次数的单词会被丢弃掉, 默认值为5。
- w2v.build_vocab(x)
- w2v.train(x,
- total_examples=w2v.corpus_count,
- epochs=20)
- # 保存 Word2Vec 模型及词向量
- w2v.save('w2v_model.pkl')
-
- # 将文本转化为向量
- def average_vec(text):
- vec = np.zeros(300).reshape((1, 300)) #每个单词对应一个1行300列的向量
- for word in text:
- try:
- vec += w2v.wv[word].reshape((1, 300)) #每句话的向量等于每个单词的向量相加
- except KeyError:
- continue
- return vec
-
- # 将词向量保存为 Ndarray
- print('4、词向量处理!')
- x_vec = np.concatenate([average_vec(z) for z in x]) #把训练数据处理成词向量
- y = df['label'].values
- print('词向量处理完成!')
- print('数据预处理完成!')
接下来开始训练我们的网络,这里使用的是sklearn中的K邻近算法。
- #划分训练集,测试集
- from sklearn.model_selection import train_test_split
- X_train,X_test,y_train,y_test = train_test_split(x_vec,y,test_size=0.3)
-
- ####定义模型
- from sklearn.metrics import accuracy_score
- from sklearn.metrics import mean_squared_error
- from sklearn.neighbors import KNeighborsClassifier
- from sklearn.model_selection import train_test_split
-
- def train(k=5): #定义训练函数
- # 创建分类器
- clf = KNeighborsClassifier(n_neighbors=k) #k取5
- # 训练数据
- clf.fit(X_train, y_train)
- # 测试数据
- print('训练完成!')
- print('开始在验证集测试准确率!')
- predictions = clf.predict(X_test)
- print('Accuracy:', accuracy_score(y_test, predictions))
- return clf,predictions
-
- print('正在训练模型!')
- clf,predictions=train(k=6)
模型训练好后,我们拿来测试。
这里使用了两种测试方法,mode=1时我们使用手动输入的方式进行测试 ,这样方便演示这个模型的功能。mode=2时我们可以直接使用test.txt文件中的数据进行识别,并计算准确率和识别消耗的时间。
- num=10 #手动输入条数
- def one_pridect(text):
- words = word_cut(text)
- words = remove_stopwords(words)
- words = average_vec(words)
- result = clf.predict(words.reshape(1, -1))
- return result
-
- import time
- mode=2 #mode=1时手动输入,用于演示, mode=2时,计算100条测试样本
- if mode == 1:
- result_list=[]
- time_all=0
- correct=0
- for i in range(num):
- a=input('测试数据:')
- time_start = time.time() # 记录开始时间
- result=one_pridect(a)
- if result==0:
- print('正常语句')
- else:
- print('内含诈骗信息')
- time_end = time.time() # 记录结束时间
- time_sum = time_end - time_start # 计算的时间差为程序的执行时间,单位为秒/s
- print('识别用时%f' %time_sum)
- result_list.append(result)
- time_all+=time_sum
- error=np.sum(result_list)
- correct=num-error
- accuracy=accuracy_score(y_test, predictions)
- print('正常语句数量:%d' %correct)
- print('内含诈骗信息语句数量:%d' % error)
- print('总用时:%f' %time_all)
- elif mode==2 :
- TESTFILE="test.txt"
- data, target = get_data(TESTFILE) # 读取数据
- data = DataFrame(data) # 将文本转换化成表格形式
- target = DataFrame(target) # 将标签转化成表格形式
- df = concat([data, target], axis=1) # 拼接为一个表格
- df.columns = ['text', 'label']
- df['words'] = df["text"].apply(word_cut) # 分词处理
- df['去除停用词后的数据'] = df["words"].apply(lambda x: remove_stopwords(x))
- x = df["去除停用词后的数据"] # 处理对象是去除停用词后的数据
- x_vec = np.concatenate([average_vec(z) for z in x]) # 把训练数据处理成词向量
- y = df['label'].values
- time_start = time.time() # 记录开始时间
- predictions = clf.predict(x_vec)
- time_end = time.time() # 记录结束时间
- time_sum = time_end - time_start # 计算的时间差为程序的执行时间,单位为秒/s
- print('识别100条数据用时%f' % time_sum)
- print('Accuracy:', accuracy_score(y, predictions)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。