The TextCNN model implemented in this article comes from the following two papers:
1. Convolutional Neural Networks for Sentence Classification
2. A Sensitivity Analysis of (and Practitioners' Guide to) Convolutional Neural Networks for Sentence Classification
The network architecture is shown in the figure below:
The input is a sentence or a piece of text. Unlike images or audio signals, text is not naturally numeric, so a sentence has to be converted into numbers before it can be fed to a neural network. Each word is represented by a vector, called a word vector (word embedding), and a sentence then becomes a two-dimensional matrix. As in the figure above, "I like this movie very much!" is first tokenized into 7 tokens (including the "!"); if each word vector has dimension d=5, the sentence becomes a 7x5 matrix. Note that the input shape of the network is fixed while review lengths vary, so the number of words fed to the network must be fixed as well. For example, if the maximum number of words per review is set to sentence_max_size=300 and d=5, the input is 300x5: reviews longer than 300 words are truncated, and shorter ones are padded with zeros.
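As a toy sketch of this fixed-length representation (the vocabulary and vectors below are made up, with sentence_max_size=10 and d=5 purely for illustration; the article itself uses 300 and 300):

import torch

sentence_max_size = 10
d = 5

tokens = ["i", "like", "this", "movie", "very", "much", "!"]   # 7 tokens
vocab = {w: torch.randn(d) for w in tokens}                    # fake word vectors

matrix = torch.zeros(sentence_max_size, d)          # zero-initialised, so unused rows act as padding
for i, w in enumerate(tokens[:sentence_max_size]):  # truncate if the sentence is longer than the limit
    matrix[i] = vocab[w]

print(matrix.shape)  # torch.Size([10, 5])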
Convolution is then applied to the input. Convolution in NLP differs slightly from convolution on images: image kernels are usually square, while NLP kernels are usually rectangular. For a 7x5 input, the kernel width equals the word-vector dimension, and the kernel height can be chosen as needed. The paper uses three kernel heights [2, 3, 4], i.e. kernels of size 2x5, 3x5, and 4x5 (the differently colored rectangles in the figure), with 2 filters of each size. Each filter produces one vector after convolution, so 6 vectors are obtained in total.
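For instance, these rectangular kernels can be written directly with nn.Conv2d; the short sketch below only checks the output shapes on a random 7x5 input (random weights, illustrative values only):

import torch
import torch.nn as nn

x = torch.randn(1, 1, 7, 5)          # (batch, channel, words, vec_dim) for "I like this movie very much !"

for k in [2, 3, 4]:                  # kernel heights from the paper
    conv = nn.Conv2d(1, 2, (k, 5))   # 2 filters per size, kernel width = vec_dim
    out = conv(x)
    print(k, out.shape)              # (1, 2, 7-k+1, 1): each filter yields a (7-k+1)-dimensional vector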
The paper uses max-pooling over time: the maximum value is taken from each vector, so the 6 vectors yield 6 maxima. These 6 maxima are concatenated and used as the input to the fully connected layer.
The task here is binary classification, so the fully connected layer is 6x2: 6 input units and 2 output units.
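A minimal sketch of these two steps on the 7x5 toy input (filter counts and kernel sizes follow the figure; the weights are random, so only the shapes matter):

import torch
import torch.nn as nn
import torch.nn.functional as F

x = torch.randn(1, 1, 7, 5)
feats = []
for k in [2, 3, 4]:
    conv = nn.Conv2d(1, 2, (k, 5))
    c = F.relu(conv(x))                      # (1, 2, 7-k+1, 1)
    p = F.max_pool2d(c, (7 - k + 1, 1))      # max over time -> (1, 2, 1, 1)
    feats.append(p)

features = torch.cat(feats, dim=1).view(1, -1)  # 6 maxima -> (1, 6)
fc = nn.Linear(6, 2)                            # binary classification
print(fc(features).shape)                       # (1, 2)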
The goal of this article is to train a TextCNN that performs binary classification on the IMDB dataset.
The dataset can be downloaded here: IMDB dataset
After extraction the directory structure is as follows; the neg and pos directories contain the review txt files.
get_file_list(source_dir): scans all files under source_dir and stores every file path in file_list.
get_label_list(file_list): extracts each file's label from its path in file_list.
def get_file_list(source_dir):
    file_list = []  # list of file paths
    # os.walk() traverses every sub-directory under the given directory;
    # each step yields a triple (root, dirs, files):
    #   root  - the directory currently being visited
    #   dirs  - list of sub-directory names in root (non-recursive)
    #   files - list of file names in root (non-recursive)
    # walk over all reviews
    for root, dirs, files in os.walk(source_dir):
        file = [os.path.join(root, filename) for filename in files]
        file_list.extend(file)
    return file_list


def get_label_list(file_list):
    # extract the label name (the "neg" / "pos" directory) from each path
    label_name_list = [file.split("\\")[4] for file in file_list]
    # map label names to integers
    label_list = []
    for label_name in label_name_list:
        if label_name == "neg":
            label_list.append(0)
        elif label_name == "pos":
            label_list.append(1)
    return label_list
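A usage sketch (the aclImdb path below is the article's own Windows path and is an assumption about your layout; note that get_label_list() reads index 4 of the path split on "\\", so it only works when the directory depth matches, and it only recognises neg and pos, so any extra files or folders under train_dir, such as the unsup directory shipped in the original archive, should be removed first):

train_dir = "E:\\data_source\\aclImdb\\train"   # assumed location of the extracted dataset
file_list = get_file_list(train_dir)            # paths like ...\train\neg\*.txt, ...\train\pos\*.txt
label_list = get_label_list(file_list)          # 0 for neg, 1 for pos
print(len(file_list), label_list[:5])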
You can use pretrained word vectors from Google's word2vec or from GloVe. This article uses the 300-dimensional GloVe vectors, download link: glove vectors
However, the downloaded GloVe file cannot be loaded by gensim as-is. Running the code below produces the file glove.model.6B.300d.txt, which gensim can load directly.
import gensim
import shutil
from sys import platform


# count the number of lines, i.e. the number of words
def getFileLineNums(filename):
    count = 0
    with open(filename, 'r', encoding="utf8") as f:
        for line in f:
            count += 1
    return count


# prepend a header line to the word-vector file (fast copy, used on Linux)
def prepend_line(infile, outfile, line):
    with open(infile, 'r', encoding="utf8") as old:
        with open(outfile, 'w', encoding="utf8") as new:
            new.write(str(line) + "\n")
            shutil.copyfileobj(old, new)


# prepend a header line to the word-vector file (line-by-line copy, used on Windows)
def prepend_slow(infile, outfile, line):
    with open(infile, 'r', encoding="utf8") as fin:
        with open(outfile, 'w', encoding="utf8") as fout:
            fout.write(line + "\n")
            for line in fin:
                fout.write(line)


def load(filename):
    num_lines = getFileLineNums(filename)
    gensim_file = 'E:/data_source/glove.6B/glove.model.6B.300d.txt'
    # the word2vec text format starts with "<num_words> <vector_dim>";
    # these vectors are 300-dimensional, so the header must say 300
    gensim_first_line = "{} {}".format(num_lines, 300)
    # Prepend the header line.
    if platform == "linux" or platform == "linux2":
        prepend_line(filename, gensim_file, gensim_first_line)
    else:
        prepend_slow(filename, gensim_file, gensim_first_line)
    model = gensim.models.KeyedVectors.load_word2vec_format(gensim_file)


load('E:/data_source/glove.6B/glove.6B.300d.txt')
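A quick sanity check for the converted file (a sketch: the path matches the conversion script above, and "movie" is assumed to be in the GloVe vocabulary, which it is for the lowercase 6B vectors):

from gensim.models import KeyedVectors

wv = KeyedVectors.load_word2vec_format('E:/data_source/glove.6B/glove.model.6B.300d.txt', binary=False)
print(wv['movie'].shape)                 # (300,)
print(wv.most_similar('movie', topn=3))  # nearest neighbours, e.g. film-related words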
wv.index2word: all words in the word-vector model.
wv.vectors: the word vectors of all words in the model.
embedding object: an nn.Embedding whose weight holds the vectors from wv.vectors as a Tensor.
The word at position i in wv.index2word corresponds to the vector at position i in wv.vectors (i.e. embedding.weight). To look up a word's vector in embedding.weight we therefore need the word's index in index2word, so this mapping is stored in a dictionary word2id.
word2vec_dir = "glove.6B.300d.txt"  # pretrained word-vector file (an absolute path may be required, see the full code below)
# load the word-vector model
wv = KeyedVectors.load_word2vec_format(datapath(word2vec_dir), binary=False)
word2id = {}  # word2id is a dictionary storing the {word: id} mapping
for i, word in enumerate(wv.index2word):
    word2id[word] = i
# build the Embedding object from the pretrained word vectors
embedding = nn.Embedding.from_pretrained(torch.FloatTensor(wv.vectors))
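For example, fetching the vector of a single word goes through word2id and embedding.weight exactly as described above (a small check, assuming the objects just built; "movie" is any word expected to be in the vocabulary):

word = "movie"
if word in word2id:
    vector = embedding.weight[word2id[word]]   # the word's 300-dimensional vector
    print(vector.shape)                        # torch.Size([300])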
sentence is a list: a review's text is tokenized and stop words are filtered out to obtain sentence.
From sentence we build the Tensor representation of one review. Note that the network input we define is four-dimensional, [batch_size, channel, sentence_max_size, vec_dim]: the first dimension is the batch size, the second is the number of channels (always 1 here), the third is the number of words, and the fourth is the word-vector dimension.
def generate_tensor(sentence, sentence_max_size, embedding, word2id):
    """
    Generate the word-vector matrix for one review
    :param sentence: the review's token list
    :param sentence_max_size: the manually set maximum number of tokens per review
    :param embedding: the embedding object
    :param word2id: the {word: id} dictionary
    :return: the review's word-vector matrix
    """
    tensor = torch.zeros([sentence_max_size, embedding.embedding_dim])
    for index in range(0, sentence_max_size):
        if index >= len(sentence):
            break
        else:
            word = sentence[index]
            if word in word2id:
                vector = embedding.weight[word2id[word]]
                tensor[index] = vector
            elif word.lower() in word2id:
                vector = embedding.weight[word2id[word.lower()]]
                tensor[index] = vector
    return tensor.unsqueeze(0)  # tensor is 2-D; it must be expanded to 3-D, otherwise an error is raised later
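A usage sketch (the token list is made up for illustration; embedding and word2id are the objects built in the previous section, and sentence_max_size=300 as in the article):

sentence = ["I", "like", "this", "movie", "very", "much", "!"]   # hypothetical tokenised review
t = generate_tensor(sentence, 300, embedding, word2id)
print(t.shape)   # torch.Size([1, 300, 300]) = (channel, sentence_max_size, vec_dim)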
The file structure is as follows:
The training set is stored as many small files. For a small dataset, loading everything into memory at once is barely feasible, but for a large dataset it is not. Instead, we subclass Dataset to implement our own MyDataset, overriding mainly __init__(), __getitem__(), and __len__():
class MyDataset(Dataset):
    def __init__(self, file_list, label_list, sentence_max_size, embedding, word2id, stopwords):
        self.x = file_list
        self.y = label_list
        self.sentence_max_size = sentence_max_size
        self.embedding = embedding
        self.word2id = word2id
        self.stopwords = stopwords

    def __getitem__(self, index):
        # read the review text
        words = []
        with open(self.x[index], "r", encoding="utf8") as file:
            for line in file.readlines():
                words.extend(segment(line.strip(), self.stopwords))
        # build the review's word-vector matrix
        tensor = generate_tensor(words, self.sentence_max_size, self.embedding, self.word2id)
        return tensor, self.y[index]

    def __len__(self):
        return len(self.x)
See section 2.2 for the get_file_list() and get_label_list() functions.
DataLoader is an iterable object; batch_size is the batch size and shuffle determines whether the data are shuffled.
# load the training data
logging.info("loading training data")
train_set = get_file_list(train_dir)
train_label = get_label_list(train_set)
train_dataset = MyDataset(train_set, train_label, sentence_max_size, embedding, word2id, stopwords)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# load the test data
logging.info("loading test data")
test_set = get_file_list(test_dir)
test_label = get_label_list(test_set)
test_dataset = MyDataset(test_set, test_label, sentence_max_size, embedding, word2id, stopwords)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
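To confirm that the DataLoader yields batches with the shape the network expects, you can peek at one batch (a quick check, assuming batch_size=64 and sentence_max_size=300 as in the article; this reads and embeds 64 reviews, so it takes a moment):

data, target = next(iter(train_dataloader))
print(data.shape)    # torch.Size([64, 1, 300, 300]) = (batch_size, channel, sentence_max_size, vec_dim)
print(target.shape)  # torch.Size([64])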
class TextCNN(nn.Module):
    def __init__(self, vec_dim, filter_num, sentence_max_size, label_size, kernel_list):
        """
        :param vec_dim: dimension of the word vectors
        :param filter_num: number of filters per kernel size
        :param sentence_max_size: maximum number of words per document
        :param label_size: number of labels; the fully connected layer outputs this many units
        :param kernel_list: list of kernel heights
        """
        super(TextCNN, self).__init__()
        chanel_num = 1
        # nn.ModuleList holds one convolution branch per kernel size, like a regular list
        # nn.Conv2d(in_channels, out_channels, kernel_size): kernel width = vec_dim, height = kernel
        # nn.MaxPool2d() performs max pooling over time: its kernel covers the whole output vector
        self.convs = nn.ModuleList([nn.Sequential(
            nn.Conv2d(chanel_num, filter_num, (kernel, vec_dim)),
            nn.ReLU(),
            # after convolution each filter yields a vector of length sentence_max_size - kernel + 1
            nn.MaxPool2d((sentence_max_size - kernel + 1, 1))
        ) for kernel in kernel_list])
        # fully connected layer: filter_num * len(kernel_list) features in, label_size (here 2) out
        self.fc = nn.Linear(filter_num * len(kernel_list), label_size)
        # dropout to reduce overfitting
        self.dropout = nn.Dropout(0.5)
        # softmax for classification (not used in forward; CrossEntropyLoss applies log-softmax itself)
        self.sm = nn.Softmax(0)

    def forward(self, x):
        # Conv2d expects a 4-D tensor: (batch_size, channel, length, width)
        in_size = x.size(0)  # x.size(0) is the batch size
        out = [conv(x) for conv in self.convs]
        out = torch.cat(out, dim=1)
        out = out.view(in_size, -1)  # flatten to (batch_size, output_num); -1 infers the size
        out = self.dropout(out)  # use the Dropout module so dropout is disabled in eval mode
        out = self.fc(out)  # nn.Linear takes a 2-D tensor (batch_size, output_num), one row per example
        return out
train_loader is a DataLoader object, i.e. an iterable. Training loops over the data epoch times, and the average loss of each batch is logged after it is processed.
You can also download my already-trained model for testing: link: https://pan.baidu.com/s/1Gxu9Wt0lTcTNUsZlg0dLyQ, extraction code: 8fd8
def train_textcnn_model(net, train_loader, epoch, lr):
    print("begin training")
    net.train()  # required: put the model in training mode
    optimizer = optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    for i in range(epoch):  # loop over the dataset multiple times
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad()  # clear the gradients
            output = net(data)     # forward pass
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            # log progress; CrossEntropyLoss already returns the batch-mean loss
            logging.info("train epoch=" + str(i) + ",batch_id=" + str(batch_idx) + ",loss=" + str(loss.item()))
    print('Finished Training')
test_loader is also a DataLoader object. The number of correct predictions is accumulated over batches, and the running accuracy is printed after every batch.
The final accuracy on the test set is 84%.
def textcnn_model_test(net, test_loader):
    net.eval()  # required: put the model in evaluation mode
    correct = 0
    total = 0
    test_acc = 0.0
    with torch.no_grad():
        for i, (data, label) in enumerate(test_loader):
            logging.info("test batch_id=" + str(i))
            outputs = net(data)
            # torch.max()[0] returns the maximum values, torch.max()[1] returns their indices
            _, predicted = torch.max(outputs.data, 1)  # each output row has label_size columns; take the index of the largest value
            total += label.size(0)
            correct += (predicted == label).sum().item()
            print('Accuracy of the network on test set: %d %%' % (100 * correct / total))
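Since the main script saves the whole module with torch.save(net, net_dir), the downloaded (or locally saved) model can be reloaded directly and evaluated; a small sketch, with the path assumed to match your own setup:

net = torch.load("model\\net.pkl")        # assumed save path from the main script
textcnn_model_test(net, test_dataloader)  # test_dataloader built as in section 2.5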
A review is first tokenized and stop words are removed; each word is then mapped to its word vector, with out-of-vocabulary words represented by zero vectors. Each review therefore becomes a [sentence_max_size, vec_dim] matrix. Note, however, that the network input we define is four-dimensional, [batch_size, channel, sentence_max_size, vec_dim]: batch size, number of channels (1 here), number of words, and word-vector dimension. The batch_size dimension is handled by the DataLoader, so generate_tensor() returns a three-dimensional tensor: it first creates tensor = torch.zeros([sentence_max_size, embedding.embedding_dim]) and then calls tensor.unsqueeze(0) to expand it to [1, sentence_max_size, embedding.embedding_dim].
def generate_tensor(sentence, sentence_max_size, embedding, word2id):
    """
    Generate the word-vector matrix for one document
    :param sentence: the document's token list
    :param sentence_max_size: the manually set maximum number of tokens per document
    :param embedding: the embedding object
    :param word2id: the {word: id} dictionary
    :return: the document's word-vector matrix
    """
    tensor = torch.zeros([sentence_max_size, embedding.embedding_dim])
    for index in range(0, sentence_max_size):
        if index >= len(sentence):
            break
        else:
            word = sentence[index]
            if word in word2id:
                vector = embedding.weight[word2id[word]]
                tensor[index] = vector
            elif word.lower() in word2id:
                vector = embedding.weight[word2id[word.lower()]]
                tensor[index] = vector
    return tensor.unsqueeze(0)  # tensor is 2-D; it must be expanded to 3-D, otherwise an error is raised later
The following code can be used for debugging to inspect the shape of the data at each layer of the TextCNN model; set a breakpoint in forward() and step through.
import torch
import torch.nn as nn
import torch.nn.functional as F


class TextCNN(nn.Module):
    def __init__(self, vec_dim, filter_num, sentence_max_size, label_size, kernel_list):
        """
        :param vec_dim: dimension of the word vectors
        :param filter_num: number of filters per kernel size
        :param sentence_max_size: maximum number of words per document
        :param label_size: number of labels; the fully connected layer outputs this many units
        :param kernel_list: list of kernel heights
        """
        super(TextCNN, self).__init__()
        chanel_num = 1
        # nn.ModuleList holds one convolution branch per kernel size, like a regular list
        # nn.Conv2d(in_channels, out_channels, kernel_size): kernel width = vec_dim, height = kernel
        # nn.MaxPool2d() performs max pooling over time: its kernel covers the whole output vector
        self.convs = nn.ModuleList([nn.Sequential(
            nn.Conv2d(chanel_num, filter_num, (kernel, vec_dim)),
            nn.ReLU(),
            # after convolution each filter yields a vector of length sentence_max_size - kernel + 1
            nn.MaxPool2d((sentence_max_size - kernel + 1, 1))
        ) for kernel in kernel_list])
        # fully connected layer: filter_num * len(kernel_list) features in, label_size out
        self.fc = nn.Linear(filter_num * len(kernel_list), label_size)
        # dropout to reduce overfitting
        self.dropout = nn.Dropout(0.5)
        # softmax for classification (not used in forward; CrossEntropyLoss applies log-softmax itself)
        self.sm = nn.Softmax(0)

    def forward(self, x):
        # Conv2d expects a 4-D tensor: (batch_size, channel, length, width)
        in_size = x.size(0)  # x.size(0) is the batch size
        out = [conv(x) for conv in self.convs]
        out = torch.cat(out, dim=1)  # concatenate along dim 1 (the channel dimension)
        out = out.view(in_size, -1)  # flatten to (batch_size, output_num); -1 infers the size
        out = self.dropout(out)  # use the Dropout module so dropout is disabled in eval mode
        out = self.fc(out)  # nn.Linear takes a 2-D tensor (batch_size, output_num), one row per example
        return out


sentence_max_size = 300  # maximum number of words per document
batch_size = 64
filter_num = 100  # number of filters per kernel size
kernel_list = [3, 4, 5]  # kernel sizes
label_size = 2
vec_dim = 300

input = torch.randn(batch_size, 1, sentence_max_size, 300)
net = TextCNN(vec_dim, filter_num, sentence_max_size, label_size, kernel_list)
output = net(input)
Assume batch_size=64, channel=1, sentence_max_size=300, vec_dim=300, kernel_list=[3, 4, 5], and label_size=2.
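With these settings the shapes can be traced by hand; the sketch below follows the kernel-3 branch only (the kernel-4 and kernel-5 branches behave the same way with their own heights):

import torch
import torch.nn as nn
import torch.nn.functional as F

x = torch.randn(64, 1, 300, 300)        # (batch_size, channel, sentence_max_size, vec_dim)
conv3 = nn.Conv2d(1, 100, (3, 300))     # kernel height 3, width = vec_dim
c = F.relu(conv3(x))                    # (64, 100, 298, 1): 300 - 3 + 1 = 298
p = F.max_pool2d(c, (300 - 3 + 1, 1))   # max over time -> (64, 100, 1, 1)
print(c.shape, p.shape)
# concatenating the kernel-3/4/5 branches on dim 1 gives (64, 300, 1, 1);
# view(64, -1) -> (64, 300); nn.Linear(300, 2) -> (64, 2)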
There are still a few points where the model could be improved.
There are also a few points where the code could be improved.
This is my first attempt at reproducing the model in Python, so the code is rather rough; please bear with it.
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from gensim.test.utils import datapath
import os
from gensim.models import KeyedVectors
from nltk.corpus import stopwords
import logging
import jieba


class TextCNN(nn.Module):
    def __init__(self, vec_dim, filter_num, sentence_max_size, label_size, kernel_list):
        """
        :param vec_dim: dimension of the word vectors
        :param filter_num: number of filters per kernel size
        :param sentence_max_size: maximum number of words per document
        :param label_size: number of labels; the fully connected layer outputs this many units
        :param kernel_list: list of kernel heights
        """
        super(TextCNN, self).__init__()
        chanel_num = 1
        # nn.ModuleList holds one convolution branch per kernel size, like a regular list
        # nn.Conv2d(in_channels, out_channels, kernel_size): kernel width = vec_dim, height = kernel
        # nn.MaxPool2d() performs max pooling over time: its kernel covers the whole output vector
        self.convs = nn.ModuleList([nn.Sequential(
            nn.Conv2d(chanel_num, filter_num, (kernel, vec_dim)),
            nn.ReLU(),
            # after convolution each filter yields a vector of length sentence_max_size - kernel + 1
            nn.MaxPool2d((sentence_max_size - kernel + 1, 1))
        ) for kernel in kernel_list])
        # fully connected layer: filter_num * len(kernel_list) features in, label_size out
        self.fc = nn.Linear(filter_num * len(kernel_list), label_size)
        # dropout to reduce overfitting
        self.dropout = nn.Dropout(0.5)
        # softmax for classification (not used in forward; CrossEntropyLoss applies log-softmax itself)
        self.sm = nn.Softmax(0)

    def forward(self, x):
        # Conv2d expects a 4-D tensor: (batch_size, channel, length, width)
        in_size = x.size(0)  # x.size(0) is the batch size
        out = [conv(x) for conv in self.convs]
        out = torch.cat(out, dim=1)
        out = out.view(in_size, -1)  # flatten to (batch_size, output_num); -1 infers the size
        out = self.dropout(out)  # use the Dropout module so dropout is disabled in eval mode
        out = self.fc(out)  # nn.Linear takes a 2-D tensor (batch_size, output_num), one row per example
        return out


class MyDataset(Dataset):
    def __init__(self, file_list, label_list, sentence_max_size, embedding, word2id, stopwords):
        self.x = file_list
        self.y = label_list
        self.sentence_max_size = sentence_max_size
        self.embedding = embedding
        self.word2id = word2id
        self.stopwords = stopwords

    def __getitem__(self, index):
        # read the document text
        words = []
        with open(self.x[index], "r", encoding="utf8") as file:
            for line in file.readlines():
                words.extend(segment(line.strip(), self.stopwords))
        # build the document's word-vector matrix
        tensor = generate_tensor(words, self.sentence_max_size, self.embedding, self.word2id)
        return tensor, self.y[index]

    def __len__(self):
        return len(self.x)


# load the stop-word list
def load_stopwords(stopwords_dir):
    stopwords = []
    with open(stopwords_dir, "r", encoding="utf8") as file:
        for line in file.readlines():
            stopwords.append(line.strip())
    return stopwords


def segment(content, stopwords):
    res = []
    for word in jieba.cut(content):
        if word not in stopwords and word.strip() != "":
            res.append(word)
    return res


def get_file_list(source_dir):
    file_list = []  # list of file paths
    # os.walk() traverses every sub-directory under the given directory;
    # each step yields a triple (root, dirs, files):
    #   root  - the directory currently being visited
    #   dirs  - list of sub-directory names in root (non-recursive)
    #   files - list of file names in root (non-recursive)
    # walk over all documents
    for root, dirs, files in os.walk(source_dir):
        file = [os.path.join(root, filename) for filename in files]
        file_list.extend(file)
    return file_list


def get_label_list(file_list):
    # extract the label name from each file path
    label_name_list = [file.split("\\")[4] for file in file_list]
    # map label names to integers
    label_list = []
    for label_name in label_name_list:
        if label_name == "neg":
            label_list.append(0)
        elif label_name == "pos":
            label_list.append(1)
    return label_list


def generate_tensor(sentence, sentence_max_size, embedding, word2id):
    """
    Generate the word-vector matrix for one document
    :param sentence: the document's token list
    :param sentence_max_size: the manually set maximum number of tokens per document
    :param embedding: the embedding object
    :param word2id: the {word: id} dictionary
    :return: the document's word-vector matrix
    """
    tensor = torch.zeros([sentence_max_size, embedding.embedding_dim])
    for index in range(0, sentence_max_size):
        if index >= len(sentence):
            break
        else:
            word = sentence[index]
            if word in word2id:
                vector = embedding.weight[word2id[word]]
                tensor[index] = vector
            elif word.lower() in word2id:
                vector = embedding.weight[word2id[word.lower()]]
                tensor[index] = vector
    return tensor.unsqueeze(0)  # tensor is 2-D; it must be expanded to 3-D, otherwise an error is raised later


def train_textcnn_model(net, train_loader, epoch, lr):
    print("begin training")
    net.train()  # required: put the model in training mode
    optimizer = optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    for i in range(epoch):  # loop over the dataset multiple times
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad()  # clear the gradients
            output = net(data)     # forward pass
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            # log progress; CrossEntropyLoss already returns the batch-mean loss
            logging.info("train epoch=" + str(i) + ",batch_id=" + str(batch_idx) + ",loss=" + str(loss.item()))
    print('Finished Training')


def textcnn_model_test(net, test_loader):
    net.eval()  # required: put the model in evaluation mode
    correct = 0
    total = 0
    test_acc = 0.0
    with torch.no_grad():
        for i, (data, label) in enumerate(test_loader):
            logging.info("test batch_id=" + str(i))
            outputs = net(data)
            # torch.max()[0] returns the maximum values, torch.max()[1] returns their indices
            _, predicted = torch.max(outputs.data, 1)  # take the index of the largest value in each row
            total += label.size(0)
            correct += (predicted == label).sum().item()
            print('Accuracy of the network on test set: %d %%' % (100 * correct / total))
            # test_acc += accuracy_score(torch.argmax(outputs.data, dim=1), label)
            # logging.info("test_acc=" + str(test_acc))


if __name__ == "__main__":
    logging.basicConfig(format='%(asctime)s:%(levelname)s: %(message)s', level=logging.INFO)
    train_dir = "E:\\data_source\\aclImdb\\train"  # training-set directory
    test_dir = "E:\\data_source\\aclImdb\\test"  # test-set directory
    stopwords_dir = "data\\stopwords.txt"  # stop-word list
    word2vec_dir = "E:\\data_source\\glove.6B\\glove.model.6B.300d.txt"  # pretrained word vectors; a relative path seems to raise an error
    net_dir = "model\\net.pkl"
    sentence_max_size = 300  # maximum number of words per document
    batch_size = 64
    filter_num = 100  # number of filters per kernel size
    epoch = 8  # number of epochs
    kernel_list = [3, 4, 5]  # kernel sizes
    label_size = 2
    lr = 0.001

    # load the word-vector model
    logging.info("loading word vectors")
    # read the stop-word list
    stopwords = load_stopwords(stopwords_dir)
    # load the word-vector model
    wv = KeyedVectors.load_word2vec_format(datapath(word2vec_dir), binary=False)
    word2id = {}  # word2id is a dictionary storing the {word: id} mapping
    for i, word in enumerate(wv.index2word):
        word2id[word] = i
    # build the Embedding object from the pretrained word vectors
    embedding = nn.Embedding.from_pretrained(torch.FloatTensor(wv.vectors))
    # # requires_grad controls whether the word vectors are fine-tuned during training
    # embedding.weight.requires_grad = True

    # load the training data
    logging.info("loading training data")
    train_set = get_file_list(train_dir)
    train_label = get_label_list(train_set)
    train_dataset = MyDataset(train_set, train_label, sentence_max_size, embedding, word2id, stopwords)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # load the test data
    logging.info("loading test data")
    test_set = get_file_list(test_dir)
    test_label = get_label_list(test_set)
    test_dataset = MyDataset(test_set, test_label, sentence_max_size, embedding, word2id, stopwords)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    # define the model
    net = TextCNN(vec_dim=embedding.embedding_dim, filter_num=filter_num, sentence_max_size=sentence_max_size,
                  label_size=label_size, kernel_list=kernel_list)

    # train
    logging.info("start training the model")
    train_textcnn_model(net, train_dataloader, epoch, lr)
    # save the model
    torch.save(net, net_dir)
    logging.info("start testing the model")
    textcnn_model_test(net, test_dataloader)