赞
踩
1. 通过构建和训练多层感知机(MLP)和卷积神经网络(CNN),深入理解这两种前馈神经网络的工作原理,包括它们的架构、参数调整以及优化策略。
2. 利用前馈神经网络实现对姓氏的分类,旨在识别不同国家或地区的姓氏特征,从而预测一个给定姓氏的可能来源。
最简单的MLP是对第3章感知器的扩展,MLP在感知器干基础上通过BP算法弥补了感知器只能学习线性分割超平面的缺点:
1. 感知机的提出:是由Frank Rosenblatt在1957年提出的,是一种基于线性结构的简单神经网络模型。它由输入层、权重和激活函数组成,可以实现输入的线性分类。但随着感知机和一些单层神经网络的研究和应用逐渐增多,但是其受限于只能学习线性分割的能力,限制了其在复杂问题上的应用。
2. 反向传播算法的提出:1986年,Rumelhart、Hinton和Williams等研究人员提出了反向传播(Backpropagation)算法,这一算法使得神经网络可以进行多层次、非线性的学习。反向传播算法使得MLP的训练变得有效和可行,推动了神经网络领域的复兴和进步。
3. 多层感知机的形成:借助反向传播算法,研究人员开始探索多层神经网络的结构和训练方法。MLP作为一种多层结构的前馈神经网络,能够通过多个非线性层次来学习和表示复杂的数据特征。1980年代末和1990年代初,MLP开始得到广泛的研究和应用,成为当时神经网络研究的主流模型之一。
多层感知机(MLP,Multilayer Perceptron)也叫人工神经网络(ANN,Artificial Neural Network),除了输入输出层,它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构,如下图:
如上图所示:MLP包含三个层次,输入层—>隐藏层—>输出层
神经元:包含一个带有权重和偏置的线性变换,以及一个激活函数(通常,输入层不使用激活函数,隐藏层和输出层使用激活函数)用来引入非线性,使得神经网络可以任意逼近任何非线性函数,这样神经网络就可以利用到更多的非线性模型中
隐藏层神经元:假设输入层用向量X表示,则隐藏层的输出就是f(w1*X+b1),函数f可以是sigmoid函数或者tanh函数,w1是权重(连接系数),b1是偏置
输出层的输出:softmax(w2*X1+b2),X1是隐藏层的输出
我们通过一个例子来介绍前向传播算法,帮助大家更实际的理解前向传播算法的使用。假设我们选择的激活函数是σ ( z ) ,隐藏层和输出层的输出值为a ,则对于下图的三层DNN,利用和感知机一样的思路,我们可以利用上一层的输出计算下一层的输出,也就是所谓的DNN前向传播算法
对于第二层的的输出,,,我们有:
=σ()=σ(+++)
=σ()=σ(+++)
=σ()=σ(+++)
对于第三层的的输出,我们有:
=σ()=σ(+++)
将上面的例子一般化,假设第l-1层共有m个神经元,则对于第l层的第j 个神经元的输出,我们有:
=σ()=σ(+)
其中,如果l = 2,则对于的即为输入层的。
上面可以看出,使用代数法一个个的表示输出比较复杂,而如果使用矩阵法则比较的简洁。假设第 l−1层共有m个神经元,而第l 层共有n 个神经元,则第l 层的线性系数w组成了一个n × m 的矩阵, 第l 层的偏倚b 组成了一个n × 1 的向量 , 第l − 1 层的输出a组成了一个m × 1的向量,第l 层的未激活前线性输出z 组成了一个n × 1的向量, 第l 层的的输出a 组成了一个n × 1 的向量。则用矩阵法表示,第l 层的输出为:
=σ()=σ(+)
这个表示方法简洁漂亮,后面我们的讨论都会基于上面的这个矩阵法表示来。
我们可以用一个合适的损失函数来度量训练样本的输出损失,接着对这个损失函数进行优化求最小化的极值,对应的一系列线性系数矩阵W和偏倚向量b即为我们的最终结果。在MLP中,损失函数优化极值求解的过程最常见的一般是通过梯度下降法来一步步迭代完成的,当然也可以是其他的迭代方法比如牛顿法与拟牛顿法。因为反向传播算法相较于正向传播更加复杂,三言两语难以表达清楚,我们这里不加赘述。在代码中我们只需要一句loss.backward()即可,我们只要知道:
对MLP的损失函数用梯度下降法进行迭代优化求极小值的过程即为我们的反向传播算法。
神经网络中,如果不加入激活函数,那么每一层的输入输出都是函数均是线性的,再深的MLP 都只能得到线性的模型,使MLP的逼近能力有限,于是就引入非线性函数作为激活函数,使网络的表达能力更强。激活函数将神经网络中将输入信号的总和转换为输出信号的函数,将多层感知机输出转换为非线性,使得神经网络可以任意逼近任何非线性函数,这样神经网络就可以应用到众多的非线性模型中。
总的来说,激活函数是向神经网络中引入非线性因素,通过激活函数神经网络就可以拟合各种曲线。
1)sigmoid
① 定义:sigmoid函数也叫Logistic函数,用于隐层神经元输出,能将$(-\infty,+\infty)$的数值映射到(0,1)的区间,可以用来做二分类。表达式为:
② 特点
优点:平滑、易于求导
缺点:激活函数计算量大,反向传播求误差梯度时,求导涉及除法;反向传播时,很容易就会出现梯度消失
2)tanh
① 定义:双曲正切函数,表达式为:
② 特点
优点:平滑、易于求导;输出均值为0,收敛速度要比sigmoid快,从而可以减少迭代次数
缺点:很容易就会出现梯度消失
3)relu
① 定义:修正线性单元,其表达式为:
② 特点:
优点:计算过程简单;避免了梯度爆炸和梯度消失问题
缺点:小于等于0时无输出
在这个例子中,我们在一个二元分类任务中训练感知器和MLP:星和圆。每个数据点是一个二维坐标。在不深入研究实现细节的情况下,最终的模型预测如图2所示。在这个图中,错误分类的数据点用黑色填充,而正确分类的数据点没有填充。在左边的面板中,从填充的形状可以看出,感知器在学习一个可以将星星和圆分开的决策边界方面有困难。然而,MLP(右面板)学习了一个更精确地对恒星和圆进行分类的决策边界。
图2
图2中,每个数据点的真正类是该点的形状:星形或圆形。错误的分类用块填充,正确的分类没有填充。这些线是每个模型的决策边界。在边的面板中,感知器学习一个不能正确地将圆与星分开的决策边界。事实上,没有一条线可以。在右动的面板中,MLP学会了从圆中分离星。
虽然在图中显示MLP有两个决策边界,这是它的优点,但它实际上只是一个决策边界!决策边界就是这样出现的,因为中间表示法改变了空间,使一个超平面同时出现在这两个位置上。在图3中,我们可以看到MLP计算的中间值。这些点的形状表示类(星形或圆形)。我们所看到的是,神经网络(本例中为MLP)已经学会了“扭曲”数据所处的空间,以便在数据通过最后一层时,用一线来分割它们。
图3
图3MLP的输入和中间表示是可视化的。从左到右:
(1)网络的输入;
(2)第一个线性模块的输出;
(3)第一个非线性模块的输出;
(4)第二个线性模块的输出。第一个线性模块的输出将圆和星分组,而第二个线性模块的输出将数据点重新组织为线性可分的。
相反,如图4所示,感知器没有额外的一层来处理数据的形状,直到数据变成线性可分的。
图4
图4中感知机的输入和输出表示没有像MLP那样的中间表示来分组和重新组织,所以不能将圆和星分开。
超越基本的感知器模型,MLP(多层感知机)引入了额外的计算层,显著增强了其功能性和表达能力。在PyTorch框架下,我们通过实例化两个线性模块来体现这一概念,通常将这些模块标记为fc1和fc2,这里的"fc"是“全连接”(Fully Connected)的缩写,意指每一层的所有节点都与下一层的每一个节点相连接。
在实现MLP时,我们主要关注前向传播流程,即数据从输入层到输出层的流动路径。这是因为PyTorch框架的动态计算图特性,它能够根据前向传播过程中定义的操作自动推导出反向传播所需的梯度计算规则。这意味着开发者无需手动编写反向传播代码,PyTorch会自动根据模型的前向计算逻辑,高效地执行梯度下降和参数更新步骤,大大简化了深度学习模型的训练过程。
例1:定义Multilayer Perceptron
- import torch.nn as nn
- import torch.nn.functional as F
- import torch
- seed = 1337
- torch.manual_seed(seed)
- torch.cuda.manual_seed_all(seed)
-
- class MultilayerPerceptron(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): 输入向量的大小
- hidden_dim (int): 第一个全连接层的输出大小
- output_dim (int): 第二个全连接层的输出大小
- """
- super(MultilayerPerceptron, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """MLP的前向传播
- Args:
- x_in (torch.Tensor): 输入数据张量
- x_in.shape 应为 (batch, input_dim)
- apply_softmax (bool): 一个标志,用于指示是否使用softmax激活
- 如果与交叉熵损失一起使用,则应为False
- Returns:
- 结果张量。tensor.shape 应为 (batch, output_dim)
- """
- intermediate = F.relu(self.fc1(x_in))
- output = self.fc2(intermediate)
- if apply_softmax:
- output = F.softmax(output, dim=1) # 指定使用softmax激活
- return output # 返回输出张量
MLP(多层感知机)的一个显著优点是其灵活性,能够适应各种规模的输入数据,无论是小批量还是大数据集。为了直观地展示这一点,我们可以通过一个具体例子来进行说明:假设我们的输入数据具有3个特征维度,而我们期望的输出结果包含4个维度,同时,为了增强模型的学习能力,我们选择了一个隐藏层,其维度设定为100。
在实际操作中,当我们执行print语句查看各层的具体配置时,会清晰地观察到各层单元数之间的关联性和连贯性。具体而言,输入层的3个单元通过一系列权重连接到隐藏层的100个单元上,而这些隐藏单元再进一步与输出层的4个单元相连。这种层级间单元数的精心设计确保了模型能够从三维输入开始,经过隐藏层的非线性变换后,最终产生四维的输出结果
例2:MLP的示例实例化
- # 批次大小,定义一次输入模型的样本数量
- batch_size = 2
- # 输入维度,即每个样本的特征数量
- input_dim = 3
- # 隐藏层维度,这是模型中第一层全连接层的输出尺寸
- hidden_dim = 100
- # 输出维度,模型最终预测的类别数或输出值的数量
- output_dim = 4
- # 初始化模型,创建一个具有指定输入、隐藏和输出维度的多层感知机实例
- mlp = MultilayerPerceptron(input_dim, hidden_dim, output_dim)
- # 打印模型的结构信息
- print(mlp)
我们这里通过传递一些随机输入来快速测试模型的“连接”。因为模型还没有经过训练,所以输出是随机的。在花费时间训练模型之前,这样做是一个有用的完整性检查。
例3:用随机输入测试MLP
- import torch
- # 定义一个函数,用于描述输入张量的信息
- def describe(x):
- print("Type: {}".format(x.type())) # 打印输入张量的数据类型
- print("Shape/size: {}".format(x.shape)) # 打印输入张量的形状/尺寸
- print("Values: \n{}".format(x))# 打印输入张量的值
-
- x_input = torch.rand(batch_size, input_dim) # 生成一个随机的输入张量
- describe(x_input) # 调用describe函数,描述输入张量的信息
- y_output = mlp(x_input, apply_softmax=False)
- describe(y_output)
以先前的MLP模型为例,其输出呈现为一个二维张量,具体结构为2行4列。这里,张量的行数实际上对应着我们所使用的批次大小,即单次前向传播处理的数据点数量;而列数则代表了模型对每个数据点生成的最终特征表示的维度。在特定场景下,尤其是分类任务中,这些特征向量往往被视为模型的预测向量,其含义在于它们映射出了针对不同类别归属可能性的分布。在模型训练阶段,原始的预测向量直接作为损失函数的输入,与真实的目标标签进行对比,以评估模型预测的准确性,并据此指导权重的更新但是,如果想将预测向量转换为概率,则需要额外的步骤。
具体来说,需要softmax函数,它用于将一个值向量转换为概率。softma x有许多根,在自然语言处理(NLP)社区,它是最大熵(MaxEnt)分类这个函数背后的直觉是,大的正值会导致更高的概率,小的负值会导致更小的概率。在示例2-3中,apply_softmax参数应用了这个额外的步骤。在例2-4中,可以看到相同的输出,但是这次将apply_softmax标志设置为True。
卷积神经网络(CNN)在图像分类等领域表现出色,通常由以下几种层组成:卷积层(Convolutional Layer)、激活层(Activation Layer)、池化层(Pooling Layer)、全连接层(Fully Connected Layer)。通过卷积层、池化层和全连接层的组合,能够提取出数据中的空间层次特征,非常适合处理具有空间结构的数据。
计算机与人眼看到的不同,对于输入图像,首先要将其转换为对应的二维矩阵,这个二位矩阵就是由图像每一个像素的像素值大小组成的,我们可以看一个例子,如下图所示的手写数字“8”的图像,计算机读取后是以像素值大小组成的二维矩阵存储的图像。
通过输出层,我们已经得到图片的二维矩阵了,想要提取其中特征,那么卷积操作就会为存在特征的区域确定一个高值,否则确定一个低值。这个过程需要通过计算其与卷积核(Convolution Kernel)的乘积值来确定。假设我们现在的输入图片是一个人的脑袋,而人的眼睛是我们需要提取的特征,那么我们就将人的眼睛作为卷积核,通过在人的脑袋的图片上移动来确定哪里是眼睛,这个过程如下所示:
通过整个卷积过程又得到一个新的二维矩阵,此二维矩阵也被称为特征图(Feature Map),最后我们可以将得到的特征图进行上色处理,最后可以提取到关于人的眼睛的特征,如下所示:
有几个卷积核就有多少个特征图,现实中情况肯定更为复杂,也就会有更多的卷积核,那么就会有更多的特征图,当特征图非常多的时候,意味着我们得到的特征也非常多,但是这么多特征都不一定都是我们需要的,并且多余的特征通常会给我们带来如下两个问题:
* 过拟合
* 维度过高
为了解决这个问题,我们可以利用池化层又称为下采样,也就是说,当我们进行卷积操作后,再将得到的特征图进行特征提取,将其中最具有代表性的特征提取出来,可以起到减小过拟合和降低维度的作用。
通常来说,池化有两种方式:最大池化与平均池化。这里不再介绍,我们只要了解池化层可以提取到更有代表性的特征,同时还减少了不必要的计算,这对于我们现实中的神经网络计算大有脾益即可。
还是上面人的脑袋的示例,现在我们已经通过卷积和池化提取到了这个人的眼睛、鼻子和嘴的特征,如果我想利用这些特征来识别这个图片是否是人的脑袋该怎么办呢?此时我们只需要将提取到的所有特征图进行“展平”,将其维度变为1 × x ,这个过程就是全连接的过程,也就是说,此步我们将所有的特征都展开并进行运算,最后会得到一个概率值,这个概率值就是输入图片是否是人的概率,这个过程如下所示:
只需要将全连接层得到的一维向量经过计算后得到识别值的一个概率,当然,这个计算可能是线性的,也可能是非线性的。在深度学习中,我们需要识别的结果一般都是多分类的,所以每个位置都会有一个概率值,代表识别为当前值的概率,取最大的概率值,就是最终的识别结果。在训练的过程中,可以通过不断地调整参数值来使识别结果更准确,从而达到最高的模型准确率。
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。该数据集将在本课程实验的几个示例中重用,并具有一些使其有趣的属性。
第一个性质是它是相当不平衡的。排名前三的课程占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。
第二个特点是,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密,如“O’Neill”属于爱尔兰,“Antonopoulos”属于希腊,“Nagasawa”属于日本,以及“Zhu”属于中国,这些姓氏一眼即可辨识其文化根源。
为了创建最终的数据集,我们从一个比课程补充材料中包含的版本处理更少的版本开始,并执行了几个数据集修改操作。第一个目的是减少这种不平衡——原始数据集中70%以上是俄文,这可能是由于抽样偏差或俄文姓氏的增多。为此,我们通过选择标记为俄语的姓氏的随机子集对这个过度代表的类进行子样本。接下来,我们根据国籍对数据集进行分组,并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集,以便跨这些部分的类标签分布具有可比性。
SurnameDataset的实现与“Example: classification of Sentiment of Restaurant Reviews”中的ReviewDataset几乎相同,只是在getitem方法的实现方式上略有不同。回想一下,本课程中呈现的数据集类继承自PyTorch的数据集类,因此,我们需要实现两个函数:__getitem方法,它在给定索引时返回一个数据点;以及len方法,该方法返回数据集的长度。“示例:餐厅评论的情绪分类”中的示例与本示例的区别在getitem__中,如示例4-5所示。它不像“示例:将餐馆评论的情绪分类”那样返回一个向量化的评论,而是返回一个向量化的姓氏和与其国籍相对应的索引:
- from torch.utils.data import Dataset
- import pandas as pd
-
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): the dataset
- vectorizer (SurnameVectorizer): vectorizer instatiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- self.train_df = self.surname_df[self.surname_df.split=='train'] # 获取训练集的数据
- self.train_size = len(self.train_df) # 训练集大小
-
- self.val_df = self.surname_df[self.surname_df.split=='val'] # 获取验证集的数据
- self.validation_size = len(self.val_df) # 验证集大小
-
- self.test_df = self.surname_df[self.surname_df.split=='test'] # 获取测试集的数据
- self.test_size = len(self.test_df) # 测试集大小
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict() # 计算每个类别的计数,并转换为字典
- def sort_key(item): # 定义排序函数
- return self._vectorizer.nationality_vocab.lookup_token(item[0]) # 返回类别的索引
- sorted_counts = sorted(class_counts.items(), key=sort_key) # 按类别索引排序
- frequencies = [count for _, count in sorted_counts] # 获取排序后的频率列表
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32) # 计算类别权重
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case in the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameVectorizer
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's:
- features (x_surname)
- label (y_nationality)
- """
- row = self._target_df.iloc[index] # 获取对应索引的数据行
-
- surname_vector = \
- self._vectorizer.vectorize(row.surname) # 向量化姓氏
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality) # 获取国籍的索引
-
- return {'x_surname': surname_vector, # 返回特征和标签
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
- ensure each tensor is on the write device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
为了使用字符对姓氏进行分类,我们使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。这些数据结构与“Example: Classifying Sentiment of Restaurant Reviews”中使用的数据结构相同,它们举例说明了一种多态性,这种多态性将姓氏的字符标记与Yelp评论的单词标记相同对待。数据不是通过将字令牌映射到整数来向量化的,而是通过将字符映射到整数来向量化的。
在词汇表构建的过程中,add_token方法担当着关键角色,它允许我们将新遇到的字符无缝添加至词汇表中,从而不断丰富词汇表的覆盖范围。而lookup_token和lookup_index方法则分别提供了从字符查找整数索引和从整数索引恢复字符的功能,前者在数据预处理阶段至关重要,后者则在模型预测阶段发挥着不可或缺的作用,尤其是在需要将模型输出的索引值还原为人类可读的字符形式时。
值得注意的是,相较于Yelp评论中采用的基于统计频率的词汇表构建策略,此处我们选择了一种更为简化的方法——one-hot词汇表。这意味着我们并未对字符的出现频率进行统计分析,也未设置任何频率阈值来筛选字符。之所以采取这样的策略,主要是考虑到我们的数据集规模相对较小,且大部分字符的出现频率已经足够高,无需额外的频率过滤步骤。这一决策不仅简化了词汇表的构建流程,同时也避免了因过度筛选而导致的潜在信息损失,确保了模型在处理姓氏分类任务时能够充分利用所有可用的字符信息。
- # 定义词汇表类,用于处理词到索引以及索引到词的映射
- class Vocabulary(object):
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- # 初始化词汇表,如果未提供token到index的映射,则创建空映射
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- # 创建从索引到标记的反向映射
- self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- # 如果要求,将UNK标记添加到词汇表中
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
- def to_serializable(self):
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- # 从序列化内容中重建词汇表
- return cls(**contents)
-
- def add_token(self, token):
- # 添加新token到词汇表中,如果已经存在则返回其现有索引,否则添加并返回新索引
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
-
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
-
- if index not in self._idx_to_token:
- raise KeyError("索引(%d)不在Vocabulary中" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- # 返回字符串表示的词汇表信息
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- # 返回词汇表中token的数量
- return len(self._token_to_idx)
- # 定义姓氏向量化器类,用于处理姓氏和国籍的向量化
- class SurnameVectorizer(object):
-
- def __init__(self, surname_vocab, nationality_vocab):
- # 初始化姓氏和国籍的词汇表
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
-
- def vectorize(self, surname):
- # 将姓氏转换为one-hot编码向量
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1
-
- return one_hot
-
- @classmethod
- def from_dataframe(cls, surname_df):
- # 从DataFrame构建姓氏和国籍的词汇表
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows():
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab)
-
- @classmethod
- def from_serializable(cls, contents):
- # 从序列化内容中重建姓氏和国籍的词汇表
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- def to_serializable(self):
- # 返回可序列化版本的姓氏和国籍词汇表属性
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}
SurnameClassifier是本实验前面介绍的MLP的实现。第一个线性层接收原始输入向量并将其转换为一个中间表示向量,紧接着对这一中间向量施加非线性变换,以增强模型的表达能力和学习复杂模式的能力。随后,第二线性层将中间向量映射到预测向量,将经过非线性变换的中间向量进一步映射至预测向量,这一向量承载着模型对输入数据类别的预测信息。
在最后一步中,可选地应用softmax操作,以确保输出和为1;这就是所谓的“概率”。它是可选的原因与我们使用的损失函数的数学公式有关——交叉熵损失。我们研究了“损失函数”中的交叉熵损失。回想一下,交叉熵损失对于多类分类是最理想的,但是在训练过程中软最大值的计算不仅浪费而且在很多情况下并不稳定。
- import torch.nn as nn
- import torch.nn.functional as F
-
- class SurnameClassifier(nn.Module):
- """ A 2-layer Multilayer Perceptron for classifying surnames """
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- super(SurnameClassifier, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the classifier
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- intermediate_vector = F.relu(self.fc1(x_in))
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
虽然我们使用了不同的模型、数据集和损失函数,但是训练例程是相同的。因此,我们只展示了
args以及本例中的训练例程与“示例:餐厅评论情绪分类”中的示例之间的主要区别。
- from argparse import Namespace
-
- args = Namespace(
- # Data and path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/surname_mlp",
- # Model hyper parameters
- hidden_dim=300,
- # Training hyper parameters
- seed=1337,
- num_epochs=100,
- early_stopping_criteria=5,
- learning_rate=0.001,
- batch_size=64,
- # Runtime options
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- )
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # Check CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- print("Using CUDA: {}".format(args.cuda))
-
-
- # Set seed for reproducibility
- set_seed_everywhere(args.seed, args.cuda)
-
- # handle dirs
- handle_dirs(args.save_dir)
训练中最显著的差异与模型中输出的种类和使用的损失函数有关。在这个例子中,输出是一个多类预测向量,可以转换为概率。正如在模型描述中所描述的,这种输出的损失类型仅限于CrossEntropyLoss和NLLLoss。由于它的简化,我们使用了CrossEntropyLoss。
在下方代码中,我们展示了数据集、模型、损失函数和优化器的实例化。这些实例应该看起来与“示例:将餐馆评论的情绪分类”中的实例几乎相同。事实上,在本课程后面的实验中,这种模式将对每个示例进行重复。
- # 加载数据集并创建向量化器
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- vectorizer = dataset.get_vectorizer()
- # 初始化分类器
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
-
- classifier = classifier.to(args.device)
- # 定义损失函数,使用类别权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
与“Example: Classifying Sentiment of Restaurant Reviews”中的训练循环相比,本例的训练循环除了变量名以外几乎是相同的。具体来说,下面代码显示了使用不同的key从batch_dict中获取数据。除了外观上的差异,训练循环的功能保持不变。利用训练数据,计算模型输出、损失和梯度。然后,使用梯度来更新模型。
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- def compute_accuracy(y_pred, y_target):
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
-
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- # 创建训练状态
- train_state = make_train_state(args)
-
- # 创建训练进度条
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- # 设置数据集为训练集
- dataset.set_split('train')
- # 创建训练集进度条
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- # 设置数据集为验证集
- dataset.set_split('val')
- # 创建验证集进度条
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss) # 将当前验证损失添加到训练状态的验证损失列表中
- train_state['val_acc'].append(running_acc) # 将当前验证准确率添加到训练状态的验证准确率列表中
-
- train_state = update_train_state(args=args, model=classifier, # 更新训练状态
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1]) # 调整学习率,根据最新的验证损失
-
- if train_state['stop_early']: # 如果早停条件满足,退出训练
- break
- # 重置训练和验证进度条
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
-
- # compute the loss & accuracy on the test set using the best available model
-
- classifier.load_state_dict(torch.load(train_state['model_filename'])) # 加载训练状态中保存的模型参数
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device) # 将类别权重移至指定设备
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
-
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
为了全面洞察模型的实际效能,我们应当采取定性和定量双重分析策略,以确保模型不仅在理论上站得住脚,更能实际应对未知数据挑战。定量角度,通过计算模型在测试集上的误差率,我们能够直观判断分类器的泛化能力,即其对未见过样本的适应性。而定性层面,则需借助模型的top-k预测结果,通过观察和分析,逐步构建起对模型理解深度和广度的直观认知。
下面的代码段展示了如何利用训练好的模型对新的姓氏进行分类预测。对于任意给定的姓氏字符串,我们首先执行向量化处理,将其转化为模型可识别的数字形式;而后,将向量化后的数据输入模型,获得预测结果。值得一提的是,此处我们特意启用了apply_softmax选项,使得输出结果以概率的形式呈现,便于后续分析。
- def predict_nationality(name, classifier, vectorizer): # 预测姓氏的国籍
- vectorized_name = vectorizer.vectorize(name) # 向量化输入的姓氏
- vectorized_name = torch.tensor(vectorized_name).view(1, -1) # 转换为张量并调整形状
- result = classifier(vectorized_name, apply_softmax=True) # 使用分类器预测,应用 softmax 函数
-
- probability_values, indices = result.max(dim=1) # 获取最大概率值及其对应的索引
- index = indices.item() # 提取索引的值
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index) # 查找预测的国籍
- probability_value = probability_values.item() # 提取概率值
-
- return {'nationality': predicted_nationality,
- 'probability': probability_value}
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
不仅要看最好的预测,还要看更多的预测。例如,NLP中的标准实践是采用k-best预测并使用另一个模型对它们重新排序。PyTorch提供了一个torch.topk函数,它提供了一种方便的方法来获得这些预测,如下例所示
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- # 向量化输入的名字
- vectorized_name = vectorizer.vectorize(name)
- # 将向量化的名字转换为 PyTorch 张量,并调整其形状以匹配模型输入的要求
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
-
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
- # 创建一个空列表,用于存储前k个国籍的预测结果
- results = []
- # 遍历前k个国籍的概率值和索引
- for prob_value, index in zip(probability_values, indices):
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- results.append({'nationality': nationality,
- 'probability': prob_value})
- # 返回包含前k个国籍预测结果的列表
- return results
- # 用户输入待分类的姓氏
- new_surname = input("Enter a surname to classify: ")
- # 确保模型在 CPU 上运行
- classifier = classifier.to("cpu")
- # 用户输入想要查看的前k个预测结果的数量
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
- # 调用函数进行预测
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
- # 打印预测结果
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
这项任务的许多细节与前面的MLP示例相同,但真正发生变化的是模型的构造和向量化过程。模型的输入,而不是我们在上一个例子中看到的收缩的onehot,将是一个onehot的矩阵。这种设计将使CNN能够更好地“view”字符的排列。
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- # 初始化数据集,包括加载数据和向量化器
- self.surname_df = surname_df
- self._vectorizer = vectorizer
- # 分割数据集为训练集、验证集和测试集
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
- # 创建数据集字典
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
- # 设置默认的数据集为训练集
- self.set_split('train')
- # 计算类别的权重,用于处理类别不平衡的问题
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- # 负责加载数据集和向量化器的方法
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
-
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
-
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
-
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
-
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
-
- return self._vectorizer
-
- def set_split(self, split="train"):
-
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
-
- row = self._target_df.iloc[index]
-
- surname_matrix = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
-
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
-
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
虽然词汇表和DataLoader的实现方式与“示例:带有多层感知器的姓氏分类”中的示例相同,但是Vectorizer的vectorize()方法已经改变,以适应CNN模型的需要。具体来说,每个字符在字符串中映射为一个整数,并使用整数构建一个由onehot向量组成的矩阵,这与我们在示例4-18的代码中所示的方式相同。重要的是,矩阵中的每一列都是不同的onehot向量。这是因为我们将使用的Conv1d层要求数据张量在第0维上具有批处理,在第1维上具有通道,在第2维上具有特征。除了更改为使用onehot矩阵之外,我们还修改了矢量化器,以便计算姓氏的最大长度并将其保存。
- class Vocabulary(object):
- """用于处理文本并提取词汇表进行映射的类"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- Args:
- token_to_idx (dict): 一个预先存在的标记到索引的映射字典
- add_unk (bool): 一个指示是否添加UNK标记的标志
- unk_token (str): 要添加到词汇表中的UNK标记
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- """返回一个可序列化的字典"""
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """从序列化字典中实例化词汇表"""
- return cls(**contents)
-
- def add_token(self, token):
- """基于标记更新映射字典。
- Args:
- token (str): 要添加到词汇表中的项目
- Returns:
- index (int): 对应于标记的整数
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """将标记列表添加到词汇表中
-
- Args:
- tokens (list): 一组字符串标记
- Returns:
- indices (list): 与标记对应的索引列表
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """检索与标记相关联的索引,如果标记不存在,则返回UNK索引。
-
- Args:
- token (str): 要查找的标记
- Returns:
- index (int): 与标记对应的索引
- Notes:
- 对于UNK功能,unk_index需要>=0(已添加到词汇表中)
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """返回与索引相关联的标记
-
- Args:
- index (int): 要查找的索引
- Returns:
- token (str): 与索引相关联的标记
- Raises:
- KeyError: 如果索引不在词汇表中
- """
- if index not in self._idx_to_token:
- raise KeyError("索引 (%d) 不在词汇表中" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<词汇表(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)
- class SurnameVectorizer(object):
- """协调词汇表并将其用于向量化的向量化器"""
- def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
- """
- Args:
- surname_vocab (Vocabulary): 将字符映射到整数的词汇表
- nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
- max_surname_length (int): 最长姓氏的长度
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
- self._max_surname_length = max_surname_length
-
- def vectorize(self, surname):
- """
- Args:
- surname (str): 姓氏
- Returns:
- one_hot_matrix (np.ndarray): 一个独热向量的矩阵
- """
-
- one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- for position_index, character in enumerate(surname):
- character_index = self.surname_vocab.lookup_token(character)
- one_hot_matrix[character_index][position_index] = 1
-
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从数据框实例化向量化器
-
- Args:
- surname_df (pandas.DataFrame): 姓氏数据集
- Returns:
- SurnameVectorizer的实例
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
- max_surname_length = 0
-
- for index, row in surname_df.iterrows():
- max_surname_length = max(max_surname_length, len(row.surname))
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab, max_surname_length)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
- max_surname_length=contents['max_surname_length'])
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable(),
- 'max_surname_length': self._max_surname_length}
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): 数据集
- vectorizer (SurnameVectorizer): 从数据集实例化的向量化器
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
- self.train_df = self.surname_df[self.surname_df.split=='train'] # 训练集DataFrame
- self.train_size = len(self.train_df) # 训练集大小
-
- self.val_df = self.surname_df[self.surname_df.split=='val'] # 验证集DataFrame
- self.validation_size = len(self.val_df) # 验证集大小
-
- self.test_df = self.surname_df[self.surname_df.split=='test'] # 测试集DataFrame
- self.test_size = len(self.test_df) # 测试集大小
-
- # 以字典形式存储数据集拆分
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train') # 设置当前拆分为训练集
-
- # 类别权重
- class_counts = surname_df.nationality.value_counts().to_dict() # 统计每个国籍的样本数
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key) # 按国籍词汇表的顺序对样本数进行排序
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32) # 计算类别权重
-
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """加载数据集并从头开始创建新的向量化器
-
- Args:
- surname_csv (str): 数据集的位置
- Returns:
- SurnameDataset的实例
- """
- surname_df = pd.read_csv(surname_csv) # 从CSV文件加载数据集
- train_surname_df = surname_df[surname_df.split=='train'] # 获取训练集DataFrame
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df)) # 实例化SurnameDataset对象
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """加载数据集和相应的向量化器。
- 在向量化器已经缓存以便重用的情况下使用
-
- Args:
- surname_csv (str): 数据集的位置
- vectorizer_filepath (str): 保存的向量化器的位置
- Returns:
- SurnameDataset的实例
- """
- surname_df = pd.read_csv(surname_csv) # 从CSV文件加载数据集
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath) # 加载向量化器
- return cls(surname_df, vectorizer) # 实例化SurnameDataset对象
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """从文件加载向量化器的静态方法
-
- Args:
- vectorizer_filepath (str): 序列化向量化器的位置
- Returns:
- SurnameDataset的实例
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp)) # 从文件加载向量化器
-
- def save_vectorizer(self, vectorizer_filepath):
- """使用json保存向量化器到磁盘
-
- Args:
- vectorizer_filepath (str): 保存向量化器的位置
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp) # 将向量化器序列化为JSON并保存到文件中
-
- def get_vectorizer(self):
- """返回向量化器"""
- return self._vectorizer # 返回向量化器
-
- def set_split(self, split="train"):
- """使用数据框中的列选择数据集的拆分"""
- self._target_split = split # 设置目标拆分
- self._target_df, self._target_size = self._lookup_dict[split] # 获取目标拆分的DataFrame和大小
-
- def __len__(self):
- return self._target_size # 返回数据集大小
-
- def __getitem__(self, index):
- """PyTorch数据集的主要入口方法
-
- Args:
- index (int): 数据点的索引
- Returns:
- 一个字典,包含数据点的特征(x_data)和标签(y_target)
- """
- row = self._target_df.iloc[index] # 获取索引处的行数据
-
- surname_matrix = \
- self._vectorizer.vectorize(row.surname) # 使用向量化器将姓氏转换为矩阵形式
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality) # 获取国籍的索引
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index} # 返回姓氏矩阵和国籍索引的字典
-
- def get_num_batches(self, batch_size):
- """给定批量大小,返回数据集中的批次数
-
- Args:
- batch_size (int)
- Returns:
- 数据集中的批次数
- """
- return len(self) // batch_size # 返回数据集中的批次数
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- 一个生成器函数,它封装了PyTorch DataLoader。
- 它将确保每个张量都位于正确的设备位置。
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last) # 创建DataLoader对象
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device) # 将张量移到指定设备
- yield out_data_dict # 生成数据字典
我们在本例中使用的模型是使用我们在“卷积神经网络”中介绍的方法构建的。实际上,我们在该部分中创建的用于测试卷积层的“人工”数据与姓氏数据集中使用本例中的矢量化器的数据张量的大小完全匹配。正如在下方代码中所看到的,它与我们在“卷积神经网络”中引入的Conv1d序列既有相似之处,也有需要解释的新添加内容。具体来说,该模型类似于“卷积神经网络”,它使用一系列一维卷积来增量地计算更多的特征,从而得到一个单特征向量。
然而,本例中的新内容是使用sequence和ELU PyTorch模块。序列模块是封装线性操作序列的方便包装器。在这种情况下,我们使用它来封装Conv1d序列的应用程序。ELU是类似于实验3中介绍的ReLU的非线性函数,但是它不是将值裁剪到0以下,而是对它们求幂。ELU已经被证明是卷积层之间使用的一种很有前途的非线性。
在本例中,我们将每个卷积的通道数与num_channels超参数绑定。我们可以选择不同数量的通道分别进行卷积运算。这样做需要优化更多的超参数。我们发现256足够大,可以使模型达到合理的性能。
- import torch.nn as nn
- import torch.nn.functional as F
-
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- 初始化姓氏分类器。
- 参数:
- initial_num_channels (int): 输入数据的通道数。
- num_classes (int): 输出类别的数量。
- num_channels (int): 卷积层的输出通道数。
- """
-
- super(SurnameClassifier, self).__init__()
- # 创建卷积神经网络模块
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3), # 第一层卷积
- nn.ELU(), # Exponential Linear Unit 激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),# 第二层卷积,步长为2
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),# 第三层卷积,步长为2
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3),
- nn.ELU()
- )
- # 创建全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """
- 定义模型的前向传播。
- 参数:
- x_surname (Tensor): 输入的姓氏数据。
- apply_softmax (bool): 是否应用softmax函数。
- 返回:
- prediction_vector (Tensor): 预测向量。
- """
-
- # 通过卷积网络提取特征,然后沿第2维挤压以移除多余的维度
- features = self.convnet(x_surname).squeeze(dim=2)
- # 特征通过全连接层得到预测向量
- prediction_vector = self.fc(features)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
训练程序包括以下似曾相识的的操作序列:实例化数据集,实例化模型,实例化损失函数,实例化优化器,遍历数据集的训练分区和更新模型参数,遍历数据集的验证分区和测量性能,然后重复数据集迭代一定次数。此时,这是本书到目前为止的第三个训练例程实现,应该将这个操作序列内部化。对于这个例子,我们将不再详细描述具体的训练例程,因为它与“示例:带有多层感知器的姓氏分类”中的例程完全相同。但是,输入参数是不同的,可以在下方代码中中看到。
- from argparse import Namespace
- args = Namespace(
- # Data and Path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/cnn",
- # Model hyper parameters
- hidden_dim=100,
- num_channels=256,
- # Training hyper parameters
- seed=1337,
- learning_rate=0.001,
- batch_size=128,
- num_epochs=100,
- early_stopping_criteria=5,
- dropout_p=0.1,
- # Runtime omitted for space ...
- )
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- def compute_accuracy(y_pred, y_target):
- y_pred_indices = y_pred.max(dim=1)[1]
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
- import os
- import numpy as np
- import pandas as pd
-
- args = Namespace(
- # Data and Path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/cnn",
- # Model hyper parameters
- hidden_dim=100,
- num_channels=256,
- # Training hyper parameters
- seed=1337,
- learning_rate=0.001,
- batch_size=128,
- num_epochs=100,
- early_stopping_criteria=5,
- dropout_p=0.1,
- # Runtime options
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- catch_keyboard_interrupt=True
- )
-
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # Check CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
-
- # Set seed for reproducibility
- set_seed_everywhere(args.seed, args.cuda)
-
- # handle dirs
- handle_dirs(args.save_dir)
-
- if args.reload_from_files:
- # training from a checkpoint
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # create dataset and vectorizer
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- vectorizer = dataset.get_vectorizer()
-
- classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab),
- num_classes=len(vectorizer.nationality_vocab),
- num_channels=args.num_channels)
-
- classifer = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- train_state = make_train_state(args)
-
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
-
-
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
Evaluating on the Test Dataset 正如“示例:带有多层感知器的姓氏分类”中的示例与本示例之间的训练例程没有变化一样,执行评估的代码也没有变化。总之,调用分类器的eval()方法来防止反向传播,并迭代测试数据集。与 MLP 约 50% 的性能相比,该模型的测试集性能准确率约为56%。尽管这些性能数字绝不是这些特定架构的上限,但是通过一个相对简单的CNN模型获得的改进应该足以让您在文本数据上尝试CNNs。
Classifying or retrieving top predictions for a new surname
在本例中,predict_nationality()函数的一部分发生了更改,如示例4-21所示:我们没有使用视图方法重塑新创建的数据张量以添加批处理维度,而是使用PyTorch的unsqueeze()函数在批处理应该在的位置添加大小为1的维度。相同的更改反映在predict_topk_nationality()函数中。
- def predict_nationality(surname, classifier, vectorizer):
- """Predict the nationality from a new surname
- Args:
- surname (str): the surname to classifier
- classifier (SurnameClassifer): an instance of the classifier
- vectorizer (SurnameVectorizer): the corresponding vectorizer
- Returns:
- a dictionary with the most likely nationality and its probability
- """
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- result = classifier(vectorized_surname, apply_softmax=True)
-
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.cpu()
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
四、实验总结
实验结果证实了CNN在处理文本分类任务,特别是基于字符的文本分类时的优势。CNN能够自动学习到文本数据中的空间和序列特征,这对于姓氏这类具有强烈文化标识的文本分类尤其重要。此外,实验还揭示了特征表示方法(如one-hot编码与嵌入向量)对模型性能的显著影响,提示在后续研究中探索更先进的特征表示方法,如词嵌入和Transformer架构,有望进一步提升模型的分类效果。本实验不仅加深了对MLP和CNN两种前馈神经网络的理解,还通过实践展现了它们在姓氏地理来源分类任务中的应用潜力和局限性,为未来在文本分类领域的研究提供了有价值的参考和启示
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。