赞
踩
在前面的几篇博客中都是针对图像进行的深度学习,那在本文中将把目光转向文本分类的深度学习,并从处理数据开始,完整实现一个简单的TextCNN
模型。目的是文本分类的TextCNN
在网络搭建的处理上和VGG-16
的过程大同小异,且它的网络结构相比VGG-16
简单很多。然而,由于我暂时没有找到面向文本数据的好用的数据转换库,所以本文更多的笔墨放在了如何根据自己的数据来搭建一个TextCNN
模型。
pytorch系列的过去几篇博客为:
本文代码主要参考了Pytorch 文本(短文本)分类任务 Demo。这是一个非常适合使用Pytorch进行文本分类的入门参考代码,里面不仅有原始的文本数据,而且还有结构清晰的数据处理模块和除TextCNN外的各种不同的分类网络结构,使用的外部库也比较基础。
和图像分类相比,文本分类的CNN实现过程有什么不同之处?其实最大的不同就在于如何将文本数据转换成类似图像的二维矩阵以供卷积。TextCNN
的原理可以参考下图。
3*3
。但是文本CNN所用的卷积核大小要大得多,通常宽度由词向量的维度数量决定,例如词向量的长度为50
时,卷积核的宽就为50
,而高度则有三种,即2,3,4,对应形成了三种不同的卷积核。由于卷积核宽度是词向量的长度,因此卷积核移动只能向下移动,卷积的结果就变成了一维的特征向量,而不是图像卷积后的二维特征图。2*2
区间, 而是整个由卷积得到的向量。也就是说,在文本上进行的一次卷积+池化过程只能得到一个特征值。256
个,这样全连接层前的特征向量能够长一点。先来了解一下使用的数据集,知己知彼才能得心应手嘛。文本数据集仍是在Pytorch02:使用自己的数据搭建VGG16网络中使用的,用来描述灾难的CrisisMMD数据集,可以在Multimodal Damage Identification for Humanitarian Computing Data Set中下载。简单起见,我们只使用其中的一个文件夹。数据集如下图,它们的命名和图片的命名完全一致,对应的分类标签在其命名中。
每个txt文件中为一段从社交媒体上爬取的文字。例如:
Homes affected by the blast. #accrafloods photo by: @teresameka we'll be distributing care packages soon, if you are interested in assembling them with us email ghanafloodrelief@gmail.com. We will also let you know how to donate soon #ghana #life
那该如何把这些原始的文本文件转成二维矩阵呢?
别急别急,我们先来构建一个关于这个数据集的单词表,这是为了获得单词和二维矩阵的映射关系。也就是说,在获得了一个单词后,应该如何将这个单词转成一个数字,而这个数字将决定我们去哪里找词向量,然后把它填入到二维矩阵中。
仿照图像中的Dataset
类,下面构建关于文本的数据集类:
import nltk.stem import re import pickle as pkl class CrisisTextDataset(object): """ 数据集类,完成词表的生成和数据集的获取 """ def __init__(self, root_dir: str, transform=None, save_path='vocab.pkl', max_chars=10000, min_freq=1): self.root_dir = root_dir # 文本所在路径 self.transform = transform # 转换Tensor对象 self.max_chars = max_chars # 词汇表能够保存的最大词汇数量 self.min_freq = min_freq # 最小词频,低于该词频的词语将被剔除 self.save_path = save_path # 词汇表序列化保存地址 self.word2index = {} # 词汇表,key=word, value=index self.textname_list = os.listdir(self.root_dir) # 获取root_dir中的所有文件 # 读入停用词表for english stopword_file = './data/stopwords/english' with open(stopword_file, 'r') as f: stopwords = f.read() # 以\n分割字符串,返回字符串列表 stopwords = stopwords.split('\n') # 遍历列表中的每个元素,strip()默认删除首尾的空白符(包括'\n', '\r', '\t', ' ') self.stopwords = [item.strip() for item in stopwords if len(item) > 0] # 生成词汇表 def generate_vocab(self) -> list: # 正则编译器,使用正则表达式,只获取字母串 cop = re.compile("[a-zA-Z]+") # 词干提取器,将不同形式的英语单词通过词干合并 ss = nltk.stem.SnowballStemmer('english') vocab = {} # 初始输入的字典 for textn in self.textname_list: # 默认文件打开的编码方式是gbk,如果遇到复杂的编码就会读取出错,改用utf-8能够满足大部分的解码需求 with open(os.path.join(self.root_dir, textn), 'r', encoding='utf-8') as f: text_content = f.read() # 转为小写 text_content = text_content.lower() # 用正则表达式过滤提取单词 word_list = cop.findall(text_content) # 去掉停用词 word_list = [w for w in word_list if w not in self.stopwords] # 提取词干 word_list = [ss.stem(w) for w in word_list] for word in word_list: # 舍弃单字母 if len(word) > 1: # 记录到字典中 # dict.get(key, default)在字典中获取key的value,如果没有找到,则返回default # dict[key]也是在字典中获取key的value,但如果没有找到会报错 # dict[key]还能够修改key对应的value值 vocab[word] = vocab.get(word, 0) + 1 # 字典按照统计的降序排序 # sorted(iterable, cmp=None, key=None, reverse=False) # cmp比较函数,从迭代对象中取两个来比较 # key函数,返回每个迭代对象要比较的内容,如在这,是取dict的value值进行比较,速度比cmp快 # 相比之下,sort函数只适用于list # lambda匿名函数,冒号前是传入参数,冒号后面是返回值,可以为表达式,可以使代码简洁,但不会加快速度 # 返回的是列表[(key, value),...] vocab_sort = sorted(vocab.items(), key=lambda x: x[1], reverse=True) # 过滤掉词频不够大的单词 vocab_sort = [item for item in vocab_sort if item[1] >= self.min_freq] # 从字典中将key取出来,形成列表,但是这个列表是按词频排序过的 # [PAD]是填充位的标记,序号为0,[UNK]是未知词汇,序号为1 word_list = ['[PAD]', '[UNK]'] + [item[0] for item in vocab_sort] # 转成字典,提高查找效率 self.word2index = {c: i for i, c in enumerate(word_list)} return self.word2index # 保存词表 def save_vocab(self): pkl.dump(self.word2index, open(self.save_path, 'wb')) # 读入词表 def load_vocab(self): self.word2index = pkl.load(open(self.save_path, 'rb'))
读入每个txt文件后进行文本的处理。
(1)转小写。
(2)使用正则表达式提出英文单词,这是因为原来的数据集中还有非英文字母的字符,如标点符号,简单的表情符号,非英语语言文字等,这些均要去除。使用正则运算可以比较方便地进行英文单词的提取,当然也可以使用其他方式,可以参考这篇博客。关于正则表达式的测试可以在这个网站上进行,能够比较快速地验证正则式的效果。
(3)去掉停用词。停用词(stopword)是指那些在文本中大量出现但是对分类没有什么作用的词,比如英文中的a, the等。一般可以从网上搜索停用词表,也可以自己构建。本文是采用了nltk.corpus
库中的英语停用词表,为了方便读者理解和借鉴使用,将该词表放在了./data/stopwords/english
路径下。自定义或者从其他途径获取的词表也可以按照该方法进行调用。
(4)针对英文单词,还需要进行词干的提取,使用的是nltk.stem.SnowballStemmer
类。该词干提取方法用的是正则过滤,过滤后的单词可能不是一个正常的英语单词,但是能让不同形式的单词映射到同一个序列中,减少词表的规模,在使用上不会影响算法的性能。提取词干的原理和源代码参见:The English (Porter2) stemming algorithm。
(5)过滤掉单字母的单词,这是考虑到文本中会产生不规范的单词。
使用字典统计每个单词出现的次数,然后根据单词的词频对单词进行排序。关于排序函数sorted
的介绍参见[转].Python中sorted函数的用法。
在字典中加入[PAD]
和[UNK]
,有两个目的:一是待会构建的二维矩阵的长度必须固定,但由于文本的长度是不固定的,所以后续需要用字符进行填充,需要用到[PAD]
;二是单词表不一定能够包含即将要处理文本中的所有单词(虽然在本文的实验中可以,但当测试集数据来源任意的时候就不能保证了),需要用[UNK]
来填充所有未知的字符。
最后,返回的word2index
就是单词表,能够将单词转换成数字序号。
词表的保存和加载。生成了词表之后,可以将它保存下来,每次使用的时候进行加载即可,不用重新生成。词表的序列化和反序列化用到了pickle
库。
那刚刚花了半天功夫构建的单词表有什么用呢?别急,接下来,我们就要用这个单词表来将所有的文本数据转换成数字序列了。
# 目的是要将文本数据转换成一个<index list, int>的样本list输出 def get_dataset(self, train: bool = False, max_len: int = 50, ratio: float = 0.8) -> list: """ 获取测试集或者训练集 :param train: 是否要获取训练集,默认返回测试集 :param max_len: 文本的长度,不够用PAD补全 :param ratio: 训练集占数据集的比例 :return: 训练集或者测试集 """ # 正则编译器,使用正则表达式,只获取字母串 cop = re.compile("[a-zA-Z]+") # 词干提取器,将不同形式的英语单词通过词干合并 ss = nltk.stem.SnowballStemmer('english') # 填充词,如果没有定义,则默认使用0 pad_id = self.word2index.get('[PAD]', 0) samples = [] # 打乱顺序来划分训练集和测试集 index = np.arange(len(self.textname_list)) np.random.shuffle(index) if train: sub_textname_list = [self.textname_list[item] for item in index[: int(ratio*len(self.textname_list))]] else: sub_textname_list = [self.textname_list[item] for item in index[int(ratio*len(self.textname_list)):]] for txtname in sub_textname_list: with open(os.path.join(self.root_dir, txtname), 'r', encoding='utf-8') as f: text_content = f.read() # 转为小写 text_content = text_content.lower() # 用正则表达式过滤提取单词 word_list = cop.findall(text_content) # 去掉停用词 word_list = [w for w in word_list if w not in self.stopwords] # 提取词干 word_list = [ss.stem(w) for w in word_list] # 转单词列表为序号列表,不足用pad_id填充 # 使用字典的get函数,如果找不到单词,则返回1,对应字典中的[UNK] data = [self.word2index.get(w, 1) for w in word_list] + [pad_id]*(max_len - len(word_list)) # 序列过长则删除 data = data[: max_len] data = np.array(data) # 获取标签 label = label_set[txtname.split('_')[0]] label = np.array(label) sample = (data, label) samples.append(sample) return samples
samples
的时候获取的东西是什么。在这里,每一条数据是<data, label>
对。其中,data
是每个txt文件中的单词转换后对应的数字序列;label
则是通过解析txt文件名得到的类别序号,获取方式同之前处理图像数据的方式。这一步是实现和图片数据处理的DataLoader
类相同的功能。主要实现的功能有:打乱数据,构建迭代器,和将数据打包成一个batch对应输出。
class CrisisDataLoader(object): """ 数据处理类,完成shuffle和批处理等功能 """ def __init__(self, samples, batch_size=32, shuffle=False): # 拆分sample datas = [item[0] for item in samples] labels = [item[1] for item in samples] # 要将数组转成numpy.ndarray类型后续才能方便打乱 datas = np.array(datas) labels = np.array(labels) # 打乱顺序 if shuffle: datas, labels = self._shuffle_data(datas, labels) self.datas = datas self.labels = labels self.batch_size = batch_size # batch的数量,//为整除 self.batch_num = len(samples) // self.batch_size # print(self.batch_num) # 是否能整除,若不能整除,则有剩余 self.residue = (len(samples) % self.batch_size != 0) # print(self.residue) # 目前是第几个batch self.index = 0 def _shuffle_data(self, datas, labels): # 形成随机化序列 shuffle_index = np.arange(len(datas)) np.random.shuffle(shuffle_index) # 按照随机化序列打乱顺序 datas = datas[shuffle_index] labels = labels[shuffle_index] return datas, labels def __next__(self): if (self.index == self.batch_num) and (self.residue is True): # 最后不能整除的部分 # numpy.ndarray切割后仍是numpy.ndarray sub_datas = self.datas[self.index*self.batch_size: len(self.datas)] sub_labels = self.labels[self.index*self.batch_size: len(self.labels)] self.index += 1 return sub_datas, sub_labels elif self.index >= self.batch_num: self.index = 0 # 终止迭代器 raise StopIteration else: # 正常的部分 sub_datas = self.datas[self.index*self.batch_size: (self.index+1)*self.batch_size] sub_labels = self.labels[self.index*self.batch_size: (self.index+1)*self.batch_size] self.index += 1 return sub_datas, sub_labels def __iter__(self): return self def __len__(self): # 返回可以划分的batch数量,如果不能整除就要+1 if self.residue: return self.batch_num + 1 else: return self.batch_num
_shuffle_data
函数用于打乱数据。__next__
,__iter__
和__len__
。该迭代器每次并不是返回一条数据,而是一个batch的数据,所以需要对原来的sample
进行拆分,然后再组合成一个一个batch。注意,拆分后,data
和label
的长度相同,且是一一对应的。由于传入的数据集长度可能不会刚好被batch_size
整除,所以需要考虑到最后不够一个batch_size
的数据应该如何处理。在这里是不够一个batch_size
就统一当成一个batch
处理。Tensor
类型,所以data
和label
最好都提前转换成numpy.ndarray
类型。这样构成的迭代器能够满足单次迭代的过程,但如果是需要进行多次重复的迭代过程的话,在某些情况下就会因为raise StopIteration
而出现问题,甚至终止程序。如何构建一个可以像标准迭代器那样重复使用的迭代器呢?一个简单的方法就是另外构建一个类,然后由该类对象每次返回一个新的CrisisDataLoader
对象。
# 构建可重复使用的迭代器
class MultiCrisisDataLoader(object):
def __init__(self, samples, batch_size=32, shuffle=True):
self.samples = samples
self.batch_size = batch_size
self.shuffle = shuffle
def __iter__(self):
return CrisisDataLoader(self.samples, self.batch_size, self.shuffle)
这样,每次for ... in obj
或者使用iter(obj)
的时候,就会调用迭代器obj
的__iter__
方法,获得一个新的CrisisDataLoader
对象。
终于来到搭建模型的时候了。模型的搭建过程很短,基本上是根据上面的示意图来构建的。
import torch import torch.nn as nn import torch.nn.functional as F class TextCNN(nn.Module): def __init__(self, vocab_size=10000, n_class=22, embed_dim=100, num_filters=256): """ textCNN文本分类网络 :param vocab_size: 总词汇表大小 :param n_class: 文本有几个分类 :param embed_dim: 词向量长度 :param num_filters: 每种类型的卷积核有多少个 """ super(TextCNN, self).__init__() # 词向量层,大小为[vacab_size, embed_dim] self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim) # 卷积层 # nn.ModuleList是一个nn.Module的容器,相当于list,但是专门对网络进行了适配 # 输入是1通道,核大小为[几行,全部列],一共有3种类型的卷积核 self.convs = nn.ModuleList([nn.Conv2d(1, num_filters, (f, embed_dim)) for f in [2, 3, 4]]) # 全连接层,num_filter*3=>n_class self.fc = nn.Linear(in_features=num_filters*3, out_features=n_class) # 前向传播,输入x为[32, 50] def forward(self, x: torch.Tensor): x = self.embedding(x) # [batch_size, 文本长度, 词向量长度], [32, 50, 100] # unsqueeze函数在第1维中增加一个大小为1的维度 x = x.unsqueeze(1) # [batch_size, 1, 文本长度, 词向量长度], [32, 1, 50, 100] pooled = [] # 遍历nn.ModuleList调用其中的卷积核 for conv in self.convs: out = conv(x) # 若两行大小的核,则为[32, 256, 49, 1] out = F.relu(out) # 最大池化,一次卷积过程只池化成一个值 out = F.max_pool2d(out, (out.shape[-2], 1)) # [32, 256, 1, 1] # squeeze函数将所有为1的维度都删掉 out = out.squeeze() # [32, 256] pooled.append(out) # 按行拼接,即要求列数相同,以行数增加的方式堆叠;按列拼接,即要求行数相同,以列数增加的方式堆叠 # torch.cat相当于将pooled中的三个[32, 256]的二维张量按照列进行拼接,形成一个[32, 768]的张量 # pooled中的元素一共2维,dim=-1取最后一维,即按列拼接 x = torch.cat(pooled, dim=-1) # [32, 768] x = self.fc(x) return x
nn.Embedding
是词向量的构建,它会产生一个能够容纳整个词表的二维词向量矩阵,大小为[vacab_size, embed_dim]
。每个单词都能够通过序号在该词向量矩阵中找到其对应的词向量。值得注意的是,词向量起初是随机生成的,实际上并不能表示一个单词的含义。但通过神经网络的训练,词向量也会被梯度更新,等到训练完成,所得到的词向量矩阵中的对应词向量就能够表示一个单词的含义了。这种模型为词嵌入模型。4
个维度,但是nn.Embedding
后只有三个维度,所以需要使用unsqueeze
进行升维,相当于是将二维词向量矩阵当作一个1通道的图片。batch
维度虽然存在,但是不影响实际forward
过程,在搭建网络中与理论对应时应当忽略该维度。if __name__ == '__main__': dataset = CrisisTextDataset('./data/CrisisMMD/damaged_infrastructure/text', ToTensor()) word2index = dataset.generate_vocab() # 构建训练集和测试集 train_dataset = dataset.get_dataset(train=True) train_loader = CrisisDataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_dataset = dataset.get_dataset(train=False) test_loader = CrisisDataLoader(test_dataset, batch_size=batch_size) model = TextCNN() use_gpu = torch.cuda.is_available() # 判断是否有GPU加速 if use_gpu: model = model.cuda() # 定义loss和optimizer criterion = nn.CrossEntropyLoss() # 优化器传入的参数是网络的parameters() optimizer = optim.SGD(model.parameters(), lr=learning_rate) for epoch in range(num_epoches): model.train() # 训练开始 print('*' * 25, 'epoch {}'.format(epoch + 1), '*' * 25) running_loss = 0.0 running_acc = 0.0 for i, (text, label) in enumerate(train_loader): # 从numpy.ndarray转为Tensor text = torch.LongTensor(text) label = torch.LongTensor(label) # CUDA if use_gpu: text = text.cuda() label = label.cuda() out = model(text) loss = criterion(out, label) running_loss += loss.item() * label.size(0) # 返回batch中每个图片预测最大值所在的位置标签,size为64 _, pred = torch.max(out, 1) # sum()用于统计pred==label的个数 num_correct = (pred == label).sum() # mean()用于统计pred==label的均值 accuracy = (pred == label).float().mean() # acc统计一个batch中预测正确的个数 running_acc += num_correct.item() # 向后传播 optimizer.zero_grad() loss.backward() optimizer.step() print('Finish {} epoch, Loss: {:.6f}, Acc: {:.6f}'.format( epoch + 1, running_loss / (len(train_dataset)), running_acc / (len(train_dataset)))) model.eval() # 训练结束,测试开始 eval_loss = 0 eval_acc = 0 with torch.no_grad(): for i, (text, label) in enumerate(test_loader): # 从numpy.ndarray转为Tensor text = torch.LongTensor(text) label = torch.LongTensor(label) if use_gpu: text = text.cuda() label = label.cuda() out = model(text) loss = criterion(out, label) eval_loss += loss.item() * label.size(0) _, pred = torch.max(out, 1) num_correct = (pred == label).sum() eval_acc += num_correct.item() print('Test Loss: {:.6f}, Acc: {:.6f}'.format( eval_loss / len(test_dataset), eval_acc / len(test_dataset))) # 保存模型参数 save_path = './textCNN.pth' torch.save(model.state_dict(), save_path)
VGG-16
的过程几乎相同,只有对data_loader
的使用方式略有区别。CrisisTextDataset
的时候使用了ToTensor()
参数,但实际上构建数据集的时候并没有立刻将数据转成Tensor
类型。这是因为后续的data_loader
中仍有数据处理的环节,但是由于不能调用标准的DataLoader
类进行处理,所以如果提前转成Tensor
的话不好进行自定义的数据处理。所以将转换Tensor
的步骤放到了将数据输入到网络之前执行。epoch
可以多跑几趟观察acc
的变化。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。