赞
踩
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
前馈神经网络(feedforward neural network,FNN),简称前馈网络,是人工神经网络的一种。前馈神经网络采用一种单向多层结构。其中每一层包含若干个神经元。在此种神经网络中,各神经元可以接收前一层神经元的信号,并产生输出到下一层。第0层叫输入层,最后一层叫输出层,其他中间层叫做隐含层(或隐藏层、隐层)。隐层可以是一层。也可以是多层。
整个网络中无反馈,信号从输入层向输出层单向传播,可用一个有向无环图表示。
多层感知器(MLP)被认为是最基本的神经网络构建模块之一。多层感知器将数据向量作为输入,计算出一个输出值。在MLP中,许多感知器被分组,以便单个层的输出是一个新的向量,而不是单个输出值。同时,它可以将多个层与每个层之间的非线性结合在一起。
最简单的MLP,如图1-1所示,由三个表示阶段和两个线性层组成。第一阶段是输入向量。这是给定给模型的向量。给定输入向量,第一个线性层计算一个隐藏向量——表示的第二阶段。隐藏向量之所以这样被调用,是因为它是位于输入和输出之间的层的输出。使用这个隐藏的向量,第二个线性层计算一个输出向量。虽然在这个例子中,只展示了一个隐藏的向量,但是有可能有多个中间阶段,每个阶段产生自己的隐藏向量。最终的隐藏向量总是通过线性层和非线性的组合映射到输出向量。
mlp的力量来自于添加第二个线性层和允许模型学习一个线性分割的的中间表示——该属性能表示一个直线(或更一般的,一个超平面)可以用来区分数据点落在线(或超平面)的哪一边的。学习具有特定属性的中间表示,如分类任务是线性可分的,这是使用神经网络的最深刻后果之一,也是其建模能力的精髓。
在一个二元分类任务中训练感知器和MLP:星和圆。每个数据点是一个二维坐标。在不深入研究实现细节的情况下,最终的模型预测如图1-2所示。在这个图中,错误分类的数据点用黑色填充,而正确分类的数据点没有填充。在左边的面板中,从填充的形状可以看出,感知器在学习一个可以将星星和圆分开的决策边界方面有困难。然而,MLP(右面板)学习了一个更精确地对恒星和圆进行分类的决策边界。
图1-2中,每个数据点的真正类是该点的形状:星形或圆形。错误的分类用块填充,正确的分类没有填充。这些线是每个模型的决策边界。在边的面板中,感知器学习一个不能正确地将圆与星分开的决策边界。事实上,没有一条线可以。在右动的面板中,MLP学会了从圆中分离星。
虽然在图中显示MLP有两个决策边界,这是它的优点,但它实际上只是一个决策边界。决策边界是因为中间表示法改变了空间,使一个超平面同时出现在这两个位置上。在图1-3中,我们可以看到MLP计算的中间值。这些点的形状表示类(星形或圆形)。我们所看到的是,MLP已经学会了“扭曲”数据所处的空间,以便在数据通过最后一层时,用一线来分割它们。
相反,如图1-4所示,感知器没有额外的一层来处理数据的形状,直到数据变成线性可分的。
MLP除了简单的感知器之外,还有一个额外的计算层。在例1-1中给出的实现中,使用PyTorch的两个线性模块实例化了这个想法。线性对象被命名为fc1和fc2,它们遵循一个通用约定,即将线性模块称为“完全连接层”,简称为“fc层”。除了这两个线性层外,还有一个修正的线性单元(ReLU),它在被输入到第二个线性层之前应用于第一个线性层的输出。由于层的顺序性,必须确保层中的输出数量等于下一层的输入数量。使用两个线性层之间的非线性是必要的,因为没有它,两个线性层在数学上等价于一个线性层,因此不能建模复杂的模式。MLP只实现反向传播的前向传递。这是因为PyTorch根据模型的定义和向前传递的实现,自动计算出如何进行向后传递和梯度更新。
Example 1-1. Multilayer Perceptron
import torch.nn as nn import torch.nn.functional as F class MultilayerPerceptron(nn.Module): def __init__(self, input_dim, hidden_dim, output_dim): """ 构造函数,初始化多层感知机的参数 参数: input_dim (int): 输入向量的维度 hidden_dim (int): 第一个线性层的输出维度(隐藏层神经元数量) output_dim (int): 第二个线性层的输出维度(输出层神经元数量) """ super(MultilayerPerceptron, self).__init__() self.fc1 = nn.Linear(input_dim, hidden_dim) # 定义第一个全连接层 self.fc2 = nn.Linear(hidden_dim, output_dim) # 定义第二个全连接层 def forward(self, x_in, apply_softmax=False): """ 前向传播函数 参数: x_in (torch.Tensor): 输入数据张量 x_in 的形状应为 (batch_size, input_dim) apply_softmax (bool): 是否应用 softmax 激活函数 如果与交叉熵损失函数一起使用,应设置为 False 返回值: 结果张量,形状应为 (batch_size, output_dim) """ intermediate = F.relu(self.fc1(x_in)) # 在第一个全连接层后应用 ReLU 激活函数 output = self.fc2(intermediate) # 通过第二个全连接层得到输出 if apply_softmax: output = F.softmax(output, dim=1) # 如果需要,应用 softmax 激活函数 return output
将MLP应用于将姓氏分类到其原籍国的任务流程如下:
首先对每个姓氏的字符进行拆分。
然后,使用词汇表、向量化器和DataLoader类逐步完成从姓氏字符串到向量化小批处理的管道。同时引入了多类输出及其对应的损失函数。
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,该数据集具有以下属性——
1,它是相当不平衡的。排名前三的语言占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。
2,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密(比如“O ‘Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。
Example 1-2. Implementing SurnameDataset
import json import pandas as pd import torch from torch.utils.data import Dataset, DataLoader class SurnameDataset(Dataset): def __init__(self, surname_df, vectorizer): """ 参数: surname_df (pandas.DataFrame): 数据集 vectorizer (SurnameVectorizer): 从数据集实例化的矢量化器 """ self.surname_df = surname_df self._vectorizer = vectorizer self.train_df = self.surname_df[self.surname_df.split=='train'] self.train_size = len(self.train_df) self.val_df = self.surname_df[self.surname_df.split=='val'] self.validation_size = len(self.val_df) self.test_df = self.surname_df[self.surname_df.split=='test'] self.test_size = len(self.test_df) self._lookup_dict = {'train': (self.train_df, self.train_size), 'val': (self.val_df, self.validation_size), 'test': (self.test_df, self.test_size)} self.set_split('train') # 类别权重 class_counts = surname_df.nationality.value_counts().to_dict() def sort_key(item): return self._vectorizer.nationality_vocab.lookup_token(item[0]) sorted_counts = sorted(class_counts.items(), key=sort_key) frequencies = [count for _, count in sorted_counts] self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32) @classmethod def load_dataset_and_make_vectorizer(cls, surname_csv): """加载数据集并从头创建一个新的矢量化器 参数: surname_csv (str): 数据集的位置 返回: SurnameDataset 的一个实例 """ surname_df = pd.read_csv(surname_csv) train_surname_df = surname_df[surname_df.split=='train'] return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df)) @classmethod def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath): """加载数据集和相应的矢量化器。 在矢量化器已缓存以便重用的情况下使用 参数: surname_csv (str): 数据集的位置 vectorizer_filepath (str): 保存的矢量化器的位置 返回: SurnameDataset 的一个实例 """ surname_df = pd.read_csv(surname_csv) vectorizer = cls.load_vectorizer_only(vectorizer_filepath) return cls(surname_df, vectorizer) @staticmethod def load_vectorizer_only(vectorizer_filepath): """从文件加载矢量化器的静态方法 参数: vectorizer_filepath (str): 序列化矢量化器的位置 返回: SurnameVectorizer 的一个实例 """ with open(vectorizer_filepath) as fp: return SurnameVectorizer.from_serializable(json.load(fp)) def save_vectorizer(self, vectorizer_filepath): """使用 json 将矢量化器保存到磁盘 参数: vectorizer_filepath (str): 保存矢量化器的位置 """ with open(vectorizer_filepath, "w") as fp: json.dump(self._vectorizer.to_serializable(), fp) def get_vectorizer(self): """ 返回矢量化器 """ return self._vectorizer def set_split(self, split="train"): """ 使用数据框中的列选择数据集中的分割 """ self._target_split = split self._target_df, self._target_size = self._lookup_dict[split] def __len__(self): return self._target_size def __getitem__(self, index): """PyTorch 数据集的主要入口方法 参数: index (int): 数据点的索引 返回: 包含数据点的字典: 特征 (x_surname) 标签 (y_nationality) """ row = self._target_df.iloc[index] surname_vector = \ self._vectorizer.vectorize(row.surname) nationality_index = \ self._vectorizer.nationality_vocab.lookup_token(row.nationality) return {'x_surname': surname_vector, 'y_nationality': nationality_index} def get_num_batches(self, batch_size): """给定批大小,返回数据集中批的数量 参数: batch_size (int) 返回: 数据集中批的数量 """ return len(self) // batch_size def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"): """ 一个包装了 PyTorch DataLoader 的生成器函数。 它将确保每个张量位于正确的设备位置。 """ dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last) for data_dict in dataloader: out_data_dict = {} for name, tensor in data_dict.items(): out_data_dict[name] = data_dict[name].to(device) yield out_data_dict
为了使用字符对姓氏进行分类,使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。
Example 1-3. Implementing Vocabulary
class Vocabulary(object): """处理文本并提取用于映射的词汇的类""" def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"): """ 参数: token_to_idx (dict): 预先存在的从token到索引的映射 add_unk (bool): 是否添加UNK标记的标志 unk_token (str): 要添加到词汇表中的UNK标记 """ if token_to_idx is None: token_to_idx = {} self._token_to_idx = token_to_idx self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()} self._add_unk = add_unk self._unk_token = unk_token self.unk_index = -1 if add_unk: self.unk_index = self.add_token(unk_token) def to_serializable(self): """ 返回可以序列化的字典 """ return {'token_to_idx': self._token_to_idx, 'add_unk': self._add_unk, 'unk_token': self._unk_token} @classmethod def from_serializable(cls, contents): """ 从序列化字典实例化词汇表 """ return cls(**contents) def add_token(self, token): """根据token更新映射字典 参数: token (str): 要添加到词汇表中的项目 返回: index (int): 对应于token的整数 """ try: index = self._token_to_idx[token] except KeyError: index = len(self._token_to_idx) self._token_to_idx[token] = index self._idx_to_token[index] = token return index def add_many(self, tokens): """将一系列tokens添加到词汇表中 参数: tokens (list): 字符串token列表 返回: indices (list): 对应于tokens的索引列表 """ return [self.add_token(token) for token in tokens] def lookup_token(self, token): """检索与token关联的索引 或者如果token不存在,则返回UNK索引 参数: token (str): 要查找的token 返回: index (int): 对应于token的索引 注意: `unk_index`需要>=0(已添加到词汇表中) 才能使用UNK功能 """ if self.unk_index >= 0: return self._token_to_idx.get(token, self.unk_index) else: return self._token_to_idx[token] def lookup_index(self, index): """返回与索引关联的token 参数: index (int): 要查找的索引 返回: token (str): 对应于索引的token 异常: KeyError: 如果索引不在词汇表中 """ if index not in self._idx_to_token: raise KeyError("索引 (%d) 不在词汇表中" % index) return self._idx_to_token[index] def __str__(self): return "<Vocabulary(size=%d)>" % len(self) def __len__(self): return len(self._token_to_idx)
Example 1-4. Implementing SurnameVectorizer
import numpy as np class SurnameVectorizer(object): """ 矢量化器,负责协调词汇表并使用它们 """ def __init__(self, surname_vocab, nationality_vocab): self.surname_vocab = surname_vocab self.nationality_vocab = nationality_vocab def vectorize(self, surname): """将提供的姓氏矢量化 参数: surname (str): 姓氏 返回: one_hot (np.ndarray): 一个压缩的一热编码 """ vocab = self.surname_vocab one_hot = np.zeros(len(vocab), dtype=np.float32) for token in surname: one_hot[vocab.lookup_token(token)] = 1 return one_hot @classmethod def from_dataframe(cls, surname_df): """从数据框实例化矢量化器 参数: surname_df (pandas.DataFrame): 姓氏数据集 返回: SurnameVectorizer 的一个实例 """ surname_vocab = Vocabulary(unk_token="@") nationality_vocab = Vocabulary(add_unk=False) for index, row in surname_df.iterrows(): for letter in row.surname: surname_vocab.add_token(letter) nationality_vocab.add_token(row.nationality) return cls(surname_vocab, nationality_vocab)
SurnameClassifier是本实验的MLP的实现。第一个线性层将输入向量映射到中间向量,并对该向量应用非线性。第二线性层将中间向量映射到预测向量。在最后一步中,可选地应用softmax操作,以确保输出和为1;这就是所谓的“概率”。
交叉熵损失对于多类分类是最理想的,但是在训练过程中软最大值的计算不仅浪费而且在很多情况下并不稳定。
Example 1-5. The SurnameClassifier as an MLP
import torch.nn as nn import torch.nn.functional as F class SurnameClassifier(nn.Module): """ A 2-layer Multilayer Perceptron for classifying surnames """ def __init__(self, input_dim, hidden_dim, output_dim): """ Args: input_dim (int): the size of the input vectors hidden_dim (int): the output size of the first Linear layer output_dim (int): the output size of the second Linear layer """ super(SurnameClassifier, self).__init__() self.fc1 = nn.Linear(input_dim, hidden_dim) self.fc2 = nn.Linear(hidden_dim, output_dim) def forward(self, x_in, apply_softmax=False): """The forward pass of the classifier Args: x_in (torch.Tensor): an input data tensor. x_in.shape should be (batch, input_dim) apply_softmax (bool): a flag for the softmax activation should be false if used with the Cross Entropy losses Returns: the resulting tensor. tensor.shape should be (batch, output_dim) """ intermediate_vector = F.relu(self.fc1(x_in)) prediction_vector = self.fc2(intermediate_vector) if apply_softmax: prediction_vector = F.softmax(prediction_vector, dim=1) return prediction_vector
Example 1-6. The Training
import os import pandas as pd from sklearn.model_selection import train_test_split import torch import json from argparse import Namespace def make_train_state(args): return {'stop_early': False, 'early_stopping_step': 0, 'early_stopping_best_val': 1e8, 'learning_rate': args.learning_rate, 'epoch_index': 0, 'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'test_loss': -1, 'test_acc': -1, 'model_filename': args.model_state_file} def update_train_state(args, model, train_state): """Handle the training state updates. Components: - Early Stopping: Prevent overfitting. - Model Checkpoint: Model is saved if the model is better :param args: main arguments :param model: model to train :param train_state: a dictionary representing the training state values :returns: a new train_state """ # Save one model at least if train_state['epoch_index'] == 0: torch.save(model.state_dict(), train_state['model_filename']) train_state['stop_early'] = False # Save model if performance improved elif train_state['epoch_index'] >= 1: loss_tm1, loss_t = train_state['val_loss'][-2:] # If loss worsened if loss_t >= train_state['early_stopping_best_val']: # Update step train_state['early_stopping_step'] += 1 # Loss decreased else: # Save the best model if loss_t < train_state['early_stopping_best_val']: torch.save(model.state_dict(), train_state['model_filename']) # Reset early stopping step train_state['early_stopping_step'] = 0 # Stop early ? train_state['stop_early'] = \ train_state['early_stopping_step'] >= args.early_stopping_criteria return train_state def compute_accuracy(y_pred, y_target): y_pred_indices = y_pred.max(dim=1)[1] n_correct = torch.eq(y_pred_indices, y_target).sum().item() return n_correct / len(y_pred_indices) * 100 def set_seed_everywhere(seed, cuda): np.random.seed(seed) torch.manual_seed(seed) if cuda: torch.cuda.manual_seed_all(seed) def handle_dirs(dirpath): if not os.path.exists(dirpath): os.makedirs(dirpath) # 加载初始数据集 surname_csv = 'surnames.csv' df = pd.read_csv(surname_csv) # 将数据集划分为训练集、验证集和测试集 train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['nationality'], random_state=42) val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['nationality'], random_state=42) # 添加 split 列 train_df = train_df.copy() train_df.loc[:, 'split'] = 'train' val_df = val_df.copy() val_df.loc[:, 'split'] = 'val' test_df = test_df.copy() test_df.loc[:, 'split'] = 'test' # 合并所有数据集 final_df = pd.concat([train_df, val_df, test_df]) # 保存为 CSV 文件 final_df.to_csv("data/surnames/surnames_with_splits.csv", index=False) # 参数定义 args = Namespace( # 数据和路径信息 surname_csv="surnames_with_splits.csv", # 包含拆分信息的数据集路径 vectorizer_file="vectorizer.json", # 向量化器的保存路径 model_state_file="model.pth", # 模型状态的保存路径 save_dir="model_storage/ch4/surname_mlp", # 模型保存目录 # 模型超参数 hidden_dim=300, # 隐藏层维度 # 训练超参数 seed=1337, # 随机种子 num_epochs=10, # 训练的迭代次数 early_stopping_criteria=5, # 提前停止的标准 learning_rate=0.001, # 学习率 batch_size=64, # 批处理大小 # 运行时选项 cuda=False, # 是否使用CUDA reload_from_files=False, # 是否从文件重载 expand_filepaths_to_save_dir=True, # 是否扩展文件保存路径 ) # 扩展保存路径 if args.expand_filepaths_to_save_dir: args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file) args.model_state_file = os.path.join(args.save_dir, args.model_state_file) print("扩展后的文件路径: ") print("\t{}".format(args.vectorizer_file)) print("\t{}".format(args.model_state_file)) # 检查 CUDA if not torch.cuda.is_available(): args.cuda = False args.device = torch.device("cuda" if args.cuda else "cpu") print("使用 CUDA: {}".format(args.cuda)) # 设置随机种子以确保可复现性 def set_seed_everywhere(seed, cuda): torch.manual_seed(seed) if cuda: torch.cuda.manual_seed_all(seed) set_seed_everywhere(args.seed, args.cuda) # 处理目录 def handle_dirs(dirpath): if not os.path.exists(dirpath): os.makedirs(dirpath) handle_dirs(args.save_dir)
训练中最显著的差异与模型中输出的种类和使用的损失函数有关。在这个例子中,输出是一个多类预测向量,可以转换为概率。正如在模型描述中所描述的,这种输出的损失类型仅限于CrossEntropyLoss和NLLLoss。由于它的简化,我们使用了CrossEntropyLoss。
Example 1-7. Instantiating the dataset, model, loss, and optimizer
# 加载数据集并创建矢量化器 dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv) vectorizer = dataset.get_vectorizer() # 初始化分类器 classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab), hidden_dim=args.hidden_dim, output_dim=len(vectorizer.nationality_vocab)) # 将分类器转移到设备(CPU或CUDA) classifier = classifier.to(args.device) # 设置损失函数,使用加权的交叉熵损失 loss_func = nn.CrossEntropyLoss(dataset.class_weights) # 设置优化器,使用Adam优化器 optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
Example 1-8. A snippet of the training loop
import numpy as np from tqdm.notebook import tqdm as tqdm_notebook # 将分类器和类权重转移到设备 classifier = classifier.to(args.device) dataset.class_weights = dataset.class_weights.to(args.device) # 定义损失函数、优化器和学习率调度器 loss_func = nn.CrossEntropyLoss(dataset.class_weights) optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate) scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min', factor=0.5, patience=1) # 创建训练状态字典 train_state = make_train_state(args) # 创建进度条 epoch_bar = tqdm_notebook(desc='training routine', total=args.num_epochs, position=0) dataset.set_split('train') train_bar = tqdm_notebook(desc='split=train', total=dataset.get_num_batches(args.batch_size), position=1, leave=True) dataset.set_split('val') val_bar = tqdm_notebook(desc='split=val', total=dataset.get_num_batches(args.batch_size), position=1, leave=True) try: for epoch_index in range(args.num_epochs): train_state['epoch_index'] = epoch_index # 在训练数据集上迭代 dataset.set_split('train') batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device) running_loss = 0.0 running_acc = 0.0 classifier.train() for batch_index, batch_dict in enumerate(batch_generator): optimizer.zero_grad() y_pred = classifier(batch_dict['x_surname']) loss = loss_func(y_pred, batch_dict['y_nationality']) loss_t = loss.item() running_loss += (loss_t - running_loss) / (batch_index + 1) loss.backward() optimizer.step() acc_t = compute_accuracy(y_pred, batch_dict['y_nationality']) running_acc += (acc_t - running_acc) / (batch_index + 1) train_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index) train_bar.update() train_state['train_loss'].append(running_loss) train_state['train_acc'].append(running_acc) # 在验证数据集上迭代 dataset.set_split('val') batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device) running_loss = 0. running_acc = 0. classifier.eval() for batch_index, batch_dict in enumerate(batch_generator): y_pred = classifier(batch_dict['x_surname']) loss = loss_func(y_pred, batch_dict['y_nationality']) loss_t = loss.to("cpu").item() running_loss += (loss_t - running_loss) / (batch_index + 1) acc_t = compute_accuracy(y_pred, batch_dict['y_nationality']) running_acc += (acc_t - running_acc) / (batch_index + 1) val_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index) val_bar.update() train_state['val_loss'].append(running_loss) train_state['val_acc'].append(running_acc) # 更新训练状态 train_state = update_train_state(args=args, model=classifier, train_state=train_state) scheduler.step(train_state['val_loss'][-1]) if train_state['stop_early']: break train_bar.n = 0 val_bar.n = 0 epoch_bar.update() except KeyboardInterrupt: print("循环中断")
# 加载最佳模型 classifier.load_state_dict(torch.load(train_state['model_filename'])) # 将模型和类权重转移到设备 classifier = classifier.to(args.device) dataset.class_weights = dataset.class_weights.to(args.device) # 定义损失函数 loss_func = nn.CrossEntropyLoss(dataset.class_weights) # 设置数据集为测试集 dataset.set_split('test') # 生成测试集批次数据 batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device) # 初始化运行损失和准确率 running_loss = 0. running_acc = 0. # 将模型设置为评估模式 classifier.eval() # 遍历测试集批次 for batch_index, batch_dict in enumerate(batch_generator): # 计算输出 y_pred = classifier(batch_dict['x_surname']) # 计算损失 loss = loss_func(y_pred, batch_dict['y_nationality']) loss_t = loss.item() running_loss += (loss_t - running_loss) / (batch_index + 1) # 计算准确率 acc_t = compute_accuracy(y_pred, batch_dict['y_nationality']) running_acc += (acc_t - running_acc) / (batch_index + 1) # 更新训练状态中的测试集损失和准确率 train_state['test_loss'] = running_loss train_state['test_acc'] = running_acc #打印结果 print("Test loss: {};".format(train_state['test_loss'])) print("Test Accuracy: {}".format(train_state['test_acc']))
通过查看分类器的top-k预测来为一个新示例开发模型所了解的内容的直觉。
Example 1-9. A function for performing nationality prediction
def predict_nationality(name, classifier, vectorizer): # 将名字矢量化 vectorized_name = vectorizer.vectorize(name) vectorized_name = torch.tensor(vectorized_name).view(1, -1) # 使用分类器进行预测 result = classifier(vectorized_name, apply_softmax=True) # 获取最大概率值及其对应的索引 probability_values, indices = result.max(dim=1) index = indices.item() # 查找预测的国籍及其概率 predicted_nationality = vectorizer.nationality_vocab.lookup_index(index) probability_value = probability_values.item() return {'nationality': predicted_nationality, 'probability': probability_value}
new_surname = input("请输入要分类的姓氏: ")
# 将分类器移至CPU
classifier = classifier.cpu()
# 预测国籍
prediction = predict_nationality(new_surname, classifier, vectorizer)
# 输出预测结果
print("{} -> {} (p={:0.2f})".format(new_surname,
prediction['nationality'],
prediction['probability']))
不仅要看最好的预测,还要看更多的预测。PyTorch提供了一个torch.topk函数,它提供了一种方便的方法来获得这些预测。
Example 1-10. Predicting the top-k nationalities
def predict_topk_nationality(surname, classifier, vectorizer, k=5): """预测姓氏的前K个国籍 参数: surname (str): 要分类的姓氏 classifier (SurnameClassifer): 分类器实例 vectorizer (SurnameVectorizer): 对应的矢量化器 k (int): 要返回的前几个国籍 返回: 由字典组成的列表,每个字典包含一个国籍和一个概率 """ vectorized_surname = vectorizer.vectorize(surname) vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0) prediction_vector = classifier(vectorized_surname, apply_softmax=True) probability_values, indices = torch.topk(prediction_vector, k=k) # 返回的大小是 1,k probability_values = probability_values[0].detach().numpy() indices = indices[0].detach().numpy() results = [] for kth_index in range(k): nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index]) probability_value = probability_values[kth_index] results.append({'nationality': nationality, 'probability': probability_value}) return results new_surname = input("请输入要分类的姓氏: ") k = int(input("您希望看到前多少个预测结果? ")) if k > len(vectorizer.nationality_vocab): print("对不起! 这是超过我们国籍数量的请求.. 默认最大数量 :)") k = len(vectorizer.nationality_vocab) predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k) print("前 {} 个预测结果:".format(k)) print("===================") for prediction in predictions: print("{} -> {} (p={:0.2f})".format(new_surname, prediction['nationality'], prediction['probability']))
卷积神经网络(Convolutional Neural Networks, CNN)是一类包含卷积计算且具有深度结构的前馈神经网络(Feedforward Neural Networks),是深度学习(deep learning)的代表算法之一 。卷积神经网络具有表征学习(representation learning)能力,能够按其阶层结构对输入信息进行平移不变分类(shift-invariant classification),因此也被称为“平移不变人工神经网络(Shift-Invariant Artificial Neural Networks, SIANN)” 。
为了理解不同的设计决策对CNN意味着什么,在本例中,单个“核”应用于输入矩阵。卷积运算(线性算子)的精确数学表达式对于理解这一节并不重要,但是从这个图中可以直观地看出,核是一个小的方阵,它被系统地应用于输入矩阵的不同位置。
输入矩阵与单个产生输出矩阵的卷积核(也称为特征映射)在输入矩阵的每个位置应用内核。在每个应用程序中,内核乘以输入矩阵的值及其自身的值,然后将这些乘法相加kernel具有以下超参数配置:kernel_size=2,stride=1,padding=0,以及dilation=1。
Example 2-1. Artificial data and using a Conv1d class
import torch import torch.nn as nn batch_size = 2 one_hot_size = 10 sequence_width = 7 # 随机生成数据 data = torch.randn(batch_size, one_hot_size, sequence_width) # 定义1D卷积层 conv1 = nn.Conv1d(in_channels=one_hot_size, out_channels=16, kernel_size=3) # 通过卷积层传递数据 intermediate1 = conv1(data) # 打印输入数据和卷积输出的尺寸 print(data.size()) # 输出: torch.Size([2, 10, 7]) print(intermediate1.size()) # 输出: torch.Size([2, 16, 5])
虽然姓氏数据集之前在“示例:带有多层感知器的姓氏分类”中进行了描述,但建议参考“姓氏数据集”来了解它的描述。尽管使用了相同数据集,但在实现上有一个不同之处:数据集由onehot向量矩阵组成,而不是一个收缩的onehot向量。为此,我们实现了一个数据集类,它跟踪最长的姓氏,并将其作为矩阵中包含的行数提供给矢量化器。列的数量是onehot向量的大小(词汇表的大小)。示例2-2对SurnameDataset.__getitem__进行了更改;同时对SurnameVectorizer的进行了更改。
使用数据集中最长的姓氏来控制onehot矩阵的大小有两个原因。首先,将每一小批姓氏矩阵组合成一个三维张量,要求它们的大小相同。其次,使用数据集中最长的姓氏意味着可以以相同的方式处理每个小批处理。
Example 2-2. SurnameDataset modified for passing the maximum surname length
import torch import pandas as pd from torch.utils.data import Dataset, DataLoader import json class SurnameDataset(Dataset): def __init__(self, surname_df, vectorizer): """ Args: surname_df (pandas.DataFrame): 姓氏数据集 vectorizer (SurnameVectorizer): 从数据集实例化的向量化器 """ self.surname_df = surname_df self._vectorizer = vectorizer self.train_df = self.surname_df[self.surname_df.split=='train'] self.train_size = len(self.train_df) self.val_df = self.surname_df[self.surname_df.split=='val'] self.validation_size = len(self.val_df) self.test_df = self.surname_df[self.surname_df.split=='test'] self.test_size = len(self.test_df) self._lookup_dict = {'train': (self.train_df, self.train_size), 'val': (self.val_df, self.validation_size), 'test': (self.test_df, self.test_size)} self.set_split('train') # 类别权重 class_counts = surname_df.nationality.value_counts().to_dict() def sort_key(item): return self._vectorizer.nationality_vocab.lookup_token(item[0]) sorted_counts = sorted(class_counts.items(), key=sort_key) frequencies = [count for _, count in sorted_counts] self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32) @classmethod def load_dataset_and_make_vectorizer(cls, surname_csv): """加载数据集并创建一个新的向量化器 Args: surname_csv (str): 数据集的位置 Returns: SurnameDataset 的实例 """ surname_df = pd.read_csv(surname_csv) train_surname_df = surname_df[surname_df.split=='train'] return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df)) @classmethod def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath): """加载数据集及其对应的向量化器。 用于在向量化器已缓存以便重复使用的情况下 Args: surname_csv (str): 数据集的位置 vectorizer_filepath (str): 保存的向量化器的位置 Returns: SurnameDataset 的实例 """ surname_df = pd.read_csv(surname_csv) vectorizer = cls.load_vectorizer_only(vectorizer_filepath) return cls(surname_df, vectorizer) @staticmethod def load_vectorizer_only(vectorizer_filepath): """从文件加载向量化器的静态方法 Args: vectorizer_filepath (str): 序列化向量化器的位置 Returns: SurnameDataset 的实例 """ with open(vectorizer_filepath) as fp: return SurnameVectorizer.from_serializable(json.load(fp)) def save_vectorizer(self, vectorizer_filepath): """使用 json 将向量化器保存到磁盘 Args: vectorizer_filepath (str): 保存向量化器的位置 """ with open(vectorizer_filepath, "w") as fp: json.dump(self._vectorizer.to_serializable(), fp) def get_vectorizer(self): """返回向量化器""" return self._vectorizer def set_split(self, split="train"): """使用数据框中的列选择数据集的拆分""" self._target_split = split self._target_df, self._target_size = self._lookup_dict[split] def __len__(self): return self._target_size def __getitem__(self, index): """用于 PyTorch 数据集的主要入口方法 Args: index (int): 数据点的索引 Returns: 一个字典,包含数据点的特征 (x_data) 和标签 (y_target) """ row = self._target_df.iloc[index] surname_matrix = \ self._vectorizer.vectorize(row.surname) nationality_index = \ self._vectorizer.nationality_vocab.lookup_token(row.nationality) return {'x_surname': surname_matrix, 'y_nationality': nationality_index} def get_num_batches(self, batch_size): """给定批次大小,返回数据集中的批次数 Args: batch_size (int) Returns: 数据集中的批次数 """ return len(self) // batch_size def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"): """ 包装 PyTorch DataLoader 的生成器函数。它确保每个张量都位于正确的设备位置上。 """ dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last) for data_dict in dataloader: out_data_dict = {} for name, tensor in data_dict.items(): out_data_dict[name] = data_dict[name].to(device) yield out_data_dict
尽管词汇表和DataLoader的实现方式与“示例:带有多层感知器的姓氏分类”中的示例相同,但Vectorizer的vectorize()方法已经更改,以适应CNN模型的需要。我们将使用的Conv1d层要求数据张量在第0维上具有批处理,在第1维上具有通道,在第2维上具有特性。
除了更改为使用onehot矩阵之外,我们还修改了矢量化器,以便计算姓氏的最大长度并将其保存为max_surname_length
Example 2-3. Implementing the Surname Vectorizer for CNNs
class SurnameVectorizer(object): """ 姓氏向量化器,用于协调词汇表并将其用于转换 """ def vectorize(self, surname): """ 将姓氏转换为独热编码矩阵形式 Args: surname (str): 姓氏 Returns: one_hot_matrix (np.ndarray): 一个独热编码向量的矩阵 """ one_hot_matrix_size = (len(self.character_vocab), self.max_surname_length) one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32) for position_index, character in enumerate(surname): character_index = self.character_vocab.lookup_token(character) one_hot_matrix[character_index][position_index] = 1 return one_hot_matrix @classmethod def from_dataframe(cls, surname_df): """ 从数据集 dataframe 实例化向量化器 Args: surname_df (pandas.DataFrame): 姓氏数据集 Returns: SurnameVectorizer 的一个实例 """ character_vocab = Vocabulary(unk_token="@") nationality_vocab = Vocabulary(add_unk=False) max_surname_length = 0 for index, row in surname_df.iterrows(): max_surname_length = max(max_surname_length, len(row.surname)) for letter in row.surname: character_vocab.add_token(letter) nationality_vocab.add_token(row.nationality) return cls(character_vocab, nationality_vocab, max_surname_length)
本例中的新内容是使用sequence和ELU PyTorch模块。序列模块是封装线性操作序列的方便包装器。在这种情况下,我们使用它来封装Conv1d序列的应用程序。
在本例中,我们将每个卷积的通道数与num_channels超参数绑定。我们可以选择不同数量的通道分别进行卷积运算。这样做需要优化更多的超参数。我们发现256足够大,可以使模型达到合理的性能。
Example 2-3. The CNN-based SurnameClassifier
import torch.nn as nn import torch.nn.functional as F class SurnameClassifier(nn.Module): def __init__(self, initial_num_channels, num_classes, num_channels): """ 姓氏分类器模型的定义 Args: initial_num_channels (int): 输入特征向量的大小 num_classes (int): 输出预测向量的大小 num_channels (int): 网络中保持不变的通道大小 """ super(SurnameClassifier, self).__init__() self.convnet = nn.Sequential( nn.Conv1d(in_channels=initial_num_channels, out_channels=num_channels, kernel_size=3), nn.ELU(), nn.Conv1d(in_channels=num_channels, out_channels=num_channels, kernel_size=3, stride=2), nn.ELU(), nn.Conv1d(in_channels=num_channels, out_channels=num_channels, kernel_size=3, stride=2), nn.ELU(), nn.Conv1d(in_channels=num_channels, out_channels=num_channels, kernel_size=3), nn.ELU() ) self.fc = nn.Linear(num_channels, num_classes) def forward(self, x_surname, apply_softmax=False): """分类器的前向传播 Args: x_surname (torch.Tensor): 输入数据张量。 x_surname.shape 应为 (batch, initial_num_channels, max_surname_length) apply_softmax (bool): softmax 激活的标志 如果与交叉熵损失一起使用,则应为 false Returns: 结果张量。张量.shape 应为 (batch, num_classes) """ features = self.convnet(x_surname).squeeze(dim=2) prediction_vector = self.fc(features) if apply_softmax: prediction_vector = F.softmax(prediction_vector, dim=1) return prediction_vector
Example 2-4. Input arguments to the CNN surname classifier
args = Namespace( # 数据和路径信息 surname_csv="/home/jovyan/surnames_with_splits.csv", vectorizer_file="vectorizer.json", model_state_file="model.pth", save_dir="model_storage/ch4/cnn", # 模型超参数 hidden_dim=100, num_channels=256, # 训练超参数 seed=1337, learning_rate=0.001, batch_size=128, num_epochs=10, early_stopping_criteria=5, dropout_p=0.1, # 运行时选项 cuda=False, reload_from_files=False, expand_filepaths_to_save_dir=True, catch_keyboard_interrupt=True ) if args.expand_filepaths_to_save_dir: args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file) args.model_state_file = os.path.join(args.save_dir, args.model_state_file) print("Expanded filepaths: ") print("\t{}".format(args.vectorizer_file)) print("\t{}".format(args.model_state_file)) # 检查 CUDA if not torch.cuda.is_available(): args.cuda = False args.device = torch.device("cuda" if args.cuda else "cpu") print("Using CUDA: {}".format(args.cuda)) def set_seed_everywhere(seed, cuda): np.random.seed(seed) torch.manual_seed(seed) if cuda: torch.cuda.manual_seed_all(seed) def handle_dirs(dirpath): if not os.path.exists(dirpath): os.makedirs(dirpath) # 设置随机种子以确保可重现性 set_seed_everywhere(args.seed, args.cuda) # 处理目录 handle_dirs(args.save_dir)
Example 2-5. Using the trained model to make predictions
def predict_nationality(surname, classifier, vectorizer): """预测一个新姓氏的国籍 Args: surname (str): 要分类的姓氏 classifier (SurnameClassifer): 分类器的实例 vectorizer (SurnameVectorizer): 相应的矢量化器 Returns: 一个字典,包含最有可能的国籍及其概率 """ vectorized_surname = vectorizer.vectorize(surname) vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0) result = classifier(vectorized_surname, apply_softmax=True) probability_values, indices = result.max(dim=1) index = indices.item() predicted_nationality = vectorizer.nationality_vocab.lookup_index(index) probability_value = probability_values.item() return {'nationality': predicted_nationality, 'probability': probability_value} new_surname = input("Enter a surname to classify: ") classifier = classifier.cpu() prediction = predict_nationality(new_surname, classifier, vectorizer) print("{} -> {} (p={:0.2f})".format(new_surname, prediction['nationality'], prediction['probability']))
本文主要讨论了多层感知机(MLP)和卷积神经网络(CNN)在处理姓氏分类问题的应用与表现。
感知机是一个单层神经网络,感知机作为最基本的神经网络模型,由输入层、权重和输出层组成;适用于线性可分的数据集。能够对线性可分的数据进行分类。其输出是输入特征的线性组合经过一个激活函数处理后的结果。
相比于感知机,MLP包含一个或多个隐藏层,使用非线性激活函数,使其能够解决更复杂的分类问题。然而,对于具有空间结构的数据(如图像)效率不高。
CNN是一种专门处理具有空间结构数据的神经网络,通过卷积操作提取局部特征,并通过池化操作减少特征维度,最终使用全连接层进行分类或回归。相比于MLP,能有效捕捉数据中的空间特征,参数共享和局部连接减少了模型的计算复杂度,在图像分类、目标检测等任务中表现优异。
总的来看,感知机适合线性数据,MLP适合复杂但结构化数据,CNN适合具有空间结构的数据。希望本文的介绍能让读者对感知机、MLP和CNN有更深入的理解,并能够在实际项目中灵活应用这些模型,解决实际问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。