赞
踩
感知器是现存最简单的神经网络。感知器存在一些缺点例如,查看中绘制的数据点。这相当于非此即彼(XOR)的情况,在这种情况下,决策边界不能是一条直线(也称为线性可分)。在这个例子中,感知器失败了。
单层感知器与多层感知器分类结果示例如下:
多层感知机(MLP,Multilayer Perceptron)也叫人工神经网络(ANN,Artificial Neural Network),除了输入输出层,它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构,如下图:
从上图可以看到,多层感知机层与层之间是全连接的。多层感知机最底层是输入层,中间是隐藏层,最后是输出层。 隐藏层的神经元怎么得来?首先它与输入层是全连接的,假设输入层用向量X表示,则隐藏层的输出就是 f (W1X+b1),W1是权重(也叫连接系数),b1是偏置,函数f 可以是常用的sigmoid函数或者tanh函数。
关于为什么使用激活函数?
a. 不使用激活函数,每一层输出都是上层输入的线性函数,无论神经网络有多少层,输出都是输入的线性组合。
b. 使用激活函数,能够给神经元引入非线性因素,使得神经网络可以任意逼近任何非线性函数,这样神经网络就可以利用到更多的非线性模型中。
sigmoid 是神经网络历史上最早使用的激活函数之一。它取任何实值并将其压缩在0和1之间。数学上,sigmoid 的表达式如下:
程序如下:
- import torch
- import matplotlib.pyplot as plt
- # 生成输入数据 x,范围从 -5 到 5,间隔为 0.1
- x = torch.arange(-5., 5., 0.1)
- # 计算 Sigmoid 函数的输出
- y = torch.sigmoid(x)
- # 绘制曲线
- plt.plot(x.numpy(), y.numpy()) # 使用 .numpy() 方法将张量转换为 NumPy 数组
- # plt.xlabel('x')
- # plt.ylabel('Sigmoid(x)')
- # plt.title('Sigmoid 函数')
- # plt.grid(True) # 添加网格线
- plt.show() # 显示图形
它解决了Sigmoid函数的不以0为中心输出问题,然而,梯度消失的问题和幂运算的问题仍然存在。tanh函数的表达式如下:
- import torch
- import matplotlib.pyplot as plt
- # 生成输入数据 x,范围从 -5 到 5,间隔为 0.1
- x = torch.arange(-5., 5., 0.1)
- # 计算 Tanh 函数的输出
- y = torch.tanh(x)
- # 绘制曲线
- plt.plot(x.numpy(), y.numpy()) # 使用 .numpy() 方法将张量转换为 NumPy 数组
- # plt.xlabel('x')
- # plt.ylabel('Tanh(x)')
- # plt.title('Tanh 函数')
- # # plt.grid(True) # 添加网格线
- # plt.show() # 显示图形
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。该数据集将在本课程实验的几个示例中重用,并具有一些使其有趣的属性。第一个性质是它是相当不平衡的。排名前三的课程占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。第二个特点是,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密(比如“O ‘Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。
为了创建最终的数据集,我们从一个比课程补充材料中包含的版本处理更少的版本开始,并执行了几个数据集修改操作。第一个目的是减少这种不平衡——原始数据集中70%以上是俄文,这可能是由于抽样偏差或俄文姓氏的增多。为此,我们通过选择标记为俄语的姓氏的随机子集对这个过度代表的类进行子样本。接下来,我们根据国籍对数据集进行分组,并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集,以便跨这些部分的类标签分布具有可比性。
- class SurnameDataset(Dataset):
- def __init__(self, target_df, vectorizer):
- """
- Args:
- target_df (pandas.DataFrame): 数据集DataFrame
- vectorizer (SurnameVectorizer): 数据集的SurnameVectorizer对象
- """
- self._target_df = target_df
- self._vectorizer = vectorizer
-
- def __len__(self):
- return len(self._target_df)
-
- def __getitem__(self, index):#给定索引时返回一个数据点,包括姓氏向量和国籍索引
- """
- Args:
- index (int): 数据索引
- Returns:
- 字典,包含x_surname(姓氏向量)和y_nationality(国籍索引)
- """
- row = self._target_df.iloc[index]
- surname_vector = self._vectorizer.vectorize(row.surname)
- nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}

为了使用字符对姓氏进行分类,我们使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。
THE VOCABULARY CLASS:简要概述一下,词汇表是两个Python字典的协调,这两个字典在令牌(在本例中是字符)和整数之间形成一个双射;也就是说,第一个字典将字符映射到整数索引,第二个字典将整数索引映射到字符。add_token方法用于向词汇表中添加新的令牌,lookup_token方法用于检索索引,lookup_index方法用于检索给定索引的令牌(在推断阶段很有用)。
THE SURNAMEVECTORIZER:虽然词汇表将单个令牌(字符)转换为整数,但SurnameVectorizer负责应用词汇表并将姓氏转换为向量。姓氏是字符的序列,每个字符在我们的词汇表中是一个单独的标记。然而,在“卷积神经网络”出现之前,我们将忽略序列信息,通过迭代字符串输入中的每个字符来创建输入的收缩one-hot向量表示。我们为以前未遇到的字符指定一个特殊的令牌,即UNK。由于我们仅从训练数据实例化词汇表,而且验证或测试数据中可能有惟一的字符,所以在字符词汇表中仍然使用UNK符号。
- class SurnameVectorizer(object):#应用词汇表,将姓氏转换为向量 。 姓氏是字符的序列,字符串没有在空格上分割
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
- def __init__(self, surname_vocab, nationality_vocab):
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
-
- def vectorize(self, surname):
- """Vectorize the provided surname
- Args:
- surname (str): the surname
- Returns:
- one_hot (np.ndarray): a collapsed one-hot encoding
- """
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1#迭代字符串输入中的每个字符来创建输入的收缩one-hot向量表示。
- return one_hot
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从数据框实例化向量化器
- Args:
- surname_df (pandas.DataFrame): 姓氏数据集
- Returns:
- SurnameVectorizer 的一个实例
- """
- surname_vocab = Vocabulary(unk_token="@") # 姓氏词汇表,初始化一个词汇表,未知词标记为"@"
-
- nationality_vocab = Vocabulary(add_unk=False) # 国籍词汇表,不添加未知词标记
-
- # 遍历数据框的每一行
- for index, row in surname_df.iterrows():
- # 遍历每个姓氏的每个字母
- for letter in row.surname:
- surname_vocab.add_token(letter) # 将字母添加到姓氏词汇表中
- nationality_vocab.add_token(row.nationality) # 将国籍添加到国籍词汇表中
-
- return cls(surname_vocab, nationality_vocab) # 返回 SurnameVectorizer 的一个实例,使用创建的词汇表

SurnameClassifier是本实验前面介绍的MLP的实现,第一个线性层将输入向量映射到中间向量,并对该向量应用非线性。第二线性层将中间向量映射到预测向量。在最后一步中,可选地应用softmax操作,以确保输出和为1;这就是所谓的“概率”。它是可选的原因与我们使用的损失函数的数学公式有关——交叉熵损失。我们研究了“损失函数”中的交叉熵损失。回想一下,交叉熵损失对于多类分类是最理想的,但是在训练过程中软最大值的计算不仅浪费而且在很多情况下并不稳定。
- import torch.nn as nn
- import torch.nn.functional as F
-
- class SurnameClassifier(nn.Module):
- """ A 2-layer Multilayer Perceptron for classifying surnames """
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- super(SurnameClassifier, self).__init__()#初始化
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the classifier
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- intermediate_vector = F.relu(self.fc1(x_in))
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:#选用softmax操作,以确保和为1
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector

虽然我们使用了不同的模型、数据集和损失函数,但是训练例程是相同的
Settings and some prep work
- args = Namespace(
- # 数据和路径信息
- surname_csv="data/surnames_with_splits.csv", # 姓氏数据的CSV文件路径
- vectorizer_file="vectorizer.json", # 向量化器保存文件的路径
- model_state_file="model.pth", # 模型状态保存文件的路径
- save_dir="model_storage/ch4/surname_mlp", # 保存模型和向量化器的目录路径
- # 模型超参数
- hidden_dim=300, # 隐藏层维度
- # 训练超参数
- seed=1337, # 随机种子
- num_epochs=100, # 训练的总轮数
- early_stopping_criteria=5, # 提前停止的标准
- learning_rate=0.001, # 学习率
- batch_size=64, # 批大小
- # 运行时选项
- cuda=False, # 是否使用CUDA加速
- reload_from_files=False, # 是否从文件重新加载模型和向量化器
- expand_filepaths_to_save_dir=True, # 是否将文件路径扩展到保存目录中
- )
-
- # 如果需要,将文件路径扩展到保存目录中
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file)
- args.model_state_file = os.path.join(args.save_dir, args.model_state_file)
-
- print("扩展后的文件路径: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 检查CUDA是否可用
- if not torch.cuda.is_available():
- args.cuda = False
-
- # 设置设备(GPU或CPU)
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- print("使用CUDA: {}".format(args.cuda))
-
- # 设置随机种子以实现可重复性
- set_seed_everywhere(args.seed, args.cuda)
-
- # 处理目录,如果不存在则创建
- handle_dirs(args.save_dir)

THE TRAINING LOOP
除了外观上的差异,训练循环的功能保持不变。利用训练数据,计算模型输出、损失和梯度。然后,使用梯度来更新模型。
- # 将分类器移动到指定的设备上
- classifier = classifier.to(args.device)
- # 将数据集的类别权重也移动到指定的设备上
- dataset.class_weights = dataset.class_weights.to(args.device)
- # 定义损失函数,使用交叉熵损失函数,传入类别权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- # 定义优化器,使用Adam优化器来更新分类器的参数,设置学习率为args.learning_rate
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- # 定义学习率调度器,当验证损失不再减小时,将学习率减半
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
- # 创建训练状态
- train_state = make_train_state(args)
- # 创建一个进度条,用于显示训练过程的总进度
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
- # 将数据集的split设置为训练集
- dataset.set_split('train')
- # 创建一个进度条,用于显示训练集的训练进度
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- # 将数据集的split设置为验证集
- dataset.set_split('val')
- # 创建一个进度条,用于显示验证集的验证进度
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- # 循环每个epoch
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # 遍历训练数据集
-
- # 设置:批生成器,将损失和准确率初始化为0,设置为训练模式
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # 训练过程的五个步骤:
-
- # --------------------------------------
- # 步骤1. 梯度归零
- optimizer.zero_grad()
-
- # 步骤2. 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 步骤3. 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 步骤4. 使用损失计算梯度
- loss.backward()
-
- # 步骤5. 使用优化器更新参数
- optimizer.step()
- # -----------------------------------------
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 更新进度条
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # 遍历验证数据集
-
- # 设置:批生成器,将损失和准确率初始化为0,设置为评估模式
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 步骤3. 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
- scheduler.step(train_state['val_loss'][-1])
- if train_state['stop_early']:
- break
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")

Helper functions(补充实验的部分函数
- def make_train_state(args):
- """
- 创建训练状态的字典,包括训练中需要跟踪的各种参数和指标。
- 参数:
- args: 主要参数
- 返回:
- 一个表示训练状态值的新字典
- """
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- def update_train_state(args, model, train_state):
- """处理训练状态的更新。
- 组件:
- - 提前停止: 防止过拟合。
- - 模型检查点: 如果模型更好,则保存模型
- 参数:
- args: 主要参数
- model: 要训练的模型
- train_state: 表示训练状态值的字典
- 返回:
- 一个新的train_state
- """
- # 至少保存一个模型
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # 如果性能提高,则保存模型
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # 如果损失加剧
- if loss_t >= train_state['early_stopping_best_val']:
- # 更新步骤
- train_state['early_stopping_step'] += 1
- # 损失减少
- else:
- # 保存最佳模型
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # 重置提前停止步骤
- train_state['early_stopping_step'] = 0
-
- # 是否提前停止?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- def compute_accuracy(y_pred, y_target):
- """计算预测的准确率。
- 参数:
- y_pred: 模型的预测
- y_target: 真实目标值
- 返回:
- 准确率百分比
- """
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100

要理解模型的性能,应该使用定量和定性方法分析模型。定量测量出的测试数据的误差,决定了分类器能否推广到不可见的例子。定性地说,可以通过查看分类器的top-k预测来为一个新示例开发模型所了解的内容的直觉。
评价SurnameClassifier测试数据,我们将数据集设置为遍历测试数据,调用classifier.eval()
方法,并遍历测试数据以同样的方式与其他数据。在这个例子中,调用classifier.eval()
可以防止PyTorch在使用测试/评估数据时更新模型参数。
该模型对测试数据的准确性达到50%左右。如果在附带的notebook中运行训练例程,会注意到在训练数据上的性能更高。这是因为模型总是更适合它所训练的数据,所以训练数据的性能并不代表新数据的性能。如果遵循代码,你可以尝试隐藏维度的不同大小,应该注意到性能的提高。然而,这种增长不会很大(尤其是与“用CNN对姓氏进行分类的例子”中的模型相比)。其主要原因是收缩的onehot向量化方法是一种弱表示。虽然它确实简洁地将每个姓氏表示为单个向量,但它丢弃了字符之间的顺序信息,这对于识别起源非常重要。
- # 加载最佳模型的状态字典
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- # 将分类器移至指定的设备(如 GPU)
- classifier = classifier.to(args.device)
- # 将数据集类别权重也移至指定设备
- dataset.class_weights = dataset.class_weights.to(args.device)
- # 定义损失函数为交叉熵损失函数,使用数据集类别权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 设置数据集为测试集
- dataset.set_split('test')
- # 生成测试集批次
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- # 初始化测试过程中的损失和准确率
- running_loss = 0.
- running_acc = 0.
- # 将模型设置为评估模式
- classifier.eval()
-
- # 遍历测试集的批次
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算模型输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- # 更新平均损失
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- # 更新平均准确率
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 将测试集的平均损失和准确率保存到训练状态中
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))

下面显示了分类新姓氏的代码。给定一个姓氏作为字符串,该函数将首先应用向量化过程,然后获得模型预测。注意,我们包含了apply_softmax标志,所以结果包含概率。模型预测,在多项式的情况下,是类概率的列表。我们使用PyTorch张量最大函数来得到由最高预测概率表示的最优类。
- def predict_nationality(surname, classifier, vectorizer):
- """从新的姓氏预测国籍
-
- Args:
- surname (str): 待分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 相应的向量化器
- Returns:
- 包含最可能的国籍及其概率的字典
- """
- # 向量化姓氏
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).view(1, -1)
- # 使用模型进行预测,并应用 softmax 函数
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 获取预测结果中概率值最高的索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- # 根据索引查找对应的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- # 返回包含预测的国籍及其概率的字典
- return {'nationality': predicted_nationality, 'probability': probability_value}
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

不仅要看最好的预测,还要看更多的预测。例如,NLP中的标准实践是采用k-best预测并使用另一个模型对它们重新排序。PyTorch提供了一个torch.topk函数,它提供了一种方便的方法来获得这些预测.
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- # 向量化输入的姓氏
- vectorized_name = vectorizer.vectorize(name)
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- # 使用模型进行预测,并应用 softmax 函数获取概率
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- # 获取概率值和对应的索引(即国籍)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # 转换为 numpy 数组,大小为 1 x k
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- results = []
- # 将结果存储为字典形式
- for prob_value, index in zip(probability_values, indices):
- # 获取对应索引的国籍
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- # 添加到结果列表中
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- return results
-
-
- # 输入待分类的新姓氏
- new_surname = input("请输入一个姓氏进行分类: ")
- # 将分类器移至 CPU 上进行推断
- classifier = classifier.to("cpu")
-
- # 询问要查看的前 k 个预测结果
- k = int(input("要查看前几个最高预测? "))
- # 如果输入的 k 超过了国籍词汇表的大小,则默认为最大值
- if k > len(vectorizer.nationality_vocab):
- print("抱歉!超过了我们所拥有的国籍数量..将默认您查看最大值 :)")
- k = len(vectorizer.nationality_vocab)
-
- # 获取前 k 个预测结果
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- # 打印前 k 个预测结果
- print("前 {} 个预测结果:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (概率={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

DROPOUT可以用于解决过拟合问题,优化实验,这里给出示例代码,读者可根据需要选择。
简单地说,在训练过程中,dropout有一定概率使属于两个相邻层的单元之间的连接减弱。这有什么用呢?我们从斯蒂芬•梅里蒂(Stephen Merity)的一段直观(且幽默)的解释开始:“Dropout,简单地说,是指如果你能在喝醉的时候反复学习如何做一件事,那么你应该能够在清醒的时候做得更好。这一见解产生了许多最先进的结果和一个新兴的领域。”
神经网络——尤其是具有大量分层的深层网络——可以在单元之间创建有趣的相互适应。“Coadaptation”是神经科学中的一个术语,但在这里它只是指一种情况,即两个单元之间的联系变得过于紧密,而牺牲了其他单元之间的联系。这通常会导致模型与数据过拟合。通过概率地丢弃单元之间的连接,我们可以确保没有一个单元总是依赖于另一个单元,从而产生健壮的模型。dropout不会向模型中添加额外的参数,但是需要一个超参数——“drop probability”。drop probability,它是单位之间的连接drop的概率。通常将下降概率设置为0.5。
- import torch.nn as nn
- import torch.nn.functional as F
-
- class MultilayerPerceptron(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- super(MultilayerPerceptron, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)#模型的两层,和对应的参数数量
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the MLP
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- intermediate = F.relu(self.fc1(x_in))
- output = self.fc2(F.dropout(intermediate, p=0.5))#dropout的概率是0.5,隐层神经元以0.5的概率失活
-
- if apply_softmax:
- output = F.softmax(output, dim=1)
- return output#dropout只用于训练期间不用于评估期间

卷积神经网络是一种带有卷积结构的深度神经网络,卷积结构可以减少深层网络占用的内存量,其三个关键的操作,其一是局部感受野,其二是权值共享,其三是 pooling 层,有效的减少了网络的参数个数,缓解了模型的过拟合问题。CNNs的名称和基本功能源于经典的数学运算卷积。卷积已经应用于各种工程学科,包括数字信号处理和计算机图形学。一般来说,卷积使用程序员指定的参数。这些参数被指定来匹配一些功能设计,如突出边缘或抑制高频声音。事实上,许多Photoshop滤镜都是应用于图像的固定卷积运算。然而,在深度学习和本实验中,我们从数据中学习卷积滤波器的参数,因此它对于解决当前的任务是最优的。
为了理解不同的设计决策对CNN意味着什么,我们展示了一个示例。在本例中,单个“核”应用于输入矩阵。卷积运算(线性算子)的精确数学表达式对于理解这一节并不重要,但是从这个图中可以直观地看出,核是一个小的方阵,它被系统地应用于输入矩阵的不同位置。
输入矩阵与单个产生输出矩阵的卷积核(也称为特征映射)在输入矩阵的每个位置应用内核。在每个应用程序中,内核乘以输入矩阵的值及其自身的值,然后将这些乘法相加kernel具有以下超参数配置:kernel_size=2,stride=1,padding=0,以及dilation=1。这些超参数解释如下:
虽然经典卷积是通过指定核的具体值来设计的,但是CNN是通过指定控制CNN行为的超参数来设计的,然后使用梯度下降来为给定数据集找到最佳参数。两个主要的超参数控制卷积的形状(称为kernel_size)和卷积将在输入数据张量(称为stride)中相乘的位置。还有一些额外的超参数控制输入数据张量被0填充了多少(称为padding),以及当应用到输入数据张量(称为dilation)时,乘法应该相隔多远。
卷积神经网络是一种多层的监督学习神经网络,隐含层的卷积层和池采样层是实现卷积神经网络特征提取功能的核心模块。该网络模型通过采用梯度下降法最小化损失函数对网络中的权重参数逐层反向调节,通过频繁的迭代训练提高网络的精度。卷积神经网络的低隐层是由卷积层和最大池采样层交替组成,高层是全连接层对应传统多层感知器的隐含层和逻辑回归分类器。
第一个全连接层的输入是由卷积层和子采样层进行特征提取得到的特征图像。
最后一层输出层是一个分类器,可以采用逻辑回归,Softmax 回归甚至是支持向量机对输入图像进行分类。
每一层有多个特征图,每个特征图通过一种卷积滤波器提取输入的一种特征,每个特征图有多个神经元。输入图像统计和滤波器进行卷积之后,提取该局部特征,一旦该局部特征被提取出来之后,它与其他特征的位置关系也随之确定下来了,每个神经元的输入和前一层的局部感受野相连,每个特征提取层都紧跟一个用来求局部平均与二次提取的计算层,也叫特征映射层,网络的每个计算层由多个特征映射平面组成,平面上所有的神经元的权重相等。通常将输入层到隐藏层的映射称为一个特征映射,也就是通过卷积层得到特征提取层,经过 pooling 之后得到特征映射层。
卷积神经网络结构包括:卷积层,降采样层,全连接层
卷积层:因为通过卷积运算我们可以提取出图像的特征,通过卷积运算可以使得原始信号的某些特征增强,并且降低噪声。
下采样层:因为对图像进行下采样,可以减少数据处理量同时保留有用信息,采样可以混淆特征的具体位置,因为某个特征找出来之后,它的位置已经不重要了,我们只需要这个特征和其他特征的相对位置,可以应对形变和扭曲带来的同类物体的变化。
全连接层:采用 softmax 全连接,得到的激活值即卷积神经网络提取到的图片特征。
卷积神经网络的核心思想就是局部感受野、是权值共享和pooling层,以此来达到简化网络参数并使得网络具有一定程度的位移、尺度、缩放、非线性形变稳定性。
局部感受野:由于图像的空间联系是局部的,每个神经元不需要对全部的图像做感受,只需要感受局部特征即可,然后在更高层将这些感受得到的不同的局部神经元综合起来就可以得到全局的信息了,这样可以减少连接的数目。
权值共享:不同神经元之间的参数共享可以减少需要求解的参数,使用多种滤波器去卷积图像就会得到多种特征映射。权值共享其实就是对图像用同样的卷积核进行卷积操作,也就意味着第一个隐藏层的所有神经元所能检测到处于图像不同位置的完全相同的特征。其主要的能力就能检测到不同位置的同一类型特征,也就是卷积网络能很好的适应图像的小范围的平移性,即有较好的平移不变性(比如将输入图像的猫的位置移动之后,同样能够检测到猫的图像)
网络结构能够较好的适应图像的结构
同时进行特征提取和分类,使得特征提取有助于特征分类
权值共享可以减少网络的训练参数,使得神经网络结构变得简单,适应性更强
虽然姓氏数据集之前在“示例:带有多层感知器的姓氏分类”中进行了描述,但建议参考“姓氏数据集”来了解它的描述。尽管我们使用了来自“示例:带有多层感知器的姓氏分类”中的相同数据集,但在实现上有一个不同之处:数据集由onehot向量矩阵组成,而不是一个收缩的onehot向量。为此,我们实现了一个数据集类,它跟踪最长的姓氏,并将其作为矩阵中包含的行数提供给矢量化器。列的数量是onehot向量的大小(词汇表的大小),下面显示了对SurnameDataset.__getitem__
的更改;我们显示对SurnameVectorizer的更改。在下一小节向量化。
我们使用数据集中最长的姓氏来控制onehot矩阵的大小有两个原因。首先,将每一小批姓氏矩阵组合成一个三维张量,要求它们的大小相同。其次,使用数据集中最长的姓氏意味着可以以相同的方式处理每个小批处理。
- class SurnameDataset(Dataset):
- # ... existing implementation from Section 4.2
-
- def __getitem__(self, index):
- row = self._target_df.iloc[index]#将矩阵中包含的行数提供给矢量化器。
-
- surname_matrix = \
- self._vectorizer.vectorize(row.surname, self._max_seq_length)#列的数量是onehot向量的大小-词汇表的大小
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
在本例中,尽管词汇表和DataLoader的实现方式与“示例:带有多层感知器的姓氏分类”中的示例相同,但Vectorizer的vectorize()方法已经更改,以适应CNN模型的需要。具体来说,正如我们在下面的代码中所示,该函数将字符串中的每个字符映射到一个整数,然后使用该整数构造一个由onehot向量组成的矩阵。重要的是,矩阵中的每一列都是不同的onehot向量。主要原因是,我们将使用的Conv1d层要求数据张量在第0维上具有批处理,在第1维上具有通道,在第2维上具有特性。除了更改为使用onehot矩阵之外,我们还修改了矢量化器,以便计算姓氏的最大长度并将其保存为max_surname_length。
- class SurnameVectorizer(object):
- """负责协调词汇表并将其应用于数据向量化的向量化器"""
-
- def vectorize(self, surname):
- """
- 将姓氏向量化为独热编码的矩阵
- 参数:
- surname (str): 姓氏
- 返回:
- one_hot_matrix (np.ndarray): 一个由独热向量组成的矩阵
- """
-
- # 初始化一个全零矩阵,大小为(字符词汇表大小, 最大姓氏长度)
- one_hot_matrix_size = (len(self.character_vocab), self.max_surname_length)
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- # 遍历姓氏中的每个字符,并将其转换为独热向量
- for position_index, character in enumerate(surname):
- # 查找字符在字符词汇表中的索引
- character_index = self.character_vocab.lookup_token(character)
- # 将独热向量中对应位置置为1
- one_hot_matrix[character_index][position_index] = 1
-
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从数据集DataFrame中实例化向量化器
- 参数:
- surname_df (pandas.DataFrame): 姓氏数据集
- 返回:
- SurnameVectorizer的一个实例
- """
- # 初始化字符词汇表和国籍词汇表
- character_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
- max_surname_length = 0
-
- # 遍历数据集中的每一行
- for index, row in surname_df.iterrows():
- # 更新最大姓氏长度
- max_surname_length = max(max_surname_length, len(row.surname))
- # 将姓氏中的每个字符添加到字符词汇表中
- for letter in row.surname:
- character_vocab.add_token(letter)
- # 将国籍添加到国籍词汇表中
- nationality_vocab.add_token(row.nationality)
-
- return cls(character_vocab, nationality_vocab, max_surname_length)

我们在本例中使用的模型是使用我们在“卷积神经网络”中介绍的方法构建的。实际上,我们在该部分中创建的用于测试卷积层的“人工”数据与姓氏数据集中使用本例中的矢量化器的数据张量的大小完全匹配。正如在示例中所看到的,它与我们在“卷积神经网络”中引入的Conv1d序列既有相似之处,也有需要解释的新添加内容。具体来说,该模型类似于“卷积神经网络”,它使用一系列一维卷积来增量地计算更多的特征,从而得到一个单特征向量。
然而,本例中的新内容是使用sequence和ELU PyTorch模块。序列模块是封装线性操作序列的方便包装器。在这种情况下,我们使用它来封装Conv1d序列的应用程序。ELU是类似于实验3中介绍的ReLU的非线性函数,但是它不是将值裁剪到0以下,而是对它们求幂。ELU已经被证明是卷积层之间使用的一种很有前途的非线性(Clevert et al., 2015)。
在本例中,我们将每个卷积的通道数与num_channels超参数绑定。我们可以选择不同数量的通道分别进行卷积运算。这样做需要优化更多的超参数。我们发现256足够大,可以使模型达到合理的性能。
- import torch.nn as nn
- import torch.nn.functional as F
-
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- 初始化 SurnameClassifier 类
- Args:
- initial_num_channels (int): 输入特征向量的大小
- num_classes (int): 输出预测向量的大小
- num_channels (int): 网络中使用的恒定通道大小
- """
- super(SurnameClassifier, self).__init__()
-
- # 定义卷积神经网络结构
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3),
- nn.ELU(), # 使用ELU激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3),
- nn.ELU()
- )
- # 全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """分类器的前向传播
- Args:
- x_surname (torch.Tensor): 输入数据张量。
- x_surname.shape 应为 (batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): softmax 激活的标志,
- 如果与交叉熵损失一起使用,则应为 False
- Returns:
- 结果张量。tensor.shape 应为 (batch, num_classes)
- """
- # 将数据传入卷积神经网络并压缩维度
- features = self.convnet(x_surname).squeeze(dim=2)
- # 通过全连接层进行预测
- prediction_vector = self.fc(features)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector

训练程序包括以下似曾相识的的操作序列:实例化数据集,实例化模型,实例化损失函数,实例化优化器,遍历数据集的训练分区和更新模型参数,遍历数据集的验证分区和测量性能,然后重复数据集迭代一定次数。对于这个例子,我们将不再详细描述具体的训练例程,因为它与“示例:带有多层感知器的姓氏分类”中的例程完全相同。但是,输入参数是不同的.
- import argparse
-
- # 创建解析器对象
- args = argparse.Namespace(
- # 数据和路径信息
- surname_csv="data/surnames/surnames_with_splits.csv", # 姓氏数据集的CSV文件路径
- vectorizer_file="vectorizer.json", # 向量化器文件的保存路径
- model_state_file="model.pth", # 模型状态文件的保存路径
- save_dir="model_storage/ch4/cnn", # 模型保存目录路径
-
- # 模型超参数
- hidden_dim=100, # 隐藏层维度
- num_channels=256, # 卷积核数量
-
- # 训练超参数
- seed=1337, # 随机种子
- learning_rate=0.001, # 学习率
- batch_size=128, # 批大小
- num_epochs=100, # 迭代次数
- early_stopping_criteria=5, # 提前停止训练的标准
- dropout_p=0.1, # Dropout概率
-
- # 运行时参数被省略以节省空间
- )

正如“示例:带有多层感知器的姓氏分类”中的示例与本示例之间的训练例程没有变化一样,执行评估的代码也没有变化。总之,调用分类器的eval()
方法来防止反向传播,并迭代测试数据集。与 MLP 约 50% 的性能相比,该模型的测试集性能准确率约为56%。尽管这些性能数字绝不是这些特定架构的上限,但是通过一个相对简单的CNN模型获得的改进应该足以让您在文本数据上尝试CNNs。
- # 加载之前保存的模型状态字典
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- # 将分类器移动到指定的设备上
- classifier = classifier.to(args.device)
- # 将数据集的类权重也移动到指定的设备上
- dataset.class_weights = dataset.class_weights.to(args.device)
- # 定义损失函数为交叉熵损失函数,并考虑类权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 设置数据集分割为测试集
- dataset.set_split('test')
- # 创建测试集的batch生成器
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- # 初始化测试集的损失和准确率为0
- running_loss = 0.
- running_acc = 0.
- # 将分类器设置为评估模式
- classifier.eval()
-
- # 迭代测试集的每个batch
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算模型的输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- # 更新测试集损失
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- # 更新测试集准确率
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 将测试集的损失和准确率保存到训练状态中
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))

在本例中,predict_nationality()
函数的一部分发生了更改,我们没有使用视图方法重塑新创建的数据张量以添加批处理维度,而是使用PyTorch的unsqueeze()
函数在批处理应该在的位置添加大小为1的维度。相同的更改反映在predict_topk_nationality()
函数中.
- def predict_nationality(surname, classifier, vectorizer):
- """预测姓氏的国籍
-
- Args:
- surname (str): 待分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 对应的向量化器
- Returns:
- 一个包含最可能的国籍及其概率的字典
- """
- # 将姓氏向量化
- vectorized_surname = vectorizer.vectorize(surname)
- # 将向量化后的姓氏转换为张量,并添加一个维度以符合模型输入要求
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- # 使用分类器进行预测,并应用softmax函数获取概率
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 获取概率最高的值及其对应的索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- # 根据索引从向量化器中获取预测的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- # 获取概率值
- probability_value = probability_values.item()
-
- # 返回预测的国籍及其概率值
- return {'nationality': predicted_nationality, 'probability': probability_value}
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.cpu()
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

- def predict_topk_nationality(surname, classifier, vectorizer, k=5):
- """预测一个新姓氏的前K个国籍
-
- Args:
- surname (str): 待分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 对应的向量化器
- k (int): 返回的前K个国籍数量
- Returns:
- 包含字典的列表,每个字典包含一个国籍和其概率
- """
-
- # 将姓氏向量化
- vectorized_surname = vectorizer.vectorize(surname)
- # 将向量化后的姓氏转换为张量,并添加一个维度以符合模型输入要求
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
- # 使用分类器进行预测,并应用softmax函数获取概率
- prediction_vector = classifier(vectorized_surname, apply_softmax=True)
- # 获取概率最高的前K个值及其对应的索引
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # 将返回的概率值及索引转换为NumPy数组
- probability_values = probability_values[0].detach().numpy()
- indices = indices[0].detach().numpy()
-
- results = []
- # 遍历前K个预测结果
- for kth_index in range(k):
- # 根据索引从向量化器中获取预测的国籍
- nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
- # 获取概率值
- probability_value = probability_values[kth_index]
- results.append({'nationality': nationality,
- 'probability': probability_value})
- return results
-
- # 获取用户输入的新姓氏
- new_surname = input("请输入待分类的姓氏:")
-
- # 获取用户想要查看的前K个预测结果数量
- k = int(input("要查看前几个预测结果?"))
- # 如果用户输入的数量超过了国籍词汇表的大小,则默认显示全部国籍
- if k > len(vectorizer.nationality_vocab):
- print("抱歉!这超过了我们拥有的国籍数量.. 默认显示全部国籍 :)")
- k = len(vectorizer.nationality_vocab)
-
- # 获取前K个预测结果
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- # 打印前K个预测结果
- print("前{}个预测结果:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (概率={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。