三、多层感知机(The Multilayer Perceptron)
作为最简单的神经网络,感知器无法处理线性不可分的决策任务(比如图1.1所示的XOR数据集)。针对这种情况,我们考虑两种前馈神经网络模型:多层感知器和卷积神经网络。多层感知器(MLP)被认为是最基本的神经网络构建模块之一;卷积神经网络因其滑动窗口的特性,能够学习输入中的局部化模式,这不仅使其成为计算机视觉的主力模型,也使其在检测单词、句子等序列数据中的子结构时效果很好。接下来会详细介绍MLP和卷积神经网络的结构及其在姓氏分类任务中的运用。
(1)通过“示例:带有多层感知器的姓氏分类”,掌握多层感知器在多类分类任务中的应用
(2)掌握每种类型的神经网络层对它所计算的数据张量的大小和形状的影响
(3)尝试带有dropout的SurnameClassifier模型,看看它如何更改结果
(4)实验数据为姓氏数据集surnames.csv
最简单的MLP由三层组成(如图3.1所示):一是输入层,负责接收输入特征向量;二是隐藏层,位于输入层和输出层之间,每个隐藏层包含多个神经元(节点),其输入来自前一层的输出,其值由组成该层的各个感知器计算得到;三是输出层,产生最终输出,在分类任务中,输出层的每个神经元对应一个类别标签。
在MLP中,每个神经元类似于感知器,计算其输入的加权和,并应用激活函数以产生输出。一个层中神经元的输出作为下一层神经元的输入,通过网络传播信息。其中最常用的激活函数有sigmoid、tanh(双曲正切)和ReLU(修正线性单元),这些函数引入非线性,使MLP能够学习数据中的复杂关系。
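下面用一小段示意代码(非原文内容,输入数值为任取)直观对比sigmoid、tanh和ReLU三种激活函数对同一组输入的非线性变换效果:

- import torch
- import torch.nn.functional as F
- 
- x = torch.tensor([-2.0, -0.5, 0.0, 0.5, 2.0])
- print(torch.sigmoid(x))  # 压缩到(0, 1)区间
- print(torch.tanh(x))     # 压缩到(-1, 1)区间
- print(F.relu(x))         # 负值置零,正值保持不变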
MLP的多层神经元(输入层、隐藏层和输出层),使其能够学习和表示数据中的非线性关系。它能够近似复杂的函数,学习不同抽象层次的特征。MLP使用激活函数并能够通过反向传播训练来调整权重和偏置,因此比单层感知器更适合处理复杂的机器学习任务。
在图3.2中,错误分类的数据点用黑色填充,而正确分类的数据点没有填充。从填充的形状可以看出,感知器在学习能把星和圆分开的决策边界时存在困难;而MLP学到了一个能相当精确地区分星和圆的决策边界。这是因为感知器缺少MLP那样的中间表示,无法对数据进行分组和重新组织,使其逐步变得线性可分,因此不能把圆和星分开。
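作为补充,这里给出一个非原文的最小示意实验:用带一个隐藏层的小型MLP拟合XOR数据(单个线性层无法线性划分这组数据),其中隐藏单元数、学习率、迭代次数均为随意选取,多数情况下训练后预测能接近[0, 1, 1, 0]:

- import torch
- import torch.nn as nn
- 
- # XOR数据:线性模型无法划分,带隐藏层的MLP可以
- X = torch.tensor([[0., 0.], [0., 1.], [1., 0.], [1., 1.]])
- y = torch.tensor([[0.], [1.], [1.], [0.]])
- 
- model = nn.Sequential(nn.Linear(2, 8), nn.ReLU(), nn.Linear(8, 1))
- optimizer = torch.optim.Adam(model.parameters(), lr=0.05)
- loss_fn = nn.BCEWithLogitsLoss()
- 
- for step in range(2000):
-     optimizer.zero_grad()
-     loss = loss_fn(model(X), y)
-     loss.backward()
-     optimizer.step()
- 
- print((torch.sigmoid(model(X)) > 0.5).float().squeeze())  # 期望接近 [0, 1, 1, 0]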
在实例化中,设置了两个线性模块fc1和fc2作为全连接层;第一个全连接层的输出经ReLU激活后作为第二个全连接层的输入,因此第一层的输出数量必须等于第二层的输入数量:
- import torch.nn as nn
- import torch.nn.functional as F
-
- class MultilayerPerceptron(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- 初始化多层感知器模型
- Args:
- input_dim (int): 输入向量的大小
- hidden_dim (int): 第一个全连接层的输出大小
- output_dim (int): 第二个全连接层的输出大小
- """
- super(MultilayerPerceptron, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)# 第一个全连接层
- self.fc2 = nn.Linear(hidden_dim, output_dim)# 第二个全连接层
-
- def forward(self, x_in, apply_softmax=False):
- """多层感知器的前向传播
- Args:
- x_in (torch.Tensor): 输入数据张量。
- x_in.shape 应为 (batch, input_dim)
- apply_softmax (bool): 是否对输出进行 softmax 激活。
- 如果与交叉熵损失一起使用,则应为 False
- Returns:
- 结果张量。张量形状应为 (batch, output_dim)
- """
- intermediate = F.relu(self.fc1(x_in)) # 使用 ReLU 激活函数的第一个全连接层
- output = self.fc2(intermediate) # 第二个全连接层
-
- if apply_softmax:
- output = F.softmax(output, dim=1) # 如果应用 softmax,则在输出上进行 softmax 操作
- return output
实例化一个输入维度为3、隐藏维度为100、输出维度为4的MLP:
- batch_size = 2 # 每次输入的样本数量
- input_dim = 3 # 输入向量的维度
- hidden_dim = 100 # 第一个全连接层的输出维度
- output_dim = 4 # 第二个全连接层的输出维度
-
- # 初始化模型
- mlp = MultilayerPerceptron(input_dim, hidden_dim, output_dim)
- print(mlp) # 打印模型结构
MLP模型结构输出:
- import torch
-
- def describe(x):
- """打印张量的类型、形状和数值"""
- print("Type: {}".format(x.type())) # 打印张量类型
- print("Shape/size: {}".format(x.shape)) # 打印张量形状
- print("Values: \n{}".format(x)) # 打印张量数值
-
- x_input = torch.rand(batch_size, input_dim) # 创建随机输入张量
- describe(x_input) # 调用 describe 函数打印张量信息
- y_output = mlp(x_input, apply_softmax=False) # 使用模型进行前向传播,不应用 softmax
- describe(y_output)
x_input的输出结果:
y_output的输出结果:(softmax函数用于将一个值向量转换为概率)
由上述输出可知,MLP把输入张量映射为另一个张量:每一层都是线性变换,层与层之间的非线性激活打破了整体的线性关系,使模型能够“扭曲”向量空间,从而让各类别变得线性可分。
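作为对照,可以把apply_softmax设为True再前向传播一次,验证每一行输出之和接近1(示意,沿用上面定义的mlp、x_input和describe):

- y_prob = mlp(x_input, apply_softmax=True)  # 应用softmax,输出可解释为各类别的概率
- describe(y_prob)
- print(y_prob.sum(dim=1))  # 每行概率之和应接近1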
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,是作者从互联网上不同的姓名来源收集的。读取姓氏数据集并查看前五行示例:
- import collections
- import numpy as np
- import pandas as pd
- import re
- from argparse import Namespace
- args = Namespace(
- raw_dataset_csv="data/surnames/surnames.csv",
- train_proportion=0.7,
- val_proportion=0.15,
- test_proportion=0.15,
- output_munged_csv="data/surnames/surnames_with_splits.csv",
- seed=1337
- )
- # Read raw data
- surnames = pd.read_csv(args.raw_dataset_csv, header=0)
- surnames.head()
运行结果:
数据集划分与保存:
- # 获取姓氏数据集中的所有国籍类别,并使用集合(set)确保每个国籍只出现一次
- set(surnames.nationality)
- # 按国籍对数据分组,以便按类别分层划分训练/验证/测试集
- by_nationality = collections.defaultdict(list)
- for _, row in surnames.iterrows():
- by_nationality[row.nationality].append(row.to_dict())
-
- final_list = []
- np.random.seed(args.seed)
- for _, item_list in sorted(by_nationality.items()):
- np.random.shuffle(item_list)
- # 根据给定的比例计算每个数据集划分(训练集、验证集、测试集)的样本数量
- n = len(item_list)
- n_train = int(args.train_proportion*n)
- n_val = int(args.val_proportion*n)
- n_test = int(args.test_proportion*n)
-
- for item in item_list[:n_train]:
- item['split'] = 'train'
- for item in item_list[n_train:n_train+n_val]:
- item['split'] = 'val'
- for item in item_list[n_train+n_val:]:
- item['split'] = 'test'
-
- final_list.extend(item_list)
- # 从最终的数据点列表 final_list 中创建一个 DataFrame
- final_surnames = pd.DataFrame(final_list)
- final_surnames.split.value_counts()
- final_surnames.head()
- # 保存成csv文件
- final_surnames.to_csv(args.output_munged_csv, index=False)
所有姓氏有:
每个数据集中的数据点数量:
DataFrame数据的前几行:
该数据集的类别分布非常不平衡,排名前三的类别占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语;其余15个国籍的出现频率依次递减,这也是语言本身的特点。另外,国籍与姓氏正字法(拼写)之间存在一种有效而直观的联系,有些拼写变体与原籍国联系非常紧密(比如“O'Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。因此可以从标记为俄语的姓氏中随机抽取一个子集,对这个过度代表的类别进行下采样,以缓解数据集的不平衡问题。另外,数据集处理函数部分返回的是向量化后的姓氏及其国籍对应的索引。
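针对上面提到的俄语类别下采样,这里给出一个可能的实现思路(非原文代码;假设国籍列中俄语记为'Russian',max_per_class的取值仅为示例):

- # 示意:对俄语姓氏随机下采样,缓解类别不均衡
- max_per_class = 3000  # 假设的保留上限
- russian = surnames[surnames.nationality == 'Russian']
- others = surnames[surnames.nationality != 'Russian']
- russian_sub = russian.sample(n=min(max_per_class, len(russian)), random_state=args.seed)
- surnames_balanced = pd.concat([others, russian_sub]).reset_index(drop=True)
- print(surnames_balanced.nationality.value_counts())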
数据集处理类函数(SurnameDataset)如下:
- import json   # 保存/加载矢量化器时用到
- import torch  # 计算类别权重时用到
- from torch.utils.data import Dataset, DataLoader
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): the dataset
-             vectorizer (SurnameVectorizer): vectorizer instantiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case in the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameVectorizer
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's:
- features (x_surname)
- label (y_nationality)
- """
- row = self._target_df.iloc[index]
-
- surname_vector = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
-       ensure each tensor is on the right device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
SurnameVectorizer类负责借助词汇表把姓氏字符串转换成向量化的minibatch,以便模型读取并基于字符对姓氏进行分类,其代码如下:
- class SurnameVectorizer(object):
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
- def __init__(self, surname_vocab, nationality_vocab):
- """
- Args:
- surname_vocab (Vocabulary): maps characters to integers
- nationality_vocab (Vocabulary): maps nationalities to integers
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
-
- def vectorize(self, surname):
- """
- Args:
- surname (str): the surname
- Returns:
- one_hot (np.ndarray): a collapsed one-hot encoding
- """
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1
-
- return one_hot
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """Instantiate the vectorizer from the dataset dataframe
-
- Args:
- surname_df (pandas.DataFrame): the surnames dataset
- Returns:
- an instance of the SurnameVectorizer
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows():
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}
用于处理文本和提取映射词汇的类Vocabulary()完整代码:
- class Vocabulary(object):
- """Class to process text and extract vocabulary for mapping"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- Args:
- token_to_idx (dict): a pre-existing map of tokens to indices
- add_unk (bool): a flag that indicates whether to add the UNK token
- unk_token (str): the UNK token to add into the Vocabulary
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- """ returns a dictionary that can be serialized """
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """ instantiates the Vocabulary from a serialized dictionary """
- return cls(**contents)
-
- def add_token(self, token):
- """Update mapping dicts based on the token.
- Args:
- token (str): the item to add into the Vocabulary
- Returns:
- index (int): the integer corresponding to the token
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """Add a list of tokens into the Vocabulary
-
- Args:
- tokens (list): a list of string tokens
- Returns:
- indices (list): a list of indices corresponding to the tokens
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """Retrieve the index associated with the token
- or the UNK index if token isn't present.
-
- Args:
- token (str): the token to look up
- Returns:
- index (int): the index corresponding to the token
- Notes:
- `unk_index` needs to be >=0 (having been added into the Vocabulary)
- for the UNK functionality
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """Return the token associated with the index
-
- Args:
- index (int): the index to look up
- Returns:
- token (str): the token corresponding to the index
- Raises:
- KeyError: if the index is not in the Vocabulary
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)
同上述3.2.1中的MLP实例化,最后是否应用softmax取决于是否需要保证输出之和为1(与交叉熵损失一起使用时应设为False)。完整的分类器代码如下:
- class SurnameClassifier(nn.Module):
- """ A 2-layer Multilayer Perceptron for classifying surnames """
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- super(SurnameClassifier, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the classifier
-
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- intermediate_vector = F.relu(self.fc1(x_in))
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
基于本实验,MLP进行姓氏分类所需的参数结构如下:
- import os  # 用于拼接保存路径和创建目录
- 
- args = Namespace(
- # Data and path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/surname_mlp",
- # Model hyper parameters
- hidden_dim=300,
- # Training hyper parameters
- seed=1337,
- num_epochs=100,
- early_stopping_criteria=5,
- learning_rate=0.001,
- batch_size=64,
- # Runtime options
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- )
-
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # Check CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- print("Using CUDA: {}".format(args.cuda))
-
-
- # Set seed for reproducibility
- set_seed_everywhere(args.seed, args.cuda)
-
- # handle dirs
- handle_dirs(args.save_dir)
输出结果:
其中用到的几个辅助函数(训练状态管理、准确率计算、随机种子设置与目录创建)定义如下:
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
-                 torch.save(model.state_dict(), train_state['model_filename'])
-                 # 记录新的最优验证损失,供后续轮次比较
-                 train_state['early_stopping_best_val'] = loss_t
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- def compute_accuracy(y_pred, y_target):
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
实例化数据集、模型、损失和优化器:
- import pandas as pd
- import torch.optim as optim  # Adam优化器与学习率调度器
- # 加载数据集并创建Vectorizer
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- vectorizer = dataset.get_vectorizer()
-
- # 初始化分类器
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
-
- # 将分类器移到指定的设备上
- classifier = classifier.to(args.device)
- # 定义损失函数为交叉熵损失,并考虑类别权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 定义优化器为Adam,并传入分类器的参数和学习率
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- 
- # 定义学习率调度器:验证损失不再下降时将学习率乘以0.5(训练循环中的scheduler即此对象)
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
-                                                  mode='min', factor=0.5,
-                                                  patience=1)
- 
- # 初始化训练状态字典(训练循环中的train_state即此对象)
- train_state = make_train_state(args)
训练时用不同的key从batch_dict中取出数据,在训练集上计算模型输出、损失和梯度,再用梯度更新模型参数;每个epoch结束后在验证集上评估。训练循环代码如下:
- from tqdm import tqdm_notebook
-
- # 创建用于显示进度条的epoch_bar,用于跟踪训练过程中的epoch
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs, # 总共的epoch数量
- position=0) # 进度条在显示中的位置
-
- # 设置数据集的split为训练集
- dataset.set_split('train')
-
- # 创建用于显示训练集进度的train_bar,用于跟踪每个epoch中的批次
- train_bar = tqdm_notebook(desc='split=train', # 进度条的描述
- total=dataset.get_num_batches(args.batch_size), # 总共的批次数量
- position=1, # 进度条在显示中的位置
- leave=True) # 训练完成后是否保留进度条
-
- # 设置数据集的split为验证集
- dataset.set_split('val')
-
- # 创建用于显示验证集进度的val_bar,用于跟踪每个epoch中的批次
- val_bar = tqdm_notebook(desc='split=val', # 进度条的描述
- total=dataset.get_num_batches(args.batch_size), # 总共的批次数量
- position=1, # 进度条在显示中的位置
- leave=True) # 训练完成后是否保留进度条
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
循环的结果:
- # compute the loss & accuracy on the test set using the best available model
-
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
-
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
测试结果与准确率:
模型在测试数据上的准确率约为50%,在训练数据上的准确率会更高,但总体准确率都不高。这是因为收缩的one-hot向量表示丢弃了字符之间的顺序信息,是一种较弱的表示。
还需要用模型对新数据进行预测,以进一步检验模型的好坏:给定一个姓氏字符串,先对其向量化,再用模型预测其国籍:
- def predict_nationality(surname, classifier, vectorizer):
- """Predict the nationality from a new surname
-
- Args:
- surname (str): the surname to classifier
- classifier (SurnameClassifer): an instance of the classifier
- vectorizer (SurnameVectorizer): the corresponding vectorizer
- Returns:
- a dictionary with the most likely nationality and its probability
- """
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).view(1, -1)
- result = classifier(vectorized_surname, apply_softmax=True)
-
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
将apply_softmax标志设置为True,结果将为类别概率预测,使用PyTorch张量最大函数得到由最高预测概率表示的最优类,即作为预测结果:
对代码做一些修改,通过torch.topk函数使用K-best方法可以检索模型预测概率最大的前K个结果,以便进行对比:
- vectorizer.nationality_vocab.lookup_index(8)
-
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- vectorized_name = vectorizer.vectorize(name)
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # returned size is 1,k
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- results = []
- for prob_value, index in zip(probability_values, indices):
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- return results
-
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
-
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
输出结果如下:
在模型训练过程中经常出现过拟合现象。解决过拟合问题有两种重要的权重正则化方法,即L1和L2正则化;除此之外,对于MLP这类深度模型,结构正则化方法也非常常用,即dropout。加入dropout的MLP模型如下:
- import torch.nn as nn
- import torch.nn.functional as F
-
- class MultilayerPerceptron(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- 初始化多层感知机模型
- Args:
- input_dim (int): 输入向量的大小
- hidden_dim (int): 第一个线性层的输出大小
- output_dim (int): 第二个线性层的输出大小
- """
- super(MultilayerPerceptron, self).__init__()
- # 定义第一个全连接层
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- # 定义第二个全连接层
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """
- MLP的前向传播
- Args:
- x_in (torch.Tensor): 输入数据张量,x_in.shape应为(batch, input_dim)
- apply_softmax (bool): 是否应用softmax激活函数,如果用于交叉熵损失,应设置为False
- Returns:
- 结果张量,tensor.shape应为(batch, output_dim)
- """
- # 使用ReLU激活函数的第一个全连接层
- intermediate = F.relu(self.fc1(x_in))
-         # 在第一个全连接层的输出上应用dropout后再传给第二个全连接层
-         # training=self.training 使dropout只在训练模式下随机置零,评估时自动关闭
-         output = self.fc2(F.dropout(intermediate, p=0.5,
-                                     training=self.training))
-
- # 如果apply_softmax为True,则应用softmax激活函数
- if apply_softmax:
- output = F.softmax(output, dim=1)
- return output
由测试结果可知在加入过拟合处理后,模型准确率会有一定提高,但不会大幅度提升(结果截图没有保存)。
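前文提到的L2正则化,在PyTorch中一般通过优化器的weight_decay参数实现,下面是一个示意(沿用前文的classifier与args,weight_decay的取值1e-4仅为示例):

- import torch.optim as optim
- 
- # 示意:L2正则化(权重衰减)
- optimizer = optim.Adam(classifier.parameters(),
-                        lr=args.learning_rate,
-                        weight_decay=1e-4)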
四、卷积神经网络(The Convolutional Neural Network)
卷积神经网络(CNN)是一种非常适合检测空间子结构(并因此产生有意义的空间子结构)的神经网络。CNN通过用少量权重扫描输入数据张量来实现这一点;在扫描过程中,它产生表示是否检测到相应子结构的输出张量。
CNN的结构可以从五个方面进行分析。一是输入层:本实验的输入即为预处理后的姓氏数据集信息;
二是卷积层(Convolutional Layer):卷积层是CNN的核心组成部分,由多个滤波器(或卷积核)组成,每个滤波器与输入数据进行卷积操作。滤波器通过滑动窗口的方式在输入数据上移动,计算每个位置的卷积结果,从而提取局部特征。每个滤波器在不同位置的计算共享参数,从而减少模型的参数数量和计算复杂度。其中,每个卷积层后通常会添加非线性激活函数,激活函数类型与MLP模型相同,如ReLU等,用于引入非线性特征并增加网络的表达能力;
三是池化层(Pooling Layer):池化层用于减少卷积层输出的空间维度,包括降低数据体积、参数数量,以及控制过拟合。常用的池化操作包括最大池化(Max Pooling)和平均池化(Average Pooling),它们分别取池化窗口中的最大值或平均值作为输出;
四是全连接层(Fully Connected Layer):在CNN的顶部,通常会添加全连接层,用于将卷积层和池化层提取的特征映射转换为最终的输出。全连接层的每个神经元与前一层的所有神经元相连,执行分类或回归等任务;
五是输出层:根据全连接层的信息得到概率最大的结果即为输出。
具体的卷积运算如图4.1所示:输入矩阵(4×4)与单个卷积核(3×3)做卷积,卷积核每移动到一个位置,就把自身的权重与输入矩阵对应位置的值逐元素相乘并求和,作为该位置的输出值,最终得到一个2×2的输出矩阵,这是一个降维的过程。另外,CNN设计中超参数的设定尤为重要,一般包括卷积核大小(kernel_size)、填充(padding)、步长(stride)、通道数(channels)等,需要根据实验需求灵活选取,取值是否合理会很大程度上影响模型效果。
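下面用nn.Conv2d验证图4.1中的尺寸变化(示意代码,输入为随机数,仅用于演示形状):

- import torch
- import torch.nn as nn
- 
- # 输出边长 = (输入边长 + 2*padding - kernel_size) // stride + 1 = (4 + 0 - 3) // 1 + 1 = 2
- x = torch.randn(1, 1, 4, 4)  # (batch, channel, 高, 宽)
- conv = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=3, stride=1, padding=0)
- print(conv(x).shape)         # torch.Size([1, 1, 2, 2])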
第一步是将PyTorch的Conv1d类的一个实例应用到三维数据张量,通过检查输出的大小,可以知道张量减少了多少:
- batch_size = 2 # 批量大小
- one_hot_size = 10 # one-hot编码的大小
- sequence_width = 7 # 序列宽度
- # 生成随机数据,形状为(batch_size, one_hot_size, sequence_width)
- data = torch.randn(batch_size, one_hot_size, sequence_width)
-
- # 创建一个一维卷积层,输入通道数为one_hot_size,输出通道数为16,卷积核大小为3
- conv1 = nn.Conv1d(in_channels=one_hot_size, out_channels=16,
- kernel_size=3)
-
- # 将数据传递给卷积层进行前向传播
- intermediate1 = conv1(data)
-
- # 打印原始数据和经过第一个卷积层后的形状
- print(data.size()) # 打印原始数据形状
- print(intermediate1.size()) # 打印经过第一个卷积层后的形状
运行结果可以看出数据形状的减小:
通过继续堆叠卷积层可以进一步减小张量:再应用两个kernel_size=3的卷积之后,序列长度依次从5减小到3、再到1,输出在最后一个维度上的大小才变为1:
- # 设置卷积层
- conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3)
- conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3)
-
- intermediate2 = conv2(intermediate1)
- intermediate3 = conv3(intermediate2)
- #打印卷积后的结果
- print(intermediate2.size())
- print(intermediate3.size())
在上面的每次卷积中,通道维数都在增加(16→32→64),因为通道维度相当于每个位置上数据点的特征向量维度:
- y_output = intermediate3.squeeze() #squeeze()从张量中移除大小为1的维度
- print(y_output.size())#打印张量 y_output 的大小
使用squeeze()方法去掉不需要的尺寸(1维),输出结果如下:
另外,要把张量压缩为特征向量,还可以使用PyTorch的view()方法把每个样本的所有值展平成单个向量(即Method 2),或者沿feature map维度求算术平均值、最大值或求和(即Method 3):
- # Method 2 of reducing to feature vectors
- print(intermediate1.view(batch_size, -1).size())
-
- # Method 3 of reducing to feature vectors
- print(torch.mean(intermediate1, dim=2).size())
- # print(torch.max(intermediate1, dim=2).size())
- # print(torch.sum(intermediate1, dim=2).size())
示例结果如下:
CNN实现姓氏分类的主要函数类如下,主要包含三个类,该部分与MLP基本相同,不做赘述。
- # 导入必要的库和模块
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
- import numpy as np
- import pandas as pd
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
-
- class Vocabulary:
- """用于处理文本并提取词汇表的类"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- 初始化Vocabulary类
- Args:
- token_to_idx (dict): 一个现有的将标记映射到索引的字典
- add_unk (bool): 一个指示是否添加UNK标记的标志
- unk_token (str): 要添加到词汇表中的UNK标记
- """
- # 如果token_to_idx为None,则初始化为空字典
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- # 创建从索引到标记的映射字典
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- # 设置是否添加UNK标记和UNK标记的值
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- # 如果需要添加UNK标记,则添加UNK标记并获取其索引
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
- def to_serializable(self):
- """返回一个可序列化的字典"""
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """从一个序列化的字典实例化Vocabulary"""
- return cls(**contents)
-
- def add_token(self, token):
- """根据标记更新映射字典。
- Args:
- token (str): 要添加到词汇表中的项
- Returns:
- index (int): 与标记对应的整数
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- # 如果标记不在词汇表中,则将其添加并获取新的索引
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """Add a list of tokens into the Vocabulary
-
- Args:
- tokens (list): a list of string tokens
- Returns:
- indices (list): a list of indices corresponding to the tokens
- """
- # 使用列表推导将每个标记添加到词汇表中,并获取对应的索引
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """Retrieve the index associated with the token
- or the UNK index if token isn't present.
-
- Args:
- token (str): the token to look up
- Returns:
- index (int): the index corresponding to the token
- Notes:
- `unk_index` needs to be >=0 (having been added into the Vocabulary)
- for the UNK functionality
- """
- # 如果存在UNK索引,则使用get方法检索标记的索引,否则直接检索
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """返回与索引关联的标记
-
- Args:
- index (int): 要查找的索引
- Returns:
- token (str): 与索引对应的标记
- Raises:
- KeyError: 如果索引不在词汇表中
- """
- # 检查索引是否在索引到标记的映射字典中,如果不存在,则引发KeyError
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- # 返回与索引对应的标记
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)
尽管词汇表和DataLoader的实现方式与MLP中的相同,但Vectorizer的vectorize()方法已经更改,以适应CNN模型的需要。具体来说,该方法把字符串中的每个字符映射为一个整数,再用这些整数构造一个由one-hot向量组成的矩阵,矩阵中的每一列都是姓氏中一个字符位置对应的one-hot向量。这样做的主要原因是,后面使用的Conv1d层要求数据张量在第0维上是批量(batch)、第1维上是通道(channel)、第2维上是特征(feature)。此外,矢量化器还统计数据集中姓氏的最大长度,并保存为max_surname_length。
- class SurnameVectorizer(object):
-     """
-     SurnameVectorizer类用于处理姓氏数据的向量化,包括构建姓氏和国籍的词汇表并将其应用于数据向量化。
-     Attributes:
-         surname_vocab (Vocabulary): 姓氏词汇表,用于将姓氏字符转换为整数索引。
-         nationality_vocab (Vocabulary): 国籍词汇表,用于将国籍转换为整数索引。
-         max_surname_length (int): 数据集中最长姓氏的长度,决定one-hot矩阵的列数。
-     """
- 
-     def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
-         """
-         初始化SurnameVectorizer对象。
-         Args:
-             surname_vocab (Vocabulary): 姓氏词汇表。
-             nationality_vocab (Vocabulary): 国籍词汇表。
-             max_surname_length (int): 最长姓氏的长度。
-         """
-         self.surname_vocab = surname_vocab
-         self.nationality_vocab = nationality_vocab
-         self._max_surname_length = max_surname_length
- 
-     def vectorize(self, surname):
-         """
-         将姓氏向量化为one-hot矩阵。
-         Args:
-             surname (str): 姓氏字符串。
-         Returns:
-             np.ndarray: 形状为(len(surname_vocab), max_surname_length)的one-hot矩阵,
-                         每一列是姓氏中一个字符位置对应的one-hot向量。
-         """
-         one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
-         one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
- 
-         for position_index, character in enumerate(surname):
-             character_index = self.surname_vocab.lookup_token(character)
-             one_hot_matrix[character_index][position_index] = 1
- 
-         return one_hot_matrix
- 
-     @classmethod
-     def from_dataframe(cls, surname_df):
-         """
-         从数据集DataFrame实例化Vectorizer对象,并统计最长姓氏的长度。
-         Args:
-             surname_df (pandas.DataFrame): 姓氏数据集DataFrame。
-         Returns:
-             SurnameVectorizer: 实例化的SurnameVectorizer对象。
-         """
-         surname_vocab = Vocabulary(unk_token="@")
-         nationality_vocab = Vocabulary(add_unk=False)
-         max_surname_length = 0
- 
-         for index, row in surname_df.iterrows():
-             max_surname_length = max(max_surname_length, len(row.surname))
-             for letter in row.surname:
-                 surname_vocab.add_token(letter)
-             nationality_vocab.add_token(row.nationality)
- 
-         return cls(surname_vocab, nationality_vocab, max_surname_length)
- 
-     @classmethod
-     def from_serializable(cls, contents):
-         """
-         从可序列化内容实例化Vectorizer对象。
-         Args:
-             contents (dict): 包含可序列化内容的字典。
-         Returns:
-             SurnameVectorizer: 实例化的SurnameVectorizer对象。
-         """
-         surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
-         nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
-         return cls(surname_vocab=surname_vocab,
-                    nationality_vocab=nationality_vocab,
-                    max_surname_length=contents['max_surname_length'])
- 
-     def to_serializable(self):
-         """
-         将Vectorizer对象转换为可序列化内容。
-         Returns:
-             dict: 包含可序列化内容的字典。
-         """
-         return {'surname_vocab': self.surname_vocab.to_serializable(),
-                 'nationality_vocab': self.nationality_vocab.to_serializable(),
-                 'max_surname_length': self._max_surname_length}
尽管我们使用了与“多层感知器的姓氏分类”相同的数据集,但实现上有一个不同之处:每个样本由one-hot向量组成的矩阵表示,而不是一个收缩的one-hot向量。矩阵的行数等于one-hot向量的维度(即词汇表大小),列数等于数据集中最长姓氏的长度,后者由矢量化器统计并保存为max_surname_length。
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- 初始化SurnameDataset类
- Args:
- surname_df (pandas.DataFrame): 数据集
- vectorizer (SurnameVectorizer): 从数据集实例化的矢量化器
- """
- # 保存数据集和矢量化器
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- # 根据数据集的拆分设置训练集、验证集和测试集
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- # 创建一个字典来存储各个拆分的数据集和大小
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- # 设置当前使用的数据集拆分,默认为训练集
- self.set_split('train')
-
- # 类别权重
- # 统计各个国籍的样本数量
- class_counts = surname_df.nationality.value_counts().to_dict()
- # 定义排序关键字函数
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- # 按国籍词汇表的顺序对样本数量进行排序
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- # 获取排序后的样本频率
- frequencies = [count for _, count in sorted_counts]
- # 计算类别权重,即样本频率的倒数
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """加载数据集并从头创建一个新的矢量化器
- Args:
- surname_csv (str): 数据集的位置
- Returns:
- SurnameDataset的实例
- """
- # 从CSV文件中读取姓氏数据集
- surname_df = pd.read_csv(surname_csv)
- # 从数据集中获取训练集的部分
- train_surname_df = surname_df[surname_df.split=='train']
- # 使用训练集的部分创建一个新的矢量化器
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """加载数据集和相应的矢量化器。
- 在矢量化器已被缓存以便重复使用的情况下使用。
- Args:
- surname_csv (str): 数据集的位置
- vectorizer_filepath (str): 保存的矢量化器的位置
- Returns:
- SurnameDataset的实例
- """
- # 从CSV文件中读取姓氏数据集
- surname_df = pd.read_csv(surname_csv)
- # 加载已保存的矢量化器
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- # 返回一个数据集实例,同时传入数据集和加载的矢量化器
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """从文件加载矢量化器的静态方法
- Args:
- vectorizer_filepath (str): 序列化矢量化器的位置
- Returns:
- SurnameVectorizer的实例
- """
- # 使用json.load从文件中加载序列化的矢量化器,并返回一个实例
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """使用json将矢量化器保存到磁盘
- Args:
- vectorizer_filepath (str): 要保存矢量化器的位置
- """
- # 使用json.dump将矢量化器序列化并保存到文件中
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """返回矢量化器"""
- return self._vectorizer
-
- def set_split(self, split="train"):
- """根据DataFrame中的列选择数据集的拆分"""
- # 设置目标拆分和对应的DataFrame以及大小
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- """返回数据集的大小"""
- return self._target_size
-
- def __getitem__(self, index):
- """PyTorch数据集的主要入口方法
- Args:
- index (int): 数据点的索引
- Returns:
- 一个字典,包含数据点的特征 (x_data) 和标签 (y_target)
- """
- # 获取指定索引处的行数据
- row = self._target_df.iloc[index]
-
- # 对姓氏进行矢量化
- surname_matrix = self._vectorizer.vectorize(row.surname)
-
- # 获取国籍在词汇表中的索引
- nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """给定批量大小,返回数据集中的批次数
- Args:
- batch_size (int)
- Returns:
- 数据集中的批次数
- """
- # 计算批次数并返回
- return len(self) // batch_size
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- 一个生成器函数,用于封装PyTorch的DataLoader。它会确保每个张量都在正确的设备位置上。
-
- Args:
- dataset (Dataset): 要生成批次的数据集
- batch_size (int): 每个批次的大小
- shuffle (bool): 是否在每个epoch开始前打乱数据集,默认为True
- drop_last (bool): 如果数据集的大小不能被批次大小整除,是否丢弃最后一个不完整的批次,默认为True
- device (str): 张量所在的设备,例如"cpu"或"cuda:0"等,默认为"cpu"
-
- Yields:
- 一个字典,包含从数据集中获取的批次数据,并确保每个张量都在指定的设备上
- """
- # 创建一个PyTorch DataLoader
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- # 遍历DataLoader中的每个批次
- for data_dict in dataloader:
- out_data_dict = {}
- # 将批次中的每个张量移到指定的设备上
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- # 生成经过处理的批次数据
- yield out_data_dict
相比于MLP中的SurnameClassifier模块,本例使用了PyTorch的Sequential和ELU模块:用nn.Sequential把一系列Conv1d层封装起来;ELU是与ReLU类似的非线性函数,不同之处在于它不是把小于0的值直接裁剪为0,而是对负值做指数化处理(约为exp(x)-1):
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- 初始化方法
-
- Args:
- initial_num_channels (int): 输入特征向量的大小
- num_classes (int): 输出预测向量的大小
- num_channels (int): 网络中使用的恒定通道大小
- """
- super(SurnameClassifier, self).__init__()
-
- # 定义卷积神经网络模型
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3),
-             nn.ELU(),  # 使用ELU作为激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3),
- nn.ELU()
- )
- # 定义全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """分类器的前向传播
-
- Args:
- x_surname (torch.Tensor): 输入数据张量。
- x_surname.shape 应为 (batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): 是否应用softmax激活。
- 如果与交叉熵损失一起使用,应为False
- Returns:
- 结果张量。tensor.shape 应为 (batch, num_classes)
- """
- # 运行卷积神经网络模型
- features = self.convnet(x_surname).squeeze(dim=2)
-
- # 将特征张量传递给全连接层
- prediction_vector = self.fc(features)
-
- # 如果需要应用softmax,则应用softmax激活
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
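为直观理解ELU与ReLU在负值区间的区别,可以做一个简单的数值对比(示意代码,非原文内容):

- import torch
- import torch.nn.functional as F
- 
- x = torch.tensor([-2.0, -1.0, 0.0, 1.0])
- print(F.relu(x))  # 负值被裁剪为0: tensor([0., 0., 0., 1.])
- print(F.elu(x))   # 负值变为exp(x)-1: 约 tensor([-0.8647, -0.6321, 0.0000, 1.0000])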
CNN实例中的参数设置:
- args = Namespace(
- # Data and Path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/cnn",
- # Model hyper parameters
- hidden_dim=100,
- num_channels=256,
- # Training hyper parameters
- seed=1337,
- learning_rate=0.001,
- batch_size=128,
- num_epochs=100,
- early_stopping_criteria=5,
- dropout_p=0.1,
- # Runtime omitted for space ...
- )
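参数设置之后的数据集、模型、损失和优化器的实例化流程与MLP部分一致,原文此处从略;下面给出一个示意(假设args.device等运行时选项已按MLP部分的方式补全):

- # 示意:CNN版分类器的实例化,沿用前文定义的SurnameDataset、SurnameVectorizer与SurnameClassifier
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- vectorizer = dataset.get_vectorizer()
- 
- classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab),
-                                num_classes=len(vectorizer.nationality_vocab),
-                                num_channels=args.num_channels)
- classifier = classifier.to(args.device)
- 
- loss_func = nn.CrossEntropyLoss(dataset.class_weights.to(args.device))
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)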
评估和测试:
- def predict_nationality(surname, classifier, vectorizer):
- """预测一个新姓氏的国籍
- Args:
- surname (str): 要分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 对应的向量化器
- Returns:
- dict: 包含最可能的国籍及其概率的字典
- 'nationality' (str): 预测的国籍
- 'probability' (float): 预测的概率值
- """
- # 使用向量化器将姓氏转换为向量表示
- vectorized_surname = vectorizer.vectorize(surname)
- # 将向量转换为 PyTorch 的张量,并添加批次维度
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- # 使用分类器进行预测,应用 softmax 激活函数
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 获取最大概率值及其对应的索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- # 使用向量化器的国籍词汇表查询预测的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- # 获取预测的概率值
- probability_value = probability_values.item()
-
- # 返回预测的国籍及其概率值的字典
- return {'nationality': predicted_nationality, 'probability': probability_value}
-
-
在本次实验中,我们探索了多层感知器(MLP)在多类分类任务中的应用,并深入了解了神经网络不同层次对数据张量大小和形状的影响,同时尝试了带有dropout的模型以观察其对结果的影响。
多层感知器是一种经典的前馈神经网络,适用于解决分类问题。在姓氏分类示例中,我们使用了MLP来根据姓氏的字母序列预测姓氏的语言和国家来源。以下是我们从实验中学到的几个关键点:
特征表示与预处理:姓氏作为文本数据,需要转换为数字形式输入到MLP中。我们使用了字符级别的one-hot编码来表示姓氏的每个字符序列,这样神经网络能够理解和处理。
模型架构设计:MLP由多个全连接层组成,每个隐藏层通过激活函数引入非线性,最终输出层使用softmax激活函数生成预测的概率分布。
训练与优化:我们使用了反向传播算法和随机梯度下降(SGD)来优化模型参数,以最小化损失函数(如交叉熵),从而提高分类准确率。
不同类型的神经网络层(如卷积层、池化层、全连接层等)对输入数据张量的大小和形状有不同的影响:
卷积层:卷积操作保留了输入数据的空间结构,通过滤波器(卷积核)的滑动窗口在输入数据上提取特征。卷积操作会减少输出的空间维度,但增加通道数(深度)。
池化层:池化层通过取局部区域的最大值或平均值来降低数据维度,通常减少输入大小,但保持深度不变。
全连接层:全连接层将前一层的所有神经元与当前层的每个神经元相连,扁平化数据张量,从而影响数据的维度。
dropout通过在训练过程中随机将一部分神经元置为零来降低神经元之间的依赖性,从而增强了模型的泛化能力。
实验结果:在SurnameClassifier模型中引入dropout后,观察到训练集和验证集上的准确率可能会略有下降,但模型在未见过的数据上的表现更稳定,避免了过拟合。
思考提升:在实际应用中,合理使用dropout可以提升模型的泛化能力,特别是在数据集较小或者复杂度较高时更为有效。需要注意的是,dropout的使用应根据具体任务和数据情况进行调整,避免过度正则化导致欠拟合。
本次实验通过实际操作和观察,我加深了对多层感知器在多类分类任务中的理解,掌握了神经网络结构与模型构建,同时学习了各项数据处理方式对模型性能的影响。在未来的深度学习任务中,希望进一步探索不同类型的神经网络结构和优化技术,以提升模型的性能和泛化能力。