赞
踩
我们将MLP应用于将姓氏分类到其原籍国的任务。从公开观察到的数据推断人口统计信息(如国籍)具有从产品推荐到确保不同人口统计用户获得公平结果的应用。人口统计和其他自我识别信息统称为“受保护属性”。“在建模和产品中使用这些属性时,必须小心。”我们首先对每个姓氏的字符进行拆分,并像对待“示例:将餐馆评论的情绪分类”中的单词一样对待它们。除了数据上的差异,字符层模型在结构和实现上与基于单词的模型基本相似。
多层感知机(MLP,Multilayer Perceptron)也叫人工神经网络(ANN,Artificial Neural Network)他有以下基本原理和关键概念:
神经元(Neurons):MLP中的基本单元。神经元接收来自上一层的输入,通过加权求和和激活函数处理后产生输出。
权重(Weights):连接神经元之间的参数,表示了输入在神经元之间传递的重要性或影响程度。权重会在训练过程中通过反向传播算法进行调整,以最小化损失函数。
偏置(Biases):每个神经元都有一个偏置项,它可以理解为神经元的激活阈值。偏置项与权重一起调整输入的线性组合,影响神经元的激活状态。
激活函数(Activation Functions):MLP中的每个神经元通常都会应用一个非线性激活函数,如Sigmoid、ReLU(Rectified Linear Unit)、Tanh等。这些激活函数引入了非线性因素,使得神经网络能够学习复杂的数据模式。
前向传播(Forward Propagation):在前向传播过程中,输入数据从输入层传递到输出层。在每一层中,输入经过加权求和和激活函数处理后生成输出,然后传递到下一层。
反向传播(Backpropagation):反向传播是用于训练MLP的一种常用方法。它通过计算损失函数对网络参数(权重和偏置)的梯度,然后根据梯度更新参数。这个过程反复进行,直到模型收敛到最优解。
损失函数(Loss Function):损失函数用于衡量模型预测结果与真实标签之间的差异。常用的损失函数包括交叉熵损失函数(用于分类问题)和均方误差损失函数(用于回归问题)。
对于MLP结构来说,除了输入输出层,它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构,如下图:
从上图可以看到,多层感知机层与层之间是全连接的(全连接的意思就是:上一层的任何一个神经元与下一层的所有神经元都有连接)。多层感知机最底层是输入层,中间是隐藏层,最后是输出层。
对于输入层来说,你输入什么就是什么,比如输入是一个n维向量,就有n个神经元。
隐藏层的神经元与输入层是全连接的,假设输入层用向量X表示,则隐藏层的输出就是f(W1X+b1),W1是权重(也叫连接系数),b1是偏置,函数f 可以是常用的sigmoid函数或者tanh函数:
最后就是输出层,隐藏层到输出层可以看成是一个多类别的逻辑回归,也即softmax回归,所以输出层的输出就是softmax(W2X1+b2),X1表示隐藏层的输出f(W1X+b1)。
MLP整个模型就是,函数G是softmax
因此,MLP所有的参数就是各个层之间的连接权重以及偏置,包括W1、b1、W2、b2。对于一个具体的问题,怎么确定这些参数?求解最佳的参数是一个最优化问题,解决最优化问题,最简单的就是梯度下降法了(SGD):首先随机初始化所有参数,然后迭代地训练,不断地计算梯度和更新参数,直到满足某个条件为止(比如误差足够小、迭代次数足够多时)。这个过程涉及到代价函数、规则化(Regularization)、学习速率(learning rate)、梯度计算等
MLP通过不断地调整权重和偏置,以及选择合适的激活函数和损失函数,使得模型能够逐渐学习输入和输出之间的复杂映射关系,从而实现对数据的有效建模和预测。
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。该数据集将在本课程实验的几个示例中重用,并具有一些使其有趣的属性。第一个性质是它是相当不平衡的。排名前三的课程占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。第二个特点是,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密(比如“O ‘Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。
为了创建最终的数据集,我们从一个比课程补充材料中包含的版本处理更少的版本开始,并执行了几个数据集修改操作。第一个目的是减少这种不平衡——原始数据集中70%以上是俄文,这可能是由于抽样偏差或俄文姓氏的增多。为此,我们通过选择标记为俄语的姓氏的随机子集对这个过度代表的类进行子样本。接下来,我们根据国籍对数据集进行分组,并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集,以便跨这些部分的类标签分布具有可比性。
对数据集进行预处理:
- import collections
- import numpy as np
- import pandas as pd
- import re
-
- from argparse import Namespace
-
- # 定义命令行参数
- args = Namespace(
- raw_dataset_csv="surnames.csv", # 原始数据集文件名
- train_proportion=0.7, # 训练集比例
- val_proportion=0.15, # 验证集比例
- test_proportion=0.15, # 测试集比例
- output_munged_csv="surnames_with_splits.csv", # 输出文件名
- seed=1337 # 随机种子
- )
-
- # 读取原始数据集
- surnames = pd.read_csv(args.raw_dataset_csv, header=0)
- surnames.head()
-
- # 获取唯一的类别(国籍)
- set(surnames.nationality)
-
- # 按国籍划分训练集
- # 创建字典
- by_nationality = collections.defaultdict(list)
- for _, row in surnames.iterrows():
- by_nationality[row.nationality].append(row.to_dict())
-
- # 创建划分后的数据
- final_list = []
- np.random.seed(args.seed)
-
- # 按国籍划分并随机打乱数据
- for _, item_list in sorted(by_nationality.items()):
- np.random.shuffle(item_list)
- n = len(item_list)
- n_train = int(args.train_proportion * n)
- n_val = int(args.val_proportion * n)
- n_test = int(args.test_proportion * n)
-
- # 为每个数据点添加划分属性
- for item in item_list[:n_train]:
- item['split'] = 'train'
- for item in item_list[n_train:n_train+n_val]:
- item['split'] = 'val'
- for item in item_list[n_train+n_val:]:
- item['split'] = 'test'
-
- # 添加到最终列表中
- final_list.extend(item_list)
-
- # 将划分后的数据写入文件
- final_surnames = pd.DataFrame(final_list)
-
- final_surnames.head()
-
- # 将处理后的数据写入CSV文件
- final_surnames.to_csv(args.output_munged_csv, index=False)
结果:
查看前几行数据:
唯一类别:
数据集的划分结果:
查看划分后的数据集:
- #导入必要的第三方库
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
-
- import numpy as np
- import pandas as pd
-
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
SurnameDataset的实现与“Example: classification of Sentiment of Restaurant Reviews”中的ReviewDataset几乎相同,只是在getitem方法的实现方式上略有不同。回想一下,本课程中呈现的数据集类继承自PyTorch的数据集类,因此,我们需要实现两个函数:__getitem
方法,它在给定索引时返回一个数据点;以及len方法,该方法返回数据集的长度。
- #创建一个自定义的数据集类来加载和处理文本数据
- #划分训练、验证和测试集,设置数据集大小,以及构建查找字典
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): the dataset
- vectorizer (SurnameVectorizer): vectorizer instatiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- #加载数据集并生成新的向量化器
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- #加载数据集和相应的向量化器,用于重新使用已缓存的向量化器
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case in the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- #从文件中加载向量化器
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameVectorizer
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- #将向量化器保存到磁盘
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- #回向量化器对象
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
-
- #划分数据集
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- #返回指定数据集的大小
- def __len__(self):
- return self._target_size
-
- #获取指定索引的数据点,并将文本数据向量化,标签进行编码后返回
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's:
- features (x_surname)
- label (y_nationality)
- """
- row = self._target_df.iloc[index]
-
- surname_vector = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}
-
- #根据指定的批量大小返回数据集中的批次数量
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
- #生成批次数据
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
- ensure each tensor is on the write device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
为了使用字符对姓氏进行分类,我们使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。这些数据结构与“Example: Classifying Sentiment of Restaurant Reviews”中使用的数据结构相同,它们举例说明了一种多态性,这种多态性将姓氏的字符标记与Yelp评论的单词标记相同对待。数据不是通过将字令牌映射到整数来向量化的,而是通过将字符映射到整数来向量化的。
- class Vocabulary(object):
- """Class to process text and extract vocabulary for mapping"""
- #初始化词汇表
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- Args:
- token_to_idx (dict): a pre-existing map of tokens to indices
- add_unk (bool): a flag that indicates whether to add the UNK token
- unk_token (str): the UNK token to add into the Vocabulary
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
- #将词汇表保存为可序列化的格式
-
- def to_serializable(self):
- """ returns a dictionary that can be serialized """
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
- #从序列化的字典实例化词汇表对象
- @classmethod
- def from_serializable(cls, contents):
- """ instantiates the Vocabulary from a serialized dictionary """
- return cls(**contents)
- #据输入的标记更新词汇表的映射字典
- def add_token(self, token):
- """Update mapping dicts based on the token.
- Args:
- token (str): the item to add into the Vocabulary
- Returns:
- index (int): the integer corresponding to the token
- """
- if token in self._token_to_idx:
- index = self._token_to_idx[token]
- else:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
- #将一个标记列表添加到词汇表
- def add_many(self, tokens):
- """Add a list of tokens into the Vocabulary
-
- Args:
- tokens (list): a list of string tokens
- Returns:
- indices (list): a list of indices corresponding to the tokens
- """
- return [self.add_token(token) for token in tokens]
- #用于检索与标记相关联的索引
- def lookup_token(self, token):
- """Retrieve the index associated with the token
- or the UNK index if token isn't present.
-
- Args:
- token (str): the token to look up
- Returns:
- index (int): the index corresponding to the token
- Notes:
- `unk_index` needs to be >=0 (having been added into the Vocabulary)
- for the UNK functionality
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
- #用于返回与给定索引相关联的标记
- def lookup_index(self, index):
- """Return the token associated with the index
-
- Args:
- index (int): the index to look up
- Returns:
- token (str): the token corresponding to the index
- Raises:
- KeyError: if the index is not in the Vocabulary
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
- #返回描述词汇表大小的字符串表示
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
- #返回词汇表中的唯一标记数
- def __len__(self):
- return len(self._token_to_idx)
- #将文本数据向量化,并配合词汇表进行处理
- class SurnameVectorizer(object):
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
- def __init__(self, surname_vocab, nationality_vocab):
- """
- Args:
- surname_vocab (Vocabulary): maps characters to integers
- nationality_vocab (Vocabulary): maps nationalities to integers
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
-
- # 将给定的评论文本转换为一个折叠的单热编码向量
- def vectorize(self, surname):
- """
- Args:
- surname (str): the surname
- Returns:
- one_hot (np.ndarray): a collapsed one-hot encoding
- """
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1
-
- return one_hot
-
- @classmethod
- #从数据框实例化 ReviewVectorizer对象
- def from_dataframe(cls, surname_df):
- """Instantiate the vectorizer from the dataset dataframe
-
- Args:
- surname_df (pandas.DataFrame): the surnames dataset
- Returns:
- an instance of the SurnameVectorizer
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows():
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab)
-
- @classmethod
- # 从序列化的字典实例化 ReviewVectorizer对象
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- #将 ReviewVectorizer对象序列化为字典,以便缓存或保存
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}
第一个线性层将输入向量映射到中间向量,并对该向量应用非线性。第二线性层将中间向量映射到预测向量。
在最后一步中,可选地应用softmax操作,以确保输出和为1;这就是所谓的“概率”。它是可选的原因与我们使用的损失函数的数学公式有关——交叉熵损失。我们研究了“损失函数”中的交叉熵损失。回想一下,交叉熵损失对于多类分类是最理想的,但是在训练过程中软最大值的计算不仅浪费而且在很多情况下并不稳定。
- class SurnameClassifier(nn.Module):
- """ 用于对姓氏进行分类的两层多层感知器 """
-
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): 输入向量的大小
- hidden_dim (int): 第一层线性层的输出大小
- output_dim (int): 第二层线性层的输出大小
- """
- super(SurnameClassifier, self).__init__()
- # 定义两个线性层
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """分类器的前向传播
- Args:
- x_in (torch.Tensor): 输入数据张量。
- x_in.shape 应为 (batch, input_dim)
- apply_softmax (bool): 是否进行 softmax 激活。
- 如果与交叉熵损失一起使用,则应为 False
- Returns:
- 结果张量。tensor.shape 应为 (batch, output_dim)
- """
- # 第一层的线性变换,并使用 ReLU 激活函数
- intermediate_vector = F.relu(self.fc1(x_in))
- # 第二层的线性变换
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:
- # 如果需要应用 softmax 激活函数,则进行 softmax 操作
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
- #创建一个表示训练状态的字典,初始化各种参数和指标
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- #处理训练状态的更新
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # 至少保存一个模型
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # 性能得到改善,则保存模型
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- #如果损失恶化
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # 损失减少
- else:
- # 保存最佳模型
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # 重置提前停止步骤
- train_state['early_stopping_step'] = 0
-
- # 早停?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- #计算模型预测的准确率
- def compute_accuracy(y_pred, y_target):
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
-
- #设置随机种子
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
- #处理文件目录
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
-
-
- args = Namespace(
- # 数据和路径信息
- surname_csv="surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/surname_mlp",
- # 模型超参数
- hidden_dim=300,
- # 训练超参数
- seed=1337,
- num_epochs=5,
- early_stopping_criteria=5,
- learning_rate=0.001,
- batch_size=64,
- # 运行时选项
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- )
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 检查CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- print("Using CUDA: {}".format(args.cuda))
-
-
- # 为可重复性奠定种子
- set_seed_everywhere(args.seed, args.cuda)
-
- #句柄目录
- handle_dirs(args.save_dir)
-
- #确定是重新加载已有的数据集和词向量化器,还是创建新的数据集和词向量化器。
- if args.reload_from_files:
- # training from a checkpoint
- print("Reloading!")
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # create dataset and vectorizer
- print("Creating fresh!")
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- vectorizer = dataset.get_vectorizer()
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
- #模型训练的准备工作
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
-
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- train_state = make_train_state(args)
-
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # 迭代训练数据集
-
- # 设置:批量发电机,设置损耗和acc为0
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # 训练程序有以下5个步骤:
-
- # --------------------------------------
- # 步骤1.将梯度归零
- optimizer.zero_grad()
-
- #步骤2.计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- #步骤3.计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- #步骤4.使用损耗来产生梯度
- loss.backward()
-
- #步骤5.使用优化器采取梯度步骤
- optimizer.step()
- # -----------------------------------------
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- #更新值
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- #遍历数据集
-
- #设置损耗和acc为0;设置eval模式为on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # 输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
- # 使用最佳可用模型计算测试集上的损耗和精度
-
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- #输出损失和准确率
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
该模型对测试数据的准确性达到50%左右。如果在附带的notebook中运行训练例程,会注意到在训练数据上的性能更高。这是因为模型总是更适合它所训练的数据,所以训练数据的性能并不代表新数据的性能。如果遵循代码,你可以尝试隐藏维度的不同大小,应该注意到性能的提高。然而,这种增长不会很大(尤其是与“用CNN对姓氏进行分类的例子”中的模型相比)。其主要原因是收缩的onehot向量化方法是一种弱表示。虽然它确实简洁地将每个姓氏表示为单个向量,但它丢弃了字符之间的顺序信息,这对于识别起源非常重要。
- #使用分类器和矢量化器来预测给定姓名的国籍分
- def predict_nationality(name, classifier, vectorizer):
- vectorized_name = vectorizer.vectorize(name)
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- result = classifier(vectorized_name, apply_softmax=True)
-
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality,
- 'probability': probability_value}
-
- #接收用户输入的姓氏,然后使用提供的分类器和矢量化器对该姓氏进行国籍分类预测,并打印出预测结果及其概率
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
-
- #从矢量化器的国籍词汇表中查找索引为 8 的国籍
- vectorizer.nationality_vocab.lookup_index(8)
-
- #使用了分类器和矢量化器来对给定的姓名进行国籍分类预测,并返回概率最高的前 k 个国籍预测结果
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- vectorized_name = vectorizer.vectorize(name)
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- results = []
- for prob_value, index in zip(probability_values, indices):
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- return results
-
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
-
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
由于时间原因训练轮数只有5轮,所以结果较差:
损失和准确率:
给定一个姓氏作为字符串,然后获得模型预测:
给定的姓名进行国籍分类预测,并返回概率最高的前 k 个国籍预测结果:
我们将MLP应用于将姓氏分类到其原籍国的任务。从公开观察到的数据推断人口统计信息(如国籍)具有从产品推荐到确保不同人口统计用户获得公平结果的应用。人口统计和其他自我识别信息统称为“受保护属性”。“在建模和产品中使用这些属性时,必须小心。”我们首先对每个姓氏的字符进行拆分,并像对待“示例:将餐馆评论的情绪分类”中的单词一样对待它们。除了数据上的差异,字符层模型在结构和实现上与基于单词的模型基本相似。
卷积神经网络(Convolutional Neural Network,CNN)是一种专门用于处理具有网格状拓扑结构数据的人工神经网络。CNN在计算机视觉领域得到了广泛应用,特别是在图像识别、对象检测和图像分类等任务中表现出色。下面是CNN的基本原理和关键概念:
卷积层(Convolutional Layer):CNN的核心组件之一。卷积层通过应用一系列滤波器(也称为卷积核)来提取输入数据的特征。每个滤波器在输入数据上进行滑动操作(卷积运算),计算出一系列特征映射。这些特征映射捕获了输入数据中的不同局部特征。
滤波器(Filter):滤波器是卷积层中的参数,用于提取输入数据的特征。每个滤波器都是一个小型的矩阵,其值在训练过程中通过反向传播算法学习得到。每个滤波器在输入数据上进行滑动操作,计算出一个特征映射。
步幅(Stride):步幅是指滤波器在输入数据上移动的步长。较大的步幅可以减小输出特征图的尺寸,而较小的步幅可以保持输出特征图的尺寸与输入数据相同。
填充(Padding):填充是在输入数据周围添加额外的值(通常是0),以控制输出特征图的尺寸。填充可以帮助保持特征图的空间维度,避免由于卷积操作导致尺寸减小而丢失信息。
激活函数(Activation Function):卷积层通常会在卷积运算后应用一个非线性激活函数,如ReLU(Rectified Linear Unit)。激活函数引入了非线性因素,使得网络能够学习复杂的数据模式。
池化层(Pooling Layer):池化层用于降低特征图的空间维度,并减少模型对位置变化的敏感性。常见的池化操作包括最大池化和平均池化,它们分别取输入区域的最大值或平均值作为输出值。
全连接层(Fully Connected Layer):全连接层是传统神经网络中的一种层,它将上一层的所有神经元与当前层的所有神经元相连接。在CNN中,全连接层通常用于将卷积和池化层提取的特征转换为最终的输出。
前向传播(Forward Propagation):在前向传播过程中,输入数据从输入层经过一系列卷积、激活和池化操作后,最终生成输出结果。
反向传播(Backpropagation):反向传播是用于训练CNN的一种常用方法。它通过计算损失函数对网络参数的梯度,并根据梯度更新参数。这个过程反复进行,直到模型收敛到最优解。
CNN的结构:
CNN通过多层卷积和池化操作逐渐提取输入数据的高级特征,然后通过全连接层将这些特征映射到最终的输出空间。这种层级结构使得CNN能够有效地处理大规模图像数据,并在图像识别和分类等任务中取得优秀的性能。
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。该数据集将在本课程实验的几个示例中重用,并具有一些使其有趣的属性。第一个性质是它是相当不平衡的。排名前三的课程占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。第二个特点是,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密(比如“O ‘Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。
为了创建最终的数据集,我们从一个比课程补充材料中包含的版本处理更少的版本开始,并执行了几个数据集修改操作。第一个目的是减少这种不平衡——原始数据集中70%以上是俄文,这可能是由于抽样偏差或俄文姓氏的增多。为此,我们通过选择标记为俄语的姓氏的随机子集对这个过度代表的类进行子样本。接下来,我们根据国籍对数据集进行分组,并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集,以便跨这些部分的类标签分布具有可比性。
对数据集进行预处理:
- import collections
- import numpy as np
- import pandas as pd
- import re
-
- from argparse import Namespace
-
- # 定义命令行参数
- args = Namespace(
- raw_dataset_csv="surnames.csv", # 原始数据集文件名
- train_proportion=0.7, # 训练集比例
- val_proportion=0.15, # 验证集比例
- test_proportion=0.15, # 测试集比例
- output_munged_csv="surnames_with_splits.csv", # 输出文件名
- seed=1337 # 随机种子
- )
-
- # 读取原始数据集
- surnames = pd.read_csv(args.raw_dataset_csv, header=0)
- surnames.head()
-
- # 获取唯一的类别(国籍)
- set(surnames.nationality)
-
- # 按国籍划分训练集
- # 创建字典
- by_nationality = collections.defaultdict(list)
- for _, row in surnames.iterrows():
- by_nationality[row.nationality].append(row.to_dict())
-
- # 创建划分后的数据
- final_list = []
- np.random.seed(args.seed)
-
- # 按国籍划分并随机打乱数据
- for _, item_list in sorted(by_nationality.items()):
- np.random.shuffle(item_list)
- n = len(item_list)
- n_train = int(args.train_proportion * n)
- n_val = int(args.val_proportion * n)
- n_test = int(args.test_proportion * n)
-
- # 为每个数据点添加划分属性
- for item in item_list[:n_train]:
- item['split'] = 'train'
- for item in item_list[n_train:n_train+n_val]:
- item['split'] = 'val'
- for item in item_list[n_train+n_val:]:
- item['split'] = 'test'
-
- # 添加到最终列表中
- final_list.extend(item_list)
-
- # 将划分后的数据写入文件
- final_surnames = pd.DataFrame(final_list)
-
- final_surnames.head()
-
- # 将处理后的数据写入CSV文件
- final_surnames.to_csv(args.output_munged_csv, index=False)
结果:
查看前几行数据:
唯一类别:
数据集的划分结果:
查看划分后的数据集:
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
-
- import numpy as np
- import pandas as pd
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
SurnameDataset的实现与“Example: classification of Sentiment of Restaurant Reviews”中的ReviewDataset几乎相同,只是在getitem方法的实现方式上略有不同。回想一下,本课程中呈现的数据集类继承自PyTorch的数据集类,因此,我们需要实现两个函数:__getitem
方法,它在给定索引时返回一个数据点;以及len方法,该方法返回数据集的长度。
- #创建一个自定义的数据集类来加载和处理文本数据
- #划分训练、验证和测试集,设置数据集大小,以及构建查找字典
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- name_df (pandas.DataFrame): the dataset
- vectorizer (SurnameVectorizer): vectorizer instatiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- #加载数据集并生成新的向量化器
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
- #加载数据集和相应的向量化器,用于重新使用已缓存的向量化器
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case in the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
- #从文件中加载向量化器
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameDataset
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
- #将向量化器保存到磁盘
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
- #回向量化器对象
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
- #划分数据集
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
- #返回指定数据集的大小
- def __len__(self):
- return self._target_size
- #根据指定的批量大小返回数据集中的批次数量
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's features (x_data) and label (y_target)
- """
- row = self._target_df.iloc[index]
-
- surname_matrix = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
- #生成批次数据
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
- ensure each tensor is on the write device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
为了使用字符对姓氏进行分类,我们使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。这些数据结构与“Example: Classifying Sentiment of Restaurant Reviews”中使用的数据结构相同,它们举例说明了一种多态性,这种多态性将姓氏的字符标记与Yelp评论的单词标记相同对待。数据不是通过将字令牌映射到整数来向量化的,而是通过将字符映射到整数来向量化的。
- class Vocabulary(object):
- """用于处理文本并提取词汇的类"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- 初始化Vocabulary对象
- Args:
- token_to_idx (dict): 一个词汇到索引的映射字典
- add_unk (bool): 是否添加UNK标记的标志
- unk_token (str): 要添加到词汇表中的UNK标记
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- # 创建索引到词汇的映射
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- # 如果需要添加UNK标记,则添加它
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
- def to_serializable(self):
- """返回一个可序列化的字典"""
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """从一个可序列化的字典中实例化Vocabulary对象"""
- return cls(**contents)
-
- def add_token(self, token):
- """根据词汇更新映射字典
-
- Args:
- token (str): 要添加到词汇表中的词汇
- Returns:
- index (int): 词汇对应的整数索引
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """将一个词汇列表添加到词汇表中
-
- Args:
- tokens (list): 一个字符串词汇列表
- Returns:
- indices (list): 与词汇对应的整数索引列表
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """检索与词汇关联的索引或UNK索引(如果词汇不存在)。
-
- Args:
- token (str): 要查找的词汇
- Returns:
- index (int): 与词汇对应的整数索引
- Notes:
- UNK功能需要unk_index >=0(已添加到词汇表中)
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """返回与索引关联的词汇
-
- Args:
- index (int): 要查找的索引
- Returns:
- token (str): 与索引对应的词汇
- Raises:
- KeyError: 如果索引不在词汇表中
- """
- if index not in self._idx_to_token:
- raise KeyError("索引(%d)不在词汇表中" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)
- class SurnameVectorizer(object):
- """姓氏矢量化器,协调词汇表并将其应用于数据"""
-
- def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
- """
- Args:
- surname_vocab (Vocabulary): 将字符映射到整数的词汇表
- nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
- max_surname_length (int): 最长姓氏的长度
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
- self._max_surname_length = max_surname_length
-
- def vectorize(self, surname):
- """
- Args:
- surname (str): 姓氏
- Returns:
- one_hot_matrix (np.ndarray): 一个独热向量矩阵
- """
- # 创建一个全零矩阵
- one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- # 将姓氏中的字符转换为独热向量
- for position_index, character in enumerate(surname):
- character_index = self.surname_vocab.lookup_token(character)
- one_hot_matrix[character_index][position_index] = 1
-
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从数据框实例化矢量化器
-
- Args:
- surname_df (pandas.DataFrame): 姓氏数据集
- Returns:
- SurnameVectorizer的一个实例
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
- max_surname_length = 0
-
- for index, row in surname_df.iterrows():
- max_surname_length = max(max_surname_length, len(row.surname))
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab, max_surname_length)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
- max_surname_length=contents['max_surname_length'])
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable(),
- 'max_surname_length': self._max_surname_length}
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- Args:
- initial_num_channels (int): 输入特征向量的大小
- num_classes (int): 输出预测向量的大小
- num_channels (int): 网络中使用的常数通道大小
- """
- super(SurnameClassifier, self).__init__()
-
- # 定义卷积网络层
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3),
- nn.ELU(), # 使用ELU激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2), # 步长为2的卷积层
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2), # 步长为2的卷积层
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3), # 没有步长的卷积层
- nn.ELU()
- )
-
- # 全连接层,将卷积层的输出映射到预测向量的大小
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """分类器的前向传播
-
- Args:
- x_surname (torch.Tensor): 输入数据张量。
- x_surname.shape 应为 (batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): softmax激活的标志
- 如果与交叉熵损失一起使用,应为false
- Returns:
- 结果张量。tensor.shape 应为 (batch, num_classes)
- """
- # 使用卷积网络进行特征提取
- features = self.convnet(x_surname).squeeze(dim=2)
-
- # 将提取的特征通过全连接层进行预测
- prediction_vector = self.fc(features)
-
- # 如果需要应用softmax激活函数,则进行softmax操作
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
- #创建一个表示训练状态的字典,初始化各种参数和指标
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- #处理训练状态的更新
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # 至少保存一个模型
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # 性能得到改善,则保存模型
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- #如果损失恶化
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # 损失减少
- else:
- # 保存最佳模型
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # 重置提前停止步骤
- train_state['early_stopping_step'] = 0
-
- # 早停?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- #计算模型预测的准确率
- def compute_accuracy(y_pred, y_target):
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
-
- #设置随机种子
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
- #处理文件目录
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
-
-
- args = Namespace(
- # 数据和路径信息
- surname_csv="surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/surname_mlp",
- # 模型超参数
- hidden_dim=300,
- # 训练超参数
- seed=1337,
- num_epochs=5,
- early_stopping_criteria=5,
- learning_rate=0.001,
- batch_size=64,
- # 运行时选项
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- )
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 检查CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- print("Using CUDA: {}".format(args.cuda))
-
-
- # 为可重复性奠定种子
- set_seed_everywhere(args.seed, args.cuda)
-
- #句柄目录
- handle_dirs(args.save_dir)
-
- #确定是重新加载已有的数据集和词向量化器,还是创建新的数据集和词向量化器。
- if args.reload_from_files:
- # training from a checkpoint
- print("Reloading!")
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # create dataset and vectorizer
- print("Creating fresh!")
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- vectorizer = dataset.get_vectorizer()
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
与“Example: Classifying Sentiment of Restaurant Reviews”中的训练循环相比,本例的训练循环除了变量名以外几乎是相同的。具体来说,显示了使用不同的key从batch_dict中获取数据。除了外观上的差异,训练循环的功能保持不变。利用训练数据,计算模型输出、损失和梯度。然后,使用梯度来更新模型。
- epoch_bar = tqdm_notebook(desc='训练过程', # 创建一个进度条显示训练过程
- total=args.num_epochs, # 设置总的迭代次数
- position=0) # 设置进度条位置
-
- # 将数据集设置为训练集并创建一个训练集进度条
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='训练集', # 创建一个显示训练集进度的进度条
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- # 将数据集设置为验证集并创建一个验证集进度条
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='验证集', # 创建一个显示验证集进度的进度条
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs): # 循环迭代每个epoch
- train_state['epoch_index'] = epoch_index
-
- # 遍历训练数据集
-
- # 设置:批次生成器,将损失和准确率设置为0,设置为训练模式
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train() # 设置模型为训练模式
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # 训练过程的5个步骤:
-
- # --------------------------------------
- # 步骤1. 梯度清零
- optimizer.zero_grad()
-
- # 步骤2. 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 步骤3. 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 步骤4. 使用损失计算梯度
- loss.backward()
-
- # 步骤5. 使用优化器更新参数
- optimizer.step()
- # -----------------------------------------
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 更新进度条
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # 遍历验证数据集
-
- # 设置:批次生成器,将损失和准确率设置为0,设置为评估模式
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval() # 设置模型为评估模式
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 步骤3. 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
该模型对测试数据的准确性达到50%左右。如果在附带的notebook中运行训练例程,会注意到在训练数据上的性能更高。这是因为模型总是更适合它所训练的数据,所以训练数据的性能并不代表新数据的性能。如果遵循代码,你可以尝试隐藏维度的不同大小,应该注意到性能的提高。然而,这种增长不会很大(尤其是与“用CNN对姓氏进行分类的例子”中的模型相比)。其主要原因是收缩的onehot向量化方法是一种弱表示。虽然它确实简洁地将每个姓氏表示为单个向量,但它丢弃了字符之间的顺序信息,这对于识别起源非常重要。
- # 加载模型权重
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- # 将模型移动到指定的设备上
- classifier = classifier.to(args.device)
- # 将类别权重也移动到指定的设备上
- dataset.class_weights = dataset.class_weights.to(args.device)
- # 使用交叉熵损失函数,并考虑类别权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 设置数据集为测试集,并生成批次数据
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval() # 设置模型为评估模式
-
- # 遍历测试数据集
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算模型输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 将测试结果保存到训练状态中
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- #输出损失和准确率
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
- def predict_nationality(surname, classifier, vectorizer):
- """预测一个新姓氏的国籍
-
- Args:
- surname (str): 待分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 对应的矢量化器
- Returns:
- 包含最可能的国籍及其概率的字典
- """
- # 将姓氏进行矢量化
- vectorized_surname = vectorizer.vectorize(surname)
- # 将矢量化后的姓氏转换为张量,并增加一个维度
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- # 使用分类器进行预测,并应用softmax函数
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 获取概率最大的值及其索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- # 根据索引查找对应的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}
- new_surname = input("Enter a surname to classify: ") # 输入待分类的姓氏
- classifier = classifier.cpu() # 将分类器移动到CPU上进行预测
- prediction = predict_nationality(new_surname, classifier, vectorizer) # 预测姓氏的国籍
- print("{} -> {} (p={:0.2f})".format(new_surname, # 打印预测结果
- prediction['nationality'],
- prediction['probability']))
-
- def predict_topk_nationality(surname, classifier, vectorizer, k=5):
- """预测一个新姓氏的前K个国籍
-
- Args:
- surname (str): 待分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 对应的矢量化器
- k (int): 要返回的前K个国籍的数量
- Returns:
- 包含字典的列表,每个字典代表一个国籍及其概率
- """
-
- # 将姓氏进行矢量化
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
- # 获取预测向量
- prediction_vector = classifier(vectorized_surname, apply_softmax=True)
- # 获取概率最高的K个值和对应的索引
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # 将结果转换为numpy数组
- probability_values = probability_values[0].detach().numpy()
- indices = indices[0].detach().numpy()
-
- results = []
- # 遍历获取前K个预测结果
- for kth_index in range(k):
- nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
- probability_value = probability_values[kth_index]
- results.append({'nationality': nationality,
- 'probability': probability_value})
- return results
-
- new_surname = input("Enter a surname to classify: ") # 输入待分类的姓氏
-
- k = int(input("How many of the top predictions to see? ")) # 选择要查看的前K个预测结果
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- # 获取前K个预测结果
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- # 打印每个预测结果
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
由于时间原因训练轮数只有5轮,所以结果较差:
损失和准确率:
给定一个姓氏作为字符串,然后获得模型预测:
给定的姓名进行国籍分类预测,并返回概率最高的前 k 个国籍预测结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。