赞
踩
在本次实验专注于两种前馈神经网络的深入研究:多层感知器和卷积神经网络。
多层感知器作为简单感知器的进阶版本,通过将多个感知器集成在一个层内,并层层叠加,形成了更为复杂的网络结构。在“多层感知器在姓氏分类中的应用”章节中,我们将展示这种网络如何在复杂的多级分类任务中发挥作用。
卷积神经网络(CNN)则是另一种前馈神经网络,它借鉴了数字信号处理中的窗口滤波技术,能够捕捉输入数据中的局部特征。这一能力不仅使CNN在计算机视觉领域成为核心工具,同时也使其成为识别序列数据中的模式(例如单词和句子)的理想选择。在“CNN在姓氏分类中的应用”章节中,我们将具体演示如何利用CNN进行分类任务。
多层感知机(MLP,Multilayer Perceptron)也叫人工神经网络(ANN,Artificial Neural Network),除了输入输出层,它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构。
多层感知器(multilayer Perceptron,MLP)是指可以是感知器的人工神经元组成的多个层次。MPL的层次结构是一个有向无环图。通常,每一层都全连接到下一层,某一层上的每个人工神经元的输出成为下一层若干人工神经元的输入。MLP至少有三层人工神经元。分别为输入层、隐藏层、输出层。它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构。下图为MLP的网络结构。
输入层(input layer)由简单的输入人工神经元构成。每个输入神经元至少连接一个隐藏层(hidden layer)的人工神经元。隐藏层表示潜在的变量;层的输入和输出都不会出现在训练集中。隐藏层后面连接的是输出层(output layer)。
多层感知器(MLP)的输入层接收要处理的输入信号,输出层执行预测和分类等所需任务,位于输入和输出层之间的隐藏层则是MLP的真正计算引擎。数据正向沿着输入层流向输出层,MLP中的神经元通过反向传播进行训练学习算法。在本部分,我们将详细介绍多层感知机的工作机制,包括前向传播、反向传播和权重更新的过程。
1、前向传播(Forward Propagation)
在前向传播过程中,输入数据从输入层经过一系列的隐藏层传递到输出层。每个神经元都接收上一层所有神经元的输出,并计算加权和,并应用激活函数来产生输出。
假设我们有一个多层感知机模型,包含一个输入层、两个隐藏层和一个输出层。输入层有3个神经元,隐藏层1有4个神经元,隐藏层2有2个神经元,输出层有1个神经元。每个神经元与上一层的所有神经元相连。
下图展示了这个多层感知机的结构:
- 输入层 隐藏层1 隐藏层2 输出层
- o1 -------------- o1 -------------- o1 -------------- o1
- o2 -------------- o2 -------------- o2 -------------- o2
- o3 -------------- o3 -------------- o3 -------------- o3
- o4 -------------- o4
- o5
假设输入数据为 [1, 2, 3],每个连接都有一个权重,我们用 w 来表示。每个神经元还有一个偏置项 b。我们使用 ReLU(Rectified Linear Unit)作为激活函数。
首先,我们将输入数据乘以权重并加上偏置项,然后将结果传递给激活函数。这一过程可以表示为:
- 隐藏层1的输入 = (1 * w1) + (2 * w2) + (3 * w3) + b1
- 隐藏层1的输出 = ReLU(隐藏层1的输入)
-
- 隐藏层2的输入 = (隐藏层1的输出 * w4) + (隐藏层1的输出 * w5) + (隐藏层1的输出 * w6) + b2
- 隐藏层2的输出 = ReLU(隐藏层2的输入)
-
- 输出层的输入 = (隐藏层2的输出 * w7) + (隐藏层2的输出 * w8) + b3
- 输出层的输出 = ReLU(输出层的输入)
这样,我们就完成了前向传播过程,得到了最终的输出。
2、反向传播(Backpropagation)
在反向传播过程中,我们计算输出层的误差,并将误差反向传播回隐藏层,以更新权重,从而最小化预测输出与实际输出之间的差距。
首先,我们计算输出层的误差。假设我们的目标输出为 y,输出层的输出为 y_hat,则输出层的误差可以使用均方误差(Mean Squared Error)来计算:
输出层的误差 = (y - y_hat)^2
然后,我们将输出层的误差反向传播到隐藏层2,并计算隐藏层2的误差:
隐藏层2的误差 = 输出层的误差 * 隐藏层2的梯度
接下来,我们将隐藏层2的误差反向传播到隐藏层1,并计算隐藏层1的误差:
隐藏层1的误差 = 隐藏层2的误差 * 隐藏层1的梯度
在反向传播过程中,我们需要计算每个神经元的梯度,以便更新权重。梯度表示了误差相对于权重的变化率。
3、权重更新
在权重更新阶段,我们使用梯度下降法来调整权重,以最小化损失函数。我们根据反向传播计算得到的梯度,使用优化算法(如随机梯度下降)来更新网络中的权重。
权重的更新可以使用以下公式表示:
新权重 = 旧权重 - 学习率 * 梯度
学习率是一个超参数,控制每次更新的步长。通过迭代训练样本并反复进行前向传播和反向传播,不断调整权重,直到达到收敛条件或达到预定的训练轮数。
通过前向传播和反向传播的过程,多层感知机可以学习输入数据的特征,并进行预测和分类任务。
卷积神经网络(Convolutional Neural Network,简称CNN),是一种前馈神经网络,人工神经元可以响应周围单元,可以进行大型图像处理。卷积神经网络利用输入是图片的特点,把神经元设计成三个维度 : width, height, depth。比如输入的图片大小是 32 × 32 × 3 (rgb),那么输入神经元就也具有 32×32×3 的维度。下图为CNN的网络结构。可以看到,一个卷积神经网络由很多层组成,它们的输入是三维的,输出也是三维的,有的层有参数,有的层不需要参数。当CNN网络工作时,这些卷积会伴随着卷积过程进行不断转换。
图1 CNN网络结构
CNN网络一共有5个层级结构:输入层、卷积层、激活层、池化层、全连接FC层。
输入层接收原始图像数据。图像通常由三个颜色通道(红、绿、蓝)组成,形成一个二维矩阵,表示像素的强度值。
卷积层将输入图像与卷积核进行卷积操作。它并不是一下子整张图同时识别,而是对于图片中的每一个特征首先局部感知,然后在更高层次对局部进行综合操作,从而得到的全局信息。
图2 局部感知示意图
对卷积层的输出结果做一次非线性映射。使网络能够学习复杂的特征。常见的激活函数有:Sigmoid函数、Tanh函数、ReLU、Leaky ReLU。
Sigmoid:
Tanh:
ReLU:
Leaky ReLU:
通过减小特征图的大小来减少计算复杂性。它通过选择池化窗口内的最大值或平均值来实现。这有助于提取最重要的特征。
图 最大池化
全连接层将提取的特征映射转化为网络的最终输出。这可以是一个分类标签、回归值或其他任务的结果。整个模型训练完毕。
所有过程可以用下图来表示:
因为代码比较多和繁琐,做了个比较粗糙的流程图供观赏~
数据预处理主要步骤
MLP训练和可视化主要步骤
MLP和CNN在姓氏分类预测的主要步骤
我们先导入预先给定的数据(surnames.csv
)和必要的库
- import collections
- import numpy as np # 导入numpy库,用于进行数值计算和数组操作
- import pandas as pd # 导入pandas库,用于数据分析和处理
- import re
-
- from argparse import Namespace # 导入argparse模块中的Namespace类,用于创建命名空间对象
使用Namespace
创建一个名为args
的对象存储数据集路径、数据集分割比例、输出路径和随机数种子等参数。
- # 定义一个Namespace对象,用于存储参数
- args = Namespace(
- # 指定原始数据集的CSV文件路径
- raw_dataset_csv="data/surnames/surnames.csv",
- # 指定训练集占总数据集的比例
- train_proportion=0.7,
- # 指定验证集占总数据集的比例
- val_proportion=0.15,
- # 指定测试集占总数据集的比例
- test_proportion=0.15,
- # 指定处理后的数据集的CSV文件输出路径
- output_munged_csv="data/surnames/surnames_with_splits.csv",
- # 指定随机数生成器的种子,用于确保数据集分割的可重复性
- seed=1337
- )
读取原始数据,使用head
函数查看数据集的前几行后,使用collections.defaultdict
创建一个按国籍分组的字典,并将数据集拆分构建最终数据列表
- # Read raw data
- surnames = pd.read_csv(args.raw_dataset_csv, header=0)# 读取CSV文件
- surnames.head() # 返回DataFrame的前5行数据
- # Unique classes
- set(surnames.nationality) # 返回一个包含所有不同国籍的集合
- # Splitting train by nationality
- # Create dict
- # 根据国籍将训练数据集拆分
- # 创建一个字典,用于根据国籍分组存储数据
- by_nationality = collections.defaultdict(list)
- for _, row in surnames.iterrows():
- by_nationality[row.nationality].append(row.to_dict())
- # 创建用于划分数据的数据列表。
- final_list = []
- # 设置随机数生成器的种子
- np.random.seed(args.seed)
- # 遍历按国籍排序的字典项。
- for _, item_list in sorted(by_nationality.items()):
- # 打乱列表顺序,以进行随机划分。
- np.random.shuffle(item_list)
- # 计算列表中的数据点总数
- n = len(item_list)
- # 根据给定比例计算训练集、验证集和测试集的大小。
- n_train = int(args.train_proportion * n)
- n_val = int(args.val_proportion * n)
- n_test = int(args.test_proportion * n)
- # 给数据点添加一个'split'属性,表示它们属于哪个数据集。
- for item in item_list[:n_train]: # 前n_train个为训练集
- item['split'] = 'train'
- for item in item_list[n_train:n_train+n_val]: # 接下来的n_val个为验证集
- item['split'] = 'val'
- for item in item_list[n_train+n_val:]: # 剩余的为测试集
- item['split'] = 'test'
- # 将当前国籍的数据点添加到最终列表中。
- final_list.extend(item_list)

将final_list
转换为pandas
的DataFrame
对象,将处理后的数据写入到指定的CSV文件中,使用to_csv
函数,并设置index=False
以避免写入索引。
- # Write split data to file
- # 最终的数据列表转换为pandas的DataFrame对象
- final_surnames = pd.DataFrame(final_list)
- # Write munged data to CSV
- # 将处理后的数据写入到CSV文件中
- final_surnames.to_csv(args.output_munged_csv, index=False)
我们对数据处理前后进行结果展示
查看处理前的数据集
查看和验证处理后的数据集
- # 统计DataFrame中'split'列的各个唯一值出现的次数
- final_surnames.split.value_counts()
- # 返回前5行数据
- final_surnames.head()
可以看见,数据排列更加整齐了~
在应用之前,我们先通过可视化结果观察一下多层感知机的训练过程^-^。
老规矩,导入必要的库。并定义个全局变量标签集合LABELS
和数据点的中心位置CENTERS。
- import torch # 导入torch库,用于构建和训练神经网络模型
- import torch.nn as nn # 导入torch.nn模块,提供了构建神经网络模型所需的类和函数
- import torch.nn.functional as F # 导入torch.nn.functional模块,提供了一些非线性函数和损失函数
- import torch.optim as optim # 导入torch.optim模块,提供了优化算法
-
- import numpy as np # 导入numpy库,用于进行数值计算和数组操作
- import matplotlib.pyplot as plt
- %matplotlib inline
- LABELS = [0, 0, 1, 1]
- CENTERS = [(-3, -3), (3, 3), (3, -3), (-3, 3)]
定义多层感知器类(MultilayerPerceptron
),用于构建多层感知器模型
- class MultilayerPerceptron(nn.Module):
- """
- 初始化多层感知机的权重。
- """
- def __init__(self, input_size, hidden_size=2, output_size=3,
- num_hidden_layers=1, hidden_activation=nn.Sigmoid):
- """
- 初始化多层感知机的构造函数。
- 参数:
- input_size (int): 输入层的大小
- hidden_size (int): 隐藏层的大小
- output_size (int): 输出层的大小
- num_hidden_layers (int): 隐藏层的数量
- hidden_activation (torch.nn.*): 隐藏层使用的激活函数类
- """
- super(MultilayerPerceptron, self).__init__() # 调用基类的构造函数
- self.module_list = nn.ModuleList() # 创建一个模块列表
-
- interim_input_size = input_size # 临时输入大小初始化为输入层大小
- interim_output_size = hidden_size # 临时输出大小初始化为隐藏层大小
-
- # 为每个隐藏层创建一个线性层和激活函数层,并添加到模块列表中
- for _ in range(num_hidden_layers):
- self.module_list.append(nn.Linear(interim_input_size, interim_output_size))
- self.module_list.append(hidden_activation())
- interim_input_size = interim_output_size # 更新临时输入大小为下一层的输出大小
-
- # 创建最终的全连接层,将最后一个隐藏层的输出映射到输出层
- self.fc_final = nn.Linear(interim_input_size, output_size)
-
- # 用于存储前向传播过程中的中间数据的列表
- self.last_forward_cache = []
-
- def forward(self, x, apply_softmax=False):
- """MLP的前向传播
-
- 参数:
- x (torch.Tensor): 输入数据张量
- apply_softmax (bool): 是否应用softmax激活函数
- 如果与交叉熵损失一起使用,则应为False
- 返回:
- 结果张量,张量形状应为(batch, output_dim)
- """
- self.last_forward_cache = []
- # 将输入数据存储为NumPy数组,并转换为CPU张量
- self.last_forward_cache.append(x.to("cpu").numpy())
-
- # 遍历模块列表,应用每个层的计算
- for module in self.module_list:
- x = module(x)
- # 将每层的输出存储为NumPy数组
- self.last_forward_cache.append(x.to("cpu").data.numpy())
-
- # 通过最终的全连接层
- output = self.fc_final(x)
- # 存储最终输出
- self.last_forward_cache.append(output.to("cpu").data.numpy())
-
- # 如果需要,应用softmax激活函数
- if apply_softmax:
- output = F.softmax(output, dim=1)
-
- # 返回模型的输出
- return output

定义数据生成函数(get_toy_data
),用于训练和测试多层感知器。
- def get_toy_data(batch_size):
- """
- 生成一个简单的数据集。
-
- 参数:
- batch_size (int): 要生成的数据批次的大小
-
- 返回:
- x_data (torch.Tensor): 输入数据张量,包含模拟的特征
- y_targets (torch.Tensor): 目标张量,包含每个输入数据点的标签
- """
-
- assert len(CENTERS) == len(LABELS), 'centers should have equal number labels'
-
- # 初始化存储输入数据的列表
- x_data = []
- # 初始化目标标签数组,大小为batch_size,初始值为0
- y_targets = np.zeros(batch_size)
- # 获取中心点的数量
- n_centers = len(CENTERS)
-
- # 遍历批次中的每个样本
- for batch_i in range(batch_size):
- # 随机选择一个中心点的索引
- center_idx = np.random.randint(0, n_centers)
- # 从以中心点为中心的正态分布中生成一个数据点,并添加到x_data列表
- x_data.append(np.random.normal(loc=CENTERS[center_idx]))
- # 将当前样本的标签设置为对应的中心点标签
- y_targets[batch_i] = LABELS[center_idx]
-
- # 将输入数据列表转换为PyTorch张量,数据类型为float32
- x_data = torch.tensor(x_data, dtype=torch.float32)
- # 将目标标签数组转换为PyTorch张量,数据类型为int64
- y_targets = torch.tensor(y_targets, dtype=torch.int64)
-
- # 返回生成的数据和标签张量
- return x_data, y_targets

定义结果可视化函数(visualize_results
),用于可视化训练后的模型分类结果和决策边界。
- def visualize_results(perceptron, x_data, y_truth, n_samples=1000, ax=None, epoch=None,
- title='', levels=[0.3, 0.4, 0.5], linestyles=['--', '-', '--']):
- """
- 可视化多层感知机的分类结果。
- 参数:
- perceptron : 训练好的多层感知机模型
- x_data: 输入特征的数据张量
- y_truth: 实际标签的数据张量
- n_samples: 用于绘制的样本数量
- ax: 绘图的轴对象,如果为None则创建新的
- epoch: 当前训练世代,如果提供则显示在图上
- title: 图的标题
- levels: 用于绘制分类边界的阈值
- linestyles: 用于分类边界的线条样式
- """
- # 前向传播模型并获取预测的最大概率和对应的标签
- _, y_pred = perceptron(x_data, apply_softmax=True).max(dim=1)
- y_pred = y_pred.data.numpy() # 将预测的标签转换为NumPy数组
-
- x_data = x_data.data.numpy() # 将输入特征转换为NumPy数组
- y_truth = y_truth.data.numpy() # 将实际标签转换为NumPy数组
-
- n_classes = len(set(LABELS)) # 获取类别数量
-
- # 初始化每个类别的x数据和颜色列表
- all_x = [[] for _ in range(n_classes)]
- all_colors = [[] for _ in range(n_classes)]
-
- # 设置颜色和标记
- colors = ['black', 'white']
- markers = ['o', '*']
-
- # 遍历数据点,根据预测和真实标签填充颜色和x数据列表
- for x_i, y_pred_i, y_true_i in zip(x_data, y_pred, y_truth):
- all_x[y_true_i].append(x_i)
- # 如果预测标签与真实标签一致,则使用白色,否则使用黑色
- if y_pred_i == y_true_i:
- all_colors[y_true_i].append("white")
- else:
- all_colors[y_true_i].append("black")
- #all_colors[y_true_i].append(colors[y_pred_i])
-
- # 将列表转换为NumPy数组
- all_x = [np.stack(x_list) for x_list in all_x]
-
- # 如果没有提供ax,则创建一个新的绘图轴
- if ax is None:
- _, ax = plt.subplots(1, 1, figsize=(10,10))
-
- # 为每个类别绘制散点图
- for x_list, color_list, marker in zip(all_x, all_colors, markers):
- ax.scatter(x_list[:, 0], x_list[:, 1], edgecolor="black", marker=marker,
- facecolor=color_list, s=100)
-
- # 设置x和y轴的限制
- xlim = (min([x_list[:,0].min() for x_list in all_x]),
- max([x_list[:,0].max() for x_list in all_x]))
- ylim = (min([x_list[:,1].min() for x_list in all_x]),
- max([x_list[:,1].max() for x_list in all_x]))
-
- # 绘制分类超平面
- xx = np.linspace(xlim[0], xlim[1], 30)
- yy = np.linspace(ylim[0], ylim[1], 30)
- YY, XX = np.meshgrid(yy, xx)
- xy = np.vstack([XX.ravel(), YY.ravel()]).T
-
- for i in range(n_classes):
- # 计算超矩形区域内每个点的预测概率
- Z = perceptron(torch.tensor(xy, dtype=torch.float32),
- apply_softmax=True)
- Z = Z[:, i].data.numpy().reshape(XX.shape)
- # 绘制分类边界
- ax.contour(XX, YY, Z, colors=colors[i], levels=levels, linestyles=linestyles)
-
- # 绘图美化
- plt.suptitle(title)
-
- # 如果提供了epoch,则在图上显示
- if epoch is not None:
- plt.text(xlim[0], ylim[1], "Epoch = {}".format(str(epoch)))

生成模拟数据集并得到初始化数据结果
- # 设置随机种子以确保结果的可复现性
- seed = 24
- torch.manual_seed(seed) # 设置PyTorch的随机种子
- torch.cuda.manual_seed_all(seed) # 如果使用CUDA,设置CUDA的随机种子
- np.random.seed(seed) # 设置NumPy的随机种子
-
- # 生成模拟数据集
- x_data, y_truth = get_toy_data(batch_size=1000)
-
- # 将数据转换为NumPy数组
- x_data = x_data.data.numpy()
- y_truth = y_truth.data.numpy().astype(np.int64) # 转换为64位整数
-
- # 计算不同类别的数量
- n_classes = len(set(LABELS))
-
- # 初始化每个类别的数据和颜色列表
- all_x = [[] for _ in range(n_classes)]
- all_colors = [[] for _ in range(n_classes)]
-
- # 设置颜色和标记样式
- colors = ['black', 'white']
- markers = ['o', '*']
-
- # 根据真实标签将数据点分组并分配颜色
- for x_i, y_true_i in zip(x_data, y_truth):
- all_x[y_true_i].append(x_i) # 将数据点添加到对应类别的列表中
- all_colors[y_true_i].append(colors[y_true_i]) # 根据类别索引分配颜色
-
- # 将列表转换为NumPy数组
- all_x = [np.stack(x_list) for x_list in all_x]
-
- # 创建绘图轴
- _, ax = plt.subplots(1, 1, figsize=(10,5))
-
- # 绘制散点图
- for x_list, color_list, marker in zip(all_x, all_colors, markers):
- ax.scatter(x_list[:, 0], x_list[:, 1], edgecolor='black', marker=marker,
- facecolor="white", s=100) # 绘制带有白色填充和黑色边缘的散点图
-
- plt.tight_layout() # 自动调整子图参数,使之填充整个图像区域
- plt.axis('off') # 不显示坐标轴
-
- plt.title("");
-
- plt.savefig("images/data.png")
- plt.savefig("images/data.pdf")

根据不同的配置初始化多个多层感知器模型,逐步增加网络的复杂性(隐藏层数量从没有到有两个)这里只写了隐藏层数量为0的模型,其他模型只需改动隐藏层数量即可。
- # 设置输入层的大小为2
- input_size = 2
- # 设置输出层的大小为标签集合的大小
- output_size = len(set(LABELS))
- # 设置隐藏层的数量为0
- num_hidden_layers = 0
- ## 设置隐藏层的数量为1
- # num_hidden_layers = 1
- ## 设置隐藏层的数量为2
- # num_hidden_layers = 2
- hidden_size = 2 # isn't ever used but we still set it
- # 设置随机种子以确保结果的可复现性
- seed = 24
-
- torch.manual_seed(seed) # 设置PyTorch的随机种子
- torch.cuda.manual_seed_all(seed) # 如果使用CUDA,设置CUDA的随机种子
- np.random.seed(seed) # 设置NumPy的随机种子
-
- # 初始化一个多层感知机
- mlp1 = MultilayerPerceptron(
- input_size=input_size, # 输入层大小
- hidden_size=hidden_size, # 隐藏层大小
- num_hidden_layers=num_hidden_layers, # 隐藏层数量
- output_size=output_size # 输出层大小
- )
- # 打印多层感知机的模型结构
- print(mlp1)
-
- # 设置批量大小
- batch_size = 1000
-
- # 生成模拟数据集
- x_data_static, y_truth_static = get_toy_data(batch_size)
-
- # 创建绘图轴
- fig, ax = plt.subplots(1, 1, figsize=(10,5))
-
- # 使用visualize_results函数可视化初始多层感知机的状态
- visualize_results(
- mlp1, # 多层感知机模型
- x_data_static, # 输入数据
- y_truth_static, # 真实标签
- ax=ax, # 绘图轴
- title='Initial Perceptron State', # 图像标题
- levels=[0.5] # 分类边界的阈值
- )
-
- # 不显示坐标轴
- plt.axis('off')
-
- # 保存图像
- plt.savefig('images/perceptron_initial.png')

通过多个训练周期,使用Adam优化器和交叉熵损失函数训练模型。训练完成后,保存最终的模型分类结果和决策边界的可视化图像。
- # 初始化
- losses = []
-
- # 设置批量大小
- batch_size = 10000
- # 设置批量数量
- n_batches = 10
- # 设置最大训练世代数
- max_epochs = 10
-
- # 初始化损失变化量和上一次的损失
- loss_change = 1.0
- last_loss = 10.0
- # 设置损失变化的阈值
- change_threshold = 1e-3
-
- # 初始化
- epoch = 0
- all_imagefiles = []
-
- # 设置学习率
- lr = 0.01
- # 初始化Adam优化器
- optimizer = optim.Adam(params=mlp1.parameters(), lr=lr)
- # 初始化交叉熵损失函数
- cross_ent_loss = nn.CrossEntropyLoss()
-
- # 定义早期停止的函数
- def early_termination(loss_change, change_threshold, epoch, max_epochs):
- # 根据损失变化和世代数决定是否停止训练
- terminate_for_loss_change = loss_change < change_threshold
- terminate_for_epochs = epoch > max_epochs
- # 如果损失变化小于阈值或超过最大世代数,则返回True停止训练
- return terminate_for_loss_change or terminate_for_epochs
-
- # 训练循环
- while not early_termination(loss_change, change_threshold, epoch, max_epochs):
- # 对每个批次的数据进行训练
- for _ in range(n_batches):
- # step 0: 获取数据
- x_data, y_target = get_toy_data(batch_size)
-
- # step 1: 将梯度置零
- mlp1.zero_grad()
-
- # step 2: 执行前向传播
- y_pred = mlp1(x_data).squeeze() # 去除单维度的批次维度
-
- # step 3: 计算损失
- loss = cross_ent_loss(y_pred, y_target.long()) # 目标标签转换为长整型
-
- # step 4: 执行反向传播
- loss.backward()
-
- # step 5: 优化器进行优化步骤
- optimizer.step()
-
- # 辅助步骤:记录和更新损失信息
- loss_value = loss.item()
- losses.append(loss_value)
- loss_change = abs(last_loss - loss_value) # 计算损失的变化量
- last_loss = loss_value # 更新上一次的损失
-
- # 可视化
- fig, ax = plt.subplots(1, 1, figsize=(10,5))
- visualize_results(
- mlp1, # 多层感知机模型
- x_data_static, # 静态输入数据
- y_truth_static, # 静态真实标签
- ax=ax, # 绘图轴
- epoch=epoch,
- title=f"{loss_value:0.2f}; {loss_change:0.4f}" # 图像标题显示损失和损失变化
- )
- plt.axis('off') # 不显示坐标轴
- epoch += 1 # 更新
- # 保存图像
- all_imagefiles.append(f'images/perceptron_epoch{epoch}_toylearning.png')
- plt.savefig(all_imagefiles[-1])

可视化对比不同配置的多层感知器模型的分类结果
- _, axes = plt.subplots(1,3,figsize=(16,5))
- # 对比
- visualize_results(mlp1, x_data_static, y_truth_static, epoch=None, levels=[0.5], ax=axes[0])
- visualize_results(mlp2, x_data_static, y_truth_static, epoch=None, levels=[0.5], ax=axes[1])
- visualize_results(mlp3, x_data_static, y_truth_static, epoch=None, levels=[0.5], ax=axes[2])
- plt.tight_layout()
- axes[0].axis('off');
- axes[1].axis('off');
- axes[2].axis('off');
- plt.savefig("images/perceptron_vs_mlp2_vs_mlp3.png")
通过对比我们发现,具有一个或多个隐藏层的MLP能够更好地拟合数据,并且可能在分类任务上获得更高的准确率。这是因为每增加一个隐藏层,模型就增加了更多的参数,从而提高了其表达能力。
可视化模型中间层的激活值,展示不同层如何学习和转换数据。
- batch_size = 100
- # 绘制多层感知机(MLP)模型的中间表示(特征)
- def plot_intermediate_representations(mlp_model, plot_title, figsize=(10, 2)):
- # 获取模拟数据集
- x_data, y_target = get_toy_data(batch_size)
-
- # 使用模型进行预测,并将输出转换为NumPy数组
- y_pred = mlp_model(x_data, True).detach().numpy()
-
- # 将输入数据和目标标签转换为NumPy数组
- x_data = x_data.numpy()
- y_target = y_target.numpy()
-
- # 定义用于不同类别的颜色和标记
- colors = ['black', 'white']
- markers = ['o', '*']
-
- # 初始化用于存储不同类别数据点索引的列表
- class_zero_indices = []
- class_one_indices = []
- for i in range(y_target.shape[0]):
- # 根据目标标签将数据点的索引分配到相应的类别列表中
- if y_target[i] == 0:
- class_zero_indices.append(i)
- else:
- class_one_indices.append(i)
-
- # 将列表转换为NumPy数组
- class_zero_indices = np.array(class_zero_indices)
- class_one_indices = np.array(class_one_indices)
-
- # 创建一个图形和多个子图轴,数量等于模型的中间表示的数量加1(包括输入层)
- fig, axes = plt.subplots(1, len(mlp_model.last_forward_cache), figsize=figsize)
-
- # 对每个类别的数据点进行绘制
- for class_index, data_indices in enumerate([class_zero_indices, class_one_indices]):
- # 在第一个子图上绘制输入数据
- axes[0].scatter(
- x_data[data_indices, 0],
- x_data[data_indices, 1],
- edgecolor='black',
- facecolor="white",
- marker=markers[class_index],
- s=[200, 400][class_index] # 根据类别调整点的大小
- )
- axes[0].axis('off') # 关闭坐标轴显示
-
- # 在后续的子图上绘制每一层的激活值
- for i, activations in enumerate(mlp_model.last_forward_cache[1:], 1):
- axes[i].scatter(
- activations[data_indices, 0],
- activations[data_indices, 1],
- edgecolor='black',
- facecolor="white",
- marker=markers[class_index],
- s=[200, 400][class_index] # 根据类别调整点的大小
- )
- axes[i].axis('off') # 关闭坐标轴显示
-
- # 自动调整子图布局以避免重叠
- plt.tight_layout()
-
- # 设置图形的总标题
- plt.suptitle(plot_title, size=15)
- # 调整子图之间的间距
- plt.subplots_adjust(top=0.75)
- # 调用函数绘制mlp1模型的中间表示
- plot_intermediate_representations(
- mlp1, # 传入mlp1模型
- "The Perceptron's Input and Intermediate Representation", # 设置绘图的标题
- figsize=(9, 3) # 设置图形的尺寸
- )
-
- # 图形保存
- plt.savefig("images/perceptron_intermediate.png")
- plt.savefig("images/figure_4_5.pdf")
- # 调用函数绘制mlp2模型的中间表示
- plot_intermediate_representations(mlp2,
- "A 2-layer MLP's Input and Intermediate Representation",
- figsize=(10, 3))
- # 图形保存
- plt.savefig("images/mlp2_intermediate.png")
- plt.savefig("images/figure_4_4.pdf")
- # 调用函数绘制mlp3模型的中间表示
- plot_intermediate_representations(mlp3,
- "The 3-layer Multilayer Perceptron's Input and Intermediate Representation",
- figsize=(13, 3))
- plt.savefig("images/mlp3_intermediate.png")
- plt.savefig("images/mlp3_intermediate.pdf")

通过对比我们发现,在中间表示的可视化中,较低层表示可能与输入数据相似,而在更高层表示可能捕捉到更高级的特征
终于到了多层感知器的应用模块!
导入库
- from argparse import Namespace # 导入argparse模块中的Namespace类,用于创建命名空间对象
- from collections import Counter # 导入collections模块中的Counter类,用于计数和统计元素出现的次数
- import json # 导入json模块,用于处理JSON格式的数据
- import os # 导入os模块,用于进行文件和目录操作
- import string # 导入string模块,包含了用于字符串操作的常用字符集合
-
- import numpy as np # 导入numpy库,用于进行数值计算和数组操作
- import pandas as pd # 导入pandas库,用于数据分析和处理
-
- import torch # 导入torch库,用于构建和训练神经网络模型
- import torch.nn as nn # 导入torch.nn模块,提供了构建神经网络模型所需的类和函数
- import torch.nn.functional as F # 导入torch.nn.functional模块,提供了一些非线性函数和损失函数
- import torch.optim as optim # 导入torch.optim模块,提供了优化算法
- from torch.utils.data import Dataset, DataLoader # 从torch.utils.data模块导入Dataset和DataLoader类,用于自定义数据集和数据加载
- from tqdm import tqdm_notebook # 导入tqdm模块中的tqdm_notebook函数,用于显示进度条
定义词汇表类(Vocabulary
),用于处理文本数据并建立词汇表映射
- class Vocabulary(object):
- """Class to process text and extract vocabulary for mapping"""
- # 处理文本数据并提取词汇表映射
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- # """
- # Args:
- # token_to_idx (dict): 一个预先存在的标记到索引的映射字典
- # add_unk (bool): 一个指示是否添加UNK标记的标志
- # unk_token (str): 要添加到词汇表中的UNK标记
- # """
- """
- Args:
- token_to_idx (dict): a pre-existing map of tokens to indices
- add_unk (bool): a flag that indicates whether to add the UNK token
- unk_token (str): the UNK token to add into the Vocabulary
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- # """返回可序列化的字典"""
- """ returns a dictionary that can be serialized """
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- # """从可序列化的字典实例化Vocabulary"""
- """ instantiates the Vocabulary from a serialized dictionary """
- return cls(**contents)
-
- def add_token(self, token):
- # """根据标记更新映射字典
-
- # Args:
- # token (str): 要添加到词汇表的标记
- # Returns:
- # index (int): 与标记对应的整数索引
- # """
- """Update mapping dicts based on the token.
- Args:
- token (str): the item to add into the Vocabulary
- Returns:
- index (int): the integer corresponding to the token
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- # """根据标记更新映射字典
-
- # Args:
- # token (str): 要添加到词汇表的标记
- # Returns:
- # index (int): 与标记对应的整数索引
- # """
- """Add a list of tokens into the Vocabulary
-
- Args:
- tokens (list): a list of string tokens
- Returns:
- indices (list): a list of indices corresponding to the tokens
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- # """检索与标记相关联的索引,如果标记不存在则返回UNK索引
-
- # Args:
- # token (str): 要查找的标记
- # Returns:
- # index (int): 与标记对应的索引
- # Notes:
- # 对于UNK功能,`unk_index`需要是>=0的(已添加到词汇表中)
- # """
- """Retrieve the index associated with the token
- or the UNK index if token isn't present.
-
- Args:
- token (str): the token to look up
- Returns:
- index (int): the index corresponding to the token
- Notes:
- `unk_index` needs to be >=0 (having been added into the Vocabulary)
- for the UNK functionality
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- # """返回与索引相关联的标记
-
- # Args:
- # index (int): 要查找的索引
- # Returns:
- # token (str): 与索引对应的标记
- # Raises:
- # KeyError: 如果索引不在词汇表中
- # """
- """Return the token associated with the index
-
- Args:
- index (int): the index to look up
- Returns:
- token (str): the token corresponding to the index
- Raises:
- KeyError: if the index is not in the Vocabulary
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)

定义向量化器类(SurnameVectorizer
),用于将姓氏文本数据转换为数值向量
- class SurnameVectorizer(object):
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
- # """
- # 一个向量化器,它协调Vocabulary的使用,并将它们应用于数据。
- # """
- def __init__(self, surname_vocab, nationality_vocab):
- # """
- # 构造函数,初始化姓氏和国籍的Vocabulary。
-
- # 参数:
- # surname_vocab (Vocabulary): 将字符映射到整数的Vocabulary
- # nationality_vocab (Vocabulary): 将国籍映射到整数的Vocabulary
- # """
- """
- Args:
- surname_vocab (Vocabulary): maps characters to integers
- nationality_vocab (Vocabulary): maps nationalities to integers
- """
- self.surname_vocab = surname_vocab # 姓氏的词汇表
- self.nationality_vocab = nationality_vocab # 国籍的词汇表
-
- def vectorize(self, surname):
- """
- Args:
- surname (str): the surname
- Returns:
- one_hot (np.ndarray): a collapsed one-hot encoding
- """
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1
-
- return one_hot
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """Instantiate the vectorizer from the dataset dataframe
-
- Args:
- surname_df (pandas.DataFrame): the surnames dataset
- Returns:
- an instance of the SurnameVectorizer
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows():
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}

定义数据集类(SurnameDataset
),用于封装姓氏数据集
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): the dataset
- vectorizer (SurnameVectorizer): vectorizer instatiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case in the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameVectorizer
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's:
- features (x_surname)
- label (y_nationality)
- """
- row = self._target_df.iloc[index]
-
- surname_vector = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
- ensure each tensor is on the write device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict

定义模型类(SurnameClassifier
):实现一个多层感知器模型,用于分类任务
- class SurnameClassifier(nn.Module):
- """ A 2-layer Multilayer Perceptron for classifying surnames """
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- super(SurnameClassifier, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the classifier
-
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- intermediate_vector = F.relu(self.fc1(x_in))
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector

定义初始化训练状态(make_train_state)、更新训练状态函数(update_train_state)、定义准确率计算函数(compute_accuracy
)
- def make_train_state(args):
- # """
- # 创建训练状态字典
-
- # Args:
- # args: 主要参数
-
- # Returns:
- # train_state: 训练状态字典
- # """
- return {'stop_early': False, # 是否提前停止训练的标志
- 'early_stopping_step': 0, # 提前停止训练的步数
- 'early_stopping_best_val': 1e8, # 最佳验证集损失
- 'learning_rate': args.learning_rate, # 学习率
- 'epoch_index': 0, # 当前训练的轮数
- 'train_loss': [], # 训练集损失记录
- 'train_acc': [], # 训练集准确率记录
- 'val_loss': [], # 验证集损失记录
- 'val_acc': [], # 验证集准确率记录
- 'test_loss': -1, # 测试集损失
- 'test_acc': -1, # 测试集准确率
- 'model_filename': args.model_state_file} # 模型保存文件名
-
- def update_train_state(args, model, train_state):
- # """
- # 更新训练状态
-
- # Components:
- # - 提前停止:防止过拟合。
- # - 模型检查点:如果模型更好,则保存模型。
-
- # Args:
- # args: 主要参数
- # model: 要训练的模型
- # train_state: 表示训练状态的字典
-
- # Returns:
- # train_state: 更新后的训练状态字典
- # """
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- # 至少保存一个模型
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- # 如果性能提高,则保存模型
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- # 如果损失变差
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- # 更新步数
- train_state['early_stopping_step'] += 1
- # Loss decreased
- # 损失减少
- else:
- # Save the best model
- # 保存最佳模型
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- # 重置提前停止步数
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- # 是否提前停止?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- def compute_accuracy(y_pred, y_target):
- # """
- # 计算准确率
-
- # Args:
- # y_pred: 预测的类别概率
- # y_target: 真实的类别标签
-
- # Returns:
- # accuracy: 准确率(百分比)
- # """
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100

定义随机种子设置函数(set_seed_everywhere)和目录处理函数(handle_dirs),分别确保可重复性和确保模型保存和其他文件操作的路径有效。
- def set_seed_everywhere(seed, cuda):
- # """
- # 设置随机种子,以确保实验的可重复性。
-
- # 参数:
- # seed (int): 随机种子的值
- # cuda (bool): 是否在CUDA环境中运行
- # """
- np.random.seed(seed) # 设置numpy的随机种子
- torch.manual_seed(seed) # 设置PyTorch的随机种子
- if cuda: # 如果在CUDA环境中运行
- torch.cuda.manual_seed_all(seed) # 设置所有CUDA设备的随机种子
-
- def handle_dirs(dirpath):
- # """
- # 处理目录路径,如果目录不存在则创建。
-
- # 参数:
- # dirpath (str): 要检查或创建的目录路径
- # """
- if not os.path.exists(dirpath): # 检查目录是否存在
- os.makedirs(dirpath) # 如果不存在,则创建目录

使用argparse.Namespace
定义和设置训练过程中的参数,并根据需要,更新文件路径以指向保存模型和向量化器的目录。检查CUDA可用性后,根据是否从文件重新加载数据,创建新的数据集和向量化器或从文件加载。
- # 定义命令行参数对象args,包含程序运行所需的各种配置参数
- args = Namespace(
- # 数据和路径信息
- surname_csv="data/surnames/surnames_with_splits.csv", # 姓氏数据集csv文件路径
- vectorizer_file="vectorizer.json", # 向量化器文件路径
- model_state_file="model.pth", # 模型状态文件路径
- save_dir="model_storage/ch4/surname_mlp", # 存储模型和向量化器的目录
-
- # 模型超参数
- hidden_dim=300, # 隐藏层维度
-
- # 训练超参数
- seed=1337, # 随机种子,用于结果可复现
- num_epochs=100, # 训练迭代的轮数
- early_stopping_criteria=5, # 早停法标准
- learning_rate=0.001, # 学习率
- batch_size=64, # 每批数据的大小
-
- # 运行时选项
- cuda=False, # 是否使用CUDA
- reload_from_files=False, # 是否从文件中重新加载数据
- expand_filepaths_to_save_dir=True, # 是否将文件路径扩展到保存目录
- )
-
- # 如果设置了expand_filepaths_to_save_dir,则将文件路径与保存目录合并
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file) # 更新向量化器文件路径
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file) # 更新模型状态文件路径
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file)) # 打印更新后的向量化器文件路径
- print("\t{}".format(args.model_state_file)) # 打印更新后的模型状态文件路径
-
- # 检查CUDA是否可用
- if not torch.cuda.is_available(): # 如果CUDA不可用
- args.cuda = False # 设置args中的cuda为False
-
- # 设置设备,根据是否使用CUDA决定使用CPU或GPU
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- # 打印是否使用CUDA
- print("Using CUDA: {}".format(args.cuda))
-
- # 为结果可复现设置随机种子
- set_seed_everywhere(args.seed, args.cuda)
-
- # 处理目录,确保保存目录存在
- handle_dirs(args.save_dir)
- # 检查是否从文件重新加载数据
- if args.reload_from_files:
- # 如果设置了从文件重新加载,则执行以下操作
- print("Reloading!")
- # 使用已加载的向量化器来加载数据集
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(
- args.surname_csv, # 传入姓氏数据集的csv文件路径
- args.vectorizer_file # 传入向量化器文件路径
- )
- else:
- # 如果没有设置从文件重新加载,则从头开始创建数据集和向量化器
- print("Creating fresh!")
- # 创建新的数据集和向量化器
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- # 保存新创建的向量化器到文件
- dataset.save_vectorizer(args.vectorizer_file)
-
- # 获取数据集的向量化器
- vectorizer = dataset.get_vectorizer()
-
- # 初始化分类器模型,传入参数
- # input_dim是向量化器中姓氏词汇表的大小,用于确定输入层的大小
- # hidden_dim是模型隐藏层的维度,从args中获取
- # output_dim是向量化器中国籍词汇表的大小,用于确定输出层的大小
- classifier = SurnameClassifier(
- input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab)
- )

初始化MLP模型,设置损失函数和优化器后,进行多个训练周期,每个周期包括: a. 清零梯度。 b. 计算模型输出和损失。 c. 反向传播和优化器步骤 。训练中根据验证集损失,实现早停来避免过拟合。
- # 将分类器模型移动到指定的设备(CPU或GPU)
- classifier = classifier.to(args.device)
-
- # 将数据集的类别权重移动到指定的设备(CPU或GPU)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 初始化交叉熵损失函数,传入数据集的类别权重作为权重参数
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 初始化Adam优化器,传入分类器的参数和学习率
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
-
- # 初始化学习率调度器,使用ReduceLROnPlateau策略
- # 当验证集上的性能不再提升时,降低学习率
- # mode='min'表示我们希望最小化损失函数
- # factor=0.5表示每次降低学习率时乘以0.5
- # patience=1表示在没有性能提升时等待1个epoch后降低学习率
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(
- optimizer=optimizer,
- mode='min',
- factor=0.5,
- patience=1
- )
-
- # 创建训练状态对象,传入命令行参数
- train_state = make_train_state(args)
-
- # 初始化总的训练进度条,显示训练轮数
- epoch_bar = tqdm_notebook(
- desc='training routine', # 进度条描述
- total=args.num_epochs, # 总轮数
- position=0 # 进度条位置
- )
-
- # 设置数据集为训练集
- dataset.set_split('train')
-
- # 初始化训练集进度条,显示每个epoch中的批次数量
- train_bar = tqdm_notebook(
- desc='split=train', # 进度条描述
- total=dataset.get_num_batches(args.batch_size), # 计算批次数量
- position=1, # 进度条位置
- leave=True # 训练完成后保留进度条
- )
-
- # 设置数据集为验证集
- dataset.set_split('val')
-
- # 初始化验证集进度条,显示每个epoch中的批次数量
- val_bar = tqdm_notebook(
- desc='split=val', # 进度条描述
- total=dataset.get_num_batches(args.batch_size), # 计算批次数量
- position=1, # 进度条位置
- leave=True # 验证完成后保留进度条
- )
-
-
- try:
- for epoch_index in range(args.num_epochs): # 遍历所有epoch
- train_state['epoch_index'] = epoch_index # 更新训练状态中的epoch索引
-
- # 训练数据集迭代
- dataset.set_split('train') # 设置数据集为训练集
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device) # 生成批次数据生成器
- running_loss = 0.0 # 初始化训练损失
- running_acc = 0.0 # 初始化训练准确率
- classifier.train() # 将模型设置为训练模式
-
- for batch_index, batch_dict in enumerate(batch_generator): # 遍历所有批次
- # 步骤 1: 清零梯度
- optimizer.zero_grad()
-
- # 步骤 2: 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 步骤 3: 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item() # 获取损失的数值
- running_loss += (loss_t - running_loss) / (batch_index + 1) # 计算平均损失
-
- # 步骤 4: 使用损失来产生梯度
- loss.backward()
-
- # 步骤 5: 使用优化器进行梯度下降
- optimizer.step()
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1) # 计算平均准确率
-
- # 更新训练进度条
- train_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
- train_bar.update()
-
- # 存储训练损失和准确率
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # 验证数据集迭代
- dataset.set_split('val') # 设置数据集为验证集
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device) # 生成批次数据生成器
- running_loss = 0. # 初始化验证损失
- running_acc = 0. # 初始化验证准确率
- classifier.eval() # 将模型设置为评估模式
-
- for batch_index, batch_dict in enumerate(batch_generator): # 遍历所有批次
- # 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 步骤 3: 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item() # 确保损失在CPU上计算
- running_loss += (loss_t - running_loss) / (batch_index + 1) # 计算平均损失
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1) # 计算平均准确率
-
- # 更新验证进度条
- val_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
- val_bar.update()
-
- # 存储验证损失和准确率
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- # 更新训练状态
- train_state = update_train_state(args=args, model=classifier, train_state=train_state)
-
- # 根据验证损失调整学习率
- scheduler.step(train_state['val_loss'][-1])
-
- # 如果满足早停条件,则退出循环
- if train_state['stop_early']:
- break
-
- # 重置进度条
- train_bar.n = 0
- val_bar.n = 0
- # 更新总进度条
- epoch_bar.update()
-
- except KeyboardInterrupt:
- print("Exiting loop")

在验证集和测试集上评估模型性能。
- # compute the loss & accuracy on the test set using the best available model
- # 加载之前训练中保存的最佳模型状态
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- # 将模型移动到指定的设备(CPU或GPU)
- classifier = classifier.to(args.device)
-
- # 将类别权重移动到与模型相同的设备
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 初始化交叉熵损失函数,传入数据集的类别权重作为权重参数
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 设置数据集为测试集
- dataset.set_split('test')
-
- # 生成测试数据的批次生成器
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
-
- # 初始化测试损失和准确率
- running_loss = 0.
- running_acc = 0.
-
- # 将模型设置为评估模式
- classifier.eval()
-
- # 遍历测试数据的所有批次
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算模型输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item() # 获取损失的具体数值
- # 计算平均损失
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- # 计算平均准确率
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 将测试损失和准确率存储到训练状态中
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc

这个结果看着效果好像不太好...准确率不到50%?!
终于可以应用了:D
首先定义预测函数(predict_nationality),用于预测新姓氏的国籍
- # 根据姓氏预测一个人的国籍
- def predict_nationality(surname, classifier, vectorizer):
- """
- 从一个新的姓氏中预测国籍。
- 参数:
- surname (str): 要分类的姓氏
- classifier (SurnameClassifier): 分类器的一个实例
- vectorizer (SurnameVectorizer): 相应的向量化器
- 返回:
- 一个字典,包含最可能的国籍及其概率
- """
- # 使用向量化器将姓氏转换为向量
- vectorized_surname = vectorizer.vectorize(surname)
- # 将向量转换为PyTorch张量,并调整为1xN的形状
- vectorized_surname = torch.tensor(vectorized_surname).view(1, -1)
- # 使用分类器进行预测,应用softmax函数
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 从结果中获取最大概率值和对应的索引
- probability_values, indices = result.max(dim=1)
- # 获取索引对应的国籍
- index = indices.item()
-
- # 从词汇表中获取索引对应的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- # 获取最大概率值
- probability_value = probability_values.item()
-
- # 返回包含国籍和概率的字典
- return {'nationality': predicted_nationality, 'probability': probability_value}

通过输入姓氏,来显示预测的国籍和概率。
- # 获取一个姓氏
- new_surname = input("Enter a surname to classify: ")
-
- # 确保分类器在CPU上运行,因为某些设备可能不支持GPU加速
- classifier = classifier.to("cpu")
-
- # 使用之前定义的predict_nationality函数来预测输入姓氏的国籍
- # 传入新姓氏、分类器实例和向量化器实例
- prediction = predict_nationality(new_surname, classifier, vectorizer)
-
- # 打印预测结果,格式为:姓氏 -> 预测的国籍 (概率=XX.XX%)
- # 使用format函数格式化输出,保留两位小数显示概率
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
(竟然是爱尔兰人)
再定义一个函数(predict_topk_nationality),用于显示姓氏可能的前k个最可能的国籍
- # 这个函数用于预测姓氏可能的前k个最可能的国籍
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- # 将姓氏向量化
- vectorized_name = vectorizer.vectorize(name)
- # 将向量转换为PyTorch张量,并调整为1xN的形状
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- # 使用分类器进行预测,应用softmax函数
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- # 获取预测结果中概率最高的k个国籍
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # 将PyTorch张量转换为NumPy数组,并取出第一个元素(因为我们的形状是1xk)
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- # 初始化一个空列表来存储预测结果
- results = []
- # 遍历概率值和对应的索引
- for prob_value, index in zip(probability_values, indices):
- # 根据索引找到对应的国籍
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- # 将国籍和概率添加到结果列表中
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- # 返回包含最可能的k个国籍及其概率的列表
- return results
-
- # 获取用户输入的姓氏
- new_surname = input("Enter a surname to classify: ")
- # 确保分类器在CPU上运行
- classifier = classifier.to("cpu")
-
- # 获取用户想要查看的预测结果数量
- k = int(input("How many of the top predictions to see? "))
- # 如果用户请求的预测结果数量超过了国籍词汇表的大小,则使用最大值
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- # 使用predict_topk_nationality函数获取前k个最可能的国籍预测
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- # 打印预测结果的数量
- print("Top {} predictions:".format(k))
- print("===================")
- # 遍历预测结果并打印每个预测的国籍和概率
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

尝试运行CNN进行一次比较~
导入库
- from argparse import Namespace # 导入argparse模块中的Namespace类,用于创建命名空间对象
- from collections import Counter # 导入collections模块中的Counter类,用于计数和统计元素出现的次数
- import json # 导入json模块,用于处理JSON格式的数据
- import os # 导入os模块,用于进行文件和目录操作
- import string # 导入string模块,包含了用于字符串操作的常用字符集合
-
- import numpy as np # 导入numpy库,用于进行数值计算和数组操作
- import pandas as pd # 导入pandas库,用于数据分析和处理
- import torch # 导入torch库,用于构建和训练神经网络模型
- import torch.nn as nn # 导入torch.nn模块,提供了构建神经网络模型所需的类和函数
- import torch.nn.functional as F # 导入torch.nn.functional模块,提供了一些非线性函数和损失函数
- import torch.optim as optim # 导入torch.optim模块,提供了优化算法
- from torch.utils.data import Dataset, DataLoader # 从torch.utils.data模块导入Dataset和DataLoader类,用于自定义数据集和数据加载
- from tqdm import tqdm_notebook # 导入tqdm模块中的tqdm_notebook函数,用于显示进度条
定义词汇表类(Vocabulary
)
- # 处理文本并提取词汇表,用于映射到索引
- class Vocabulary(object):
- """Class to process text and extract vocabulary for mapping"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- 初始化词汇表对象。
- 参数:
- token_to_idx (dict): 预先存在的词汇到索引的映射字典
- add_unk (bool): 一个标志,指示是否添加未知(UNK)标记
- unk_token (str): 要添加到词汇表中的未知(UNK)标记
- """
-
- # 如果没有提供token_to_idx,则创建一个新的空字典
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- # 创建索引到token的反向映射
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- # 存储是否添加UNK标记和UNK标记本身
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- # 初始化UNK标记的索引,如果添加UNK标记,则添加到词汇表并获取索引
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- """返回可以序列化的字典"""
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """从序列化的字典实例化Vocabulary"""
- return cls(**contents)
-
- def add_token(self, token):
- """根据token更新映射字典。
- 参数:
- token (str): 要添加到词汇表的项目
- 返回:
- index (int): 对应于token的整数
- """
- try:
- # 如果token已存在,则返回其索引
- index = self._token_to_idx[token]
- except KeyError:
- # 如果token不存在,则添加到字典,并分配新的索引
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """将一系列token添加到词汇表中
-
- 参数:
- tokens (list): 字符串token的列表
- 返回:
- indices (list): 对应于token的索引列表
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """检索与token相关联的索引,如果token不存在,则返回UNK索引。
-
- 参数:
- token (str): 要查找的token
- 返回:
- index (int): 对应于token的索引
- 说明:
- `unk_index` 需要 >=0(已添加到词汇表中)
- 才能使用UNK功能
- """
- if self.unk_index >= 0:
- # 如果添加了UNK标记,则返回token对应的索引或UNK索引
- return self._token_to_idx.get(token, self.unk_index)
- else:
- # 如果没有添加UNK标记,则返回token对应的索引
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """返回与索引相关联的token
-
- 参数:
- index (int): 要查找的索引
- 返回:
- token (str): 对应于索引的token
- 引发:
- KeyError: 如果索引不在词汇表中
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- # 返回词汇表的字符串表示形式
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- # 返回词汇表的大小,即token的数量
- return len(self._token_to_idx)

定义向量化器类(SurnameVectorizer
)
- # SurnameVectorizer类协调使用两个词汇表,一个用于姓氏中的字符,一个用于国籍
- class SurnameVectorizer(object):
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
-
- def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
- """
- 初始化SurnameVectorizer类的实例。
- 参数:
- surname_vocab (Vocabulary): 一个词汇表对象,用于将姓氏中的字符映射为整数
- nationality_vocab (Vocabulary): 一个词汇表对象,用于将国籍映射为整数
- max_surname_length (int): 姓氏的最大长度
- """
- self.surname_vocab = surname_vocab # 存储字符到整数的映射
- self.nationality_vocab = nationality_vocab # 存储国籍到整数的映射
- self._max_surname_length = max_surname_length # 存储最长姓氏的长度
-
- def vectorize(self, surname):
- """
- 将一个姓氏转换为一个独热编码矩阵。
- 参数:
- surname (str): 需要转换的姓氏
- 返回:
- one_hot_matrix (np.ndarray): 姓氏的独热编码矩阵
- """
- # 确定独热矩阵的大小,基于字符的总数和最长姓氏的长度
- one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
- # 初始化一个全零的矩阵,数据类型为32位浮点数
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- # 对姓氏中的每个字符进行遍历,根据其在surname_vocab中的索引,
- # 在独热矩阵的相应位置设置为1
- for position_index, character in enumerate(surname):
- character_index = self.surname_vocab.lookup_token(character)
- one_hot_matrix[character_index][position_index] = 1
-
- # 返回构建好的独热编码矩阵
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从pandas DataFrame数据集中实例化向量化器
-
- 参数:
- surname_df (pandas.DataFrame): 包含姓氏数据的DataFrame
- 返回:
- SurnameVectorizer的一个实例
- """
- surname_vocab = Vocabulary(unk_token="@") # 创建字符的词汇表,并添加未知字符标记"@"
- nationality_vocab = Vocabulary(add_unk=False) # 创建国籍的词汇表,不添加未知标记
- max_surname_length = 0 # 初始化最长姓氏长度为0
-
- # 遍历数据集中的每一行
- for index, row in surname_df.iterrows():
- # 更新最长姓氏长度
- max_surname_length = max(max_surname_length, len(row.surname))
- # 将姓氏中的每个字符添加到字符词汇表中
- for letter in row.surname:
- surname_vocab.add_token(letter)
- # 将每个国籍添加到国籍词汇表中
- nationality_vocab.add_token(row.nationality)
-
- # 使用从数据集中构建的词汇表和最长姓氏长度来创建向量化器实例
- return cls(surname_vocab, nationality_vocab, max_surname_length)
-
- @classmethod
- def from_serializable(cls, contents):
- # 从可序列化的字典内容创建SurnameVectorizer类的实例
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
- max_surname_length=contents['max_surname_length'])
-
- def to_serializable(self):
- # 将向量化器实例的状态序列化成字典
- return {
- 'surname_vocab': self.surname_vocab.to_serializable(), # 序列化字符词汇表
- 'nationality_vocab': self.nationality_vocab.to_serializable(), # 序列化国籍词汇表
- 'max_surname_length': self._max_surname_length # 序列化最长姓氏长度
- }

定义数据集类(SurnameDataset
)
- # SurnameDataset类用于封装姓氏数据集并提供数据给模型训练和评估。
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- 初始化SurnameDataset类的实例。
- 参数:
- surname_df (pandas.DataFrame): 包含姓氏数据的DataFrame
- vectorizer (SurnameVectorizer): 根据数据集实例化的向量化器
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- # 根据split列的值将数据集分为训练集、验证集和测试集
- self.train_df = self.surname_df[self.surname_df.split == 'train']
- self.train_size = len(self.train_df)
- self.val_df = self.surname_df[self.surname_df.split == 'val']
- self.validation_size = len(self.val_df)
- self.test_df = self.surname_df[self.surname_df.split == 'test']
- self.test_size = len(self.test_df)
-
- # 创建一个字典,用于快速查找不同split的数据集和它们的大小
- self._lookup_dict = {
- 'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)
- }
-
- # 默认设置为训练集
- self.set_split('train')
-
- # 计算类别权重
- class_counts = surname_df.nationality.value_counts().to_dict()
- # 根据国籍的索引对类别计数进行排序
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- # 计算每个国籍的权重,权重与频率成反比
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """从零开始加载数据集并创建一个新的向量化器
-
- 参数:
- surname_csv (str): 数据集的位置
- 返回:
- SurnameDataset的一个实例
- """
- surname_df = pd.read_csv(surname_csv) # 读取CSV文件
- train_surname_df = surname_df[surname_df.split == 'train'] # 获取训练集
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df)) # 创建数据集实例
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """加载数据集和对应的向量化器,用于向量化器已经被缓存的情况
-
- 参数:
- surname_csv (str): 数据集的位置
- vectorizer_filepath (str): 保存的向量化器的位置
- 返回:
- SurnameDataset的一个实例
- """
- surname_df = pd.read_csv(surname_csv) # 读取CSV文件
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath) # 加载向量化器
- return cls(surname_df, vectorizer) # 创建数据集实例
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """从文件中加载向量化器的静态方法
-
- 参数:
- vectorizer_filepath (str): 序列化向量化器的位置
- 返回:
- SurnameVectorizer的一个实例
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp)) # 加载向量化器
-
- def save_vectorizer(self, vectorizer_filepath):
- """将向量化器保存到磁盘上,使用json格式
-
- 参数:
- vectorizer_filepath (str): 保存向量化器的位置
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp) # 序列化并保存向量化器
-
- def get_vectorizer(self):
- """返回向量化器"""
- return self._vectorizer
-
- def set_split(self, split="train"):
- """根据DataFrame中的列选择数据集的split
-
- 参数:
- split (str): 选择的数据集类型(训练集、验证集、测试集)
- """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- """返回当前选择的数据集split的大小"""
- return self._target_size
-
- def __getitem__(self, index):
- """PyTorch数据集的主要入口点方法
-
- 参数:
- index (int): 数据点的索引
- 返回:
- 一个字典,包含数据点的特征(x_data)和标签(y_target)
- """
- row = self._target_df.iloc[index] # 获取当前索引的数据行
-
- surname_matrix = self._vectorizer.vectorize(row.surname) # 向量化姓氏
-
- nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality) # 获取国籍索引
-
- return {'x_surname': surname_matrix, 'y_nationality': nationality_index} # 返回数据字典
-
- def get_num_batches(self, batch_size):
- """给定批量大小,返回数据集中的批次数量
-
- 参数:
- batch_size (int)
- 返回:
- 数据集中的批次数量
- """
- return len(self) // batch_size
-
- # 生成器函数
- # 确保每个张量都在正确的设备上。
- def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"):
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict # 产生一个数据批次

定义模型类(SurnameClassifier
)
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- 初始化SurnameClassifier类的实例。
- 参数:
- initial_num_channels (int): 输入特征向量的大小
- num_classes (int): 输出预测向量的大小,即类别数
- num_channels (int): 网络中使用的恒定通道数
- """
- super(SurnameClassifier, self).__init__() # 调用基类的初始化方法
-
- # 定义一个序列模型,包含多个卷积层和非线性激活函数
- self.convnet = nn.Sequential(
- # 第一个一维卷积层,使用3个大小的卷积核
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3),
- nn.ELU(), # 使用ELU激活函数
- # 第二个一维卷积层,使用3个大小的卷积核,步长为2
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- # 第三个一维卷积层,同上
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- # 第四个一维卷积层,没有改变步长
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3),
- nn.ELU()
- )
- # 定义一个全连接层,将卷积层的输出映射到类别空间
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """分类器的前向传播
-
- 参数:
- x_surname (torch.Tensor): 输入数据张量
- x_surname.shape应为(batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): 是否应用softmax激活
- 如果与交叉熵损失一起使用,则应为False
- 返回:
- 结果张量,tensor.shape应为(batch, num_classes)
- """
- features = self.convnet(x_surname).squeeze(dim=2) # 通过卷积层并挤压第三维度
-
- # 通过全连接层生成预测向量
- prediction_vector = self.fc(features)
-
- # 如果需要,应用softmax激活函数
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector

定义初始化训练状态(make_train_state)、更新训练状态函数(update_train_state)、定义准确率计算函数(compute_accuracy
)
- def make_train_state(args):
- """
- 创建训练状态字典的函数。
- 参数:
- args: 一个包含训练参数的对象,通常是通过解析命令行参数得到的。
-
- 返回:
- 一个包含训练过程中需要的各种状态信息的字典。
- """
- return {
- # 指示是否应该提前终止训练
- 'stop_early': False,
- # 早期停止时的训练步数
- 'early_stopping_step': 0,
- # 早期停止观察到的最好验证损失
- 'early_stopping_best_val': 1e8,
- # 当前学习率
- 'learning_rate': args.learning_rate,
- # 当前的训练世代(Epoch)索引
- 'epoch_index': 0,
- # 存储每个Epoch的训练损失值
- 'train_loss': [],
- # 存储每个Epoch的训练准确率
- 'train_acc': [],
- # 存储每个Epoch的验证损失值
- 'val_loss': [],
- # 存储每个Epoch的验证准确率
- 'val_acc': [],
- # 测试损失,初始值设为-1,直到模型在测试集上评估
- 'test_loss': -1,
- # 测试准确率,初始值设为-1,直到模型在测试集上评估
- 'test_acc': -1,
- # 模型状态文件的名称,用于保存训练过程中的最佳模型
- 'model_filename': args.model_state_file
- }
- def update_train_state(args, model, train_state):
- """
- 更新训练状态的函数。
- 组件:
- - 早期停止:防止过拟合。
- - 模型检查点:如果模型性能更好,则保存模型。
- 参数:
- args: 主要参数。
- model: 要训练的模型。
- train_state: 一个字典,代表训练状态的值。
- 返回:
- 新的训练状态字典。
- """
-
- # 至少保存一个模型
- if train_state['epoch_index'] == 0:
- # 在第一个Epoch结束时保存模型
- torch.save(model.state_dict(), train_state['model_filename'])
- # 不要提前停止训练
- train_state['stop_early'] = False
-
- # 如果已经训练过至少一个Epoch,则进行进一步的检查
- elif train_state['epoch_index'] >= 1:
- # 获取上一个和当前的验证损失
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # 如果损失变差了
- if loss_t >= train_state['early_stopping_best_val']:
- # 更新早期停止步骤
- train_state['early_stopping_step'] += 1
- # 如果损失减少了
- else:
- # 如果当前损失是目前为止最好的,则保存模型
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # 重置早期停止步骤
- train_state['early_stopping_step'] = 0
-
- # 是否提前停止训练?
- # 如果连续的早期停止步骤数达到设定的阈值,则提前停止
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- # 返回更新后的训练状态
- return train_state
- def compute_accuracy(y_pred, y_target):
- y_pred_indices = y_pred.max(dim=1)[1]
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100

使用argparse.Namespace
定义和设置训练过程中的参数,并根据需要,更新文件路径以指向保存模型和向量化器的目录。检查CUDA可用性后,根据是否从文件重新加载数据,创建新的数据集和向量化器或从文件加载。
- args = Namespace(
- # 数据和路径信息
- surname_csv="data/surnames/surnames_with_splits.csv", # 姓氏数据集的CSV文件路径
- vectorizer_file="vectorizer.json", # 向量化器的保存文件名
- model_state_file="model.pth", # 模型状态的保存文件名
- save_dir="model_storage/ch4/cnn", # 保存模型和向量化器的目录
- # 模型超参数
- hidden_dim=100, # 隐藏层维度
- num_channels=256, # 网络中使用的通道数
- # 训练超参数
- seed=1337, # 随机种子,用于确保结果的可复现性
- learning_rate=0.001, # 学习率
- batch_size=128, # 每个批次的样本数
- num_epochs=100, # 总的训练世代数
- early_stopping_criteria=5, # 早期停止的标准
- dropout_p=0.1, # dropout概率
- # 运行时选项
- cuda=False, # 是否使用CUDA
- reload_from_files=False, # 是否从文件中重新加载
- expand_filepaths_to_save_dir=True, # 是否将文件路径扩展到保存目录
- catch_keyboard_interrupt=True # 是否捕获键盘中断
- )
-
- # 如果设置了将文件路径扩展到保存目录,则更新向量化器和模型状态的文件路径
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 检查CUDA是否可用,如果不是,则将args.cuda设置为False
- if not torch.cuda.is_available():
- args.cuda = False
-
- # 设置设备,如果cuda可用则使用cuda,否则使用cpu
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
- # 设置随机种子以确保结果的可复现性
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- # 处理目录,如果目录不存在则创建它
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
-
- # 调用set_seed_everywhere函数设置随机种子
- set_seed_everywhere(args.seed, args.cuda)
-
- # 调用handle_dirs函数处理保存目录
- handle_dirs(args.save_dir)
- if args.reload_from_files:
- # 如果设置了从文件重新加载,则从保存的状态加载数据集和向量化器。
- # 这通常用于从先前的训练检查点继续训练。
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(
- args.surname_csv, # 姓氏数据集的CSV文件路径
- args.vectorizer_file # 向量化器的保存文件路径
- )
- else:
- # 如果没有设置从文件重新加载,则创建新的数据集和向量化器。
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- # 并且保存新创建的向量化器到文件,以便将来可以重新加载。
- dataset.save_vectorizer(args.vectorizer_file)
-
- # 获取数据集的向量化器,它将用于将姓氏转换为模型可以处理的格式。
- vectorizer = dataset.get_vectorizer()
-
- # 初始化分类器,它是一个用于预测姓氏国籍的神经网络模型。
- # 根据向量化器的词汇表大小和命令行参数来设置模型的参数。
- classifier = SurnameClassifier(
- initial_num_channels=len(vectorizer.surname_vocab), # 姓氏字符的词汇表大小
- num_classes=len(vectorizer.nationality_vocab), # 国籍的词汇表大小
- num_channels=args.num_channels # 网络中使用的通道数
- )
-
- # 将分类器移动到指定的设备上(CPU或CUDA)。
- classifier = classifier.to(args.device)
-
- # 将类别权重移动到指定的设备上,这些权重将用于损失函数。
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 初始化损失函数,这里使用交叉熵损失,它适用于多分类问题。
- # 类别权重被用作损失函数的权重参数,以处理不平衡的数据集。
- loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
-
- # 初始化优化器,这里使用Adam优化器,并设置学习率。
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
-
- # 初始化学习率调度器,这里使用ReduceLROnPlateau调度器,
- # 它在验证损失停止下降时降低学习率。
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(
- optimizer=optimizer, # 要应用学习率调度的优化器
- mode='min', # 模式设置为'min',因为我们希望最小化损失
- factor=0.5, # 学习率减少的因子
- patience=1 # 当验证损失在1个Epoch内没有改善时降低学习率
- )
-
- # 创建训练状态字典,它将跟踪训练过程中的各种状态信息。
- train_state = make_train_state(args)

初始化分类器模型,设置损失函数和优化器,进行多个训练周期,每个周期包括前向传播、损失计算、反向传播和参数更新;在训练中根据验证损失决定是否提前终止训练以防止过拟合。
- # 初始化总的训练进度条,显示训练轮数
- epoch_bar = tqdm_notebook(
- desc='training routine', # 进度条描述
- total=args.num_epochs, # 总轮数
- position=0 # 进度条位置
- )
-
- # 设置数据集为训练集
- dataset.set_split('train')
-
- # 初始化训练集进度条,显示每个epoch中的批次数量
- train_bar = tqdm_notebook(
- desc='split=train', # 进度条描述
- total=dataset.get_num_batches(args.batch_size), # 计算批次数量
- position=1, # 进度条位置
- leave=True # 训练完成后保留进度条
- )
-
- # 设置数据集为验证集
- dataset.set_split('val')
-
- # 初始化验证集进度条,显示每个epoch中的批次数量
- val_bar = tqdm_notebook(
- desc='split=val', # 进度条描述
- total=dataset.get_num_batches(args.batch_size), # 计算批次数量
- position=1, # 进度条位置
- leave=True # 验证完成后保留进度条
- )
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")

在验证集和测试集上评估模型性能。
-
- # 加载之前训练中保存的最佳模型状态
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- # 将类别权重移动到与模型相同的设备
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 初始化交叉熵损失函数,传入数据集的类别权重作为权重参数
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 设置数据集为测试集
- dataset.set_split('test')
-
- # 生成测试数据的批次生成器
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- # 初始化测试损失和准确率
- running_loss = 0.
- running_acc = 0.
-
- # 将模型设置为评估模式
- classifier.eval()
-
- # 遍历测试数据的所有批次
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算模型输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item() # 获取损失的具体数值
- # 计算平均损失
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- # 计算平均准确率
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 将测试损失和准确率存储到训练状态中
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- # 使用format方法来格式化字符串
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))

效果比多层感知器好,准确率到了60%
再一次进行应用:D
首先定义预测函数(predict_nationality),用于预测新姓氏的国籍
- def predict_nationality(surname, classifier, vectorizer):
- """
- 从一个新的姓氏中预测国籍。
- 参数:
- surname (str): 要分类的姓氏
- classifier (SurnameClassifier): 分类器的一个实例
- vectorizer (SurnameVectorizer): 相应的向量化器
- 返回:
- 一个字典,包含最可能的国籍及其概率
- """
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- result = classifier(vectorized_surname, apply_softmax=True)
-
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}

通过输入姓氏,来显示预测的国籍和概率。
- # 获取一个姓氏
- new_surname = input("Enter a surname to classify: ")
-
- # 确保分类器在CPU上运行,因为某些设备可能不支持GPU加速
- classifier = classifier.to("cpu")
-
- # 使用之前定义的predict_nationality函数来预测输入姓氏的国籍
- # 传入新姓氏、分类器实例和向量化器实例
- prediction = predict_nationality(new_surname, classifier, vectorizer)
-
- # 打印预测结果,格式为:姓氏 -> 预测的国籍 (概率=XX.XX%)
- # 使用format函数格式化输出,保留两位小数显示概率
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
(这次是中国人了,概率也挺高)
再定义一个函数(predict_topk_nationality),用于显示姓氏可能的前k个最可能的国籍
- def predict_topk_nationality(surname, classifier, vectorizer, k=5):
- """
- 预测新姓氏的前K个最可能的国籍。
- 参数:
- surname (str): 要分类的姓氏
- classifier (SurnameClassifier): 分类器的一个实例
- vectorizer (SurnameVectorizer): 对应的向量化器
- k (int): 要返回的顶级国籍数量
- 返回:
- 字典列表,每个字典包含一个国籍和相应的概率
- """
-
- # 将姓氏向量化
- vectorized_surname = vectorizer.vectorize(surname)
- # 将向量转换为PyTorch张量,并增加一个维度,以符合模型的输入要求
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
- # 使用分类器进行预测,并应用softmax函数获取概率分布
- prediction_vector = classifier(vectorized_surname, apply_softmax=True)
- # 获取概率最高的K个国籍的索引和概率值
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # 从PyTorch张量转换为NumPy数组,并取出预测结果(因为我们的形状是1xk)
- probability_values = probability_values[0].detach().numpy()
- indices = indices[0].detach().numpy()
-
- # 初始化一个空列表来存储预测结果
- results = []
- # 遍历概率值和索引
- for kth_index in range(k):
- # 根据索引查找国籍
- nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
- # 获取概率值
- probability_value = probability_values[kth_index]
- # 将国籍和概率添加到结果列表中
- results.append({'nationality': nationality,
- 'probability': probability_value})
- # 返回包含预测结果的列表
- return results
-
- # 获取用户输入的姓氏
- new_surname = input("Enter a surname to classify: ")
-
- # 获取用户想要看到的顶级预测数量
- k = int(input("How many of the top predictions to see? "))
- # 如果请求的预测数量超过了国籍的数量,则限制为国籍的最大数量
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- # 使用predict_topk_nationality函数获取前k个预测的国籍
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- # 打印预测结果的数量
- print("Top {} predictions:".format(k))
- print("===================")
- # 遍历打印每个预测的国籍和概率
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

本次实验深入研究了多层感知器(MLP)和卷积神经网络(CNN)在姓氏分类任务中应用的实践过程,还对多层感知机进行了不同网络层的效果比对。用于探索不同网络结构对分类性能的影响。
总而言之,多层感知器和卷积神经网络都是强大的分类工具,但它们在结构和适用场景上有所不同。我们需要更好地理解这些模型的工作原理和性能特点来运用这两大前馈神经网络!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。