赞
踩
前馈神经网络(Feedforward Neural Network,简称FFNN)是一种基本的人工神经网络模型,广泛应用于自然语言处理和其他机器学习任务中。它的基本定义和工作原理如下:
一、定义
前馈神经网络是一种由多层神经元组成的神经网络,其中信息从输入层经过一层一层的隐藏层,最终到达输出层,每一层的神经元都与下一层的所有神经元相连,而各层之间没有反馈连接。每个神经元都通过一种激活函数(如ReLU、sigmoid、tanh等)处理其输入,以产生输出。
二、原理
1.输入层:接受输入数据的层。在自然语言处理中,输入可以是文本数据的词向量表示,或者是对文本进行编码后的特征向量。
2.隐藏层:中间层,通常由多个全连接层组成。每个隐藏层的神经元通过权重连接到上一层的所有神经元,并且每个神经元都应用一个激活函数来计算输出。隐藏层的数量和每个隐藏层的神经元数量是设计网络结构时的关键参数。
3.输出层:产生最终预测或输出的层。在自然语言处理中,输出可以是词汇表中的词语概率分布,也可以是某个标签的预测概率。
4.激活函数:每个神经元在接收到输入后,会先进行加权求和,然后通过一个激活函数来产生输出。常见的激活函数包括sigmoid函数、tanh函数和ReLU函数等,它们的选择会影响神经网络的性能和训练过程中的稳定性。
5.反向传播算法:用于训练前馈神经网络的主要算法是反向传播(Backpropagation)。它通过计算损失函数对每个权重的梯度,然后使用梯度下降法或其变种来更新网络中的权重,从而使得网络能够逐步优化和学习输入数据的模式。
三、在自然语言处理中的应用
前馈神经网络在自然语言处理中的应用非常广泛,例如:
1.文本分类:通过前馈神经网络可以对文本进行分类,比如情感分析、主题分类等。
2.命名实体识别:将文本中的命名实体(如人名、地名等)识别出来。
3.语言模型:预测下一个词的可能性,如机器翻译、语音识别等任务。
4.序列标注:如词性标注、命名实体识别等序列预测任务。
总之,前馈神经网络作为最基础和常见的神经网络模型之一,在自然语言处理中发挥着重要作用,为处理和理解文本数据提供了有效的工具和方法。
多层感知器(Multilayer Perceptron,MLP)是一种最基本和经典的前馈神经网络(Feedforward Neural Network,FFNN)模型。它由多个神经元层组成,包括输入层、至少一个或多个隐藏层以及一个输出层。MLP通常用于解决分类(Classification)和回归(Regression)问题,它具有以下特点和结构:
结构和特点
1.输入层(Input Layer):输入层接受来自数据集的特征向量作为输入。每个特征通常是一个实数值,可以是原始特征或者经过特征工程处理得到的。
2.隐藏层(Hidden Layers):MLP至少包含一个或多个隐藏层。每个隐藏层由多个神经元组成,每个神经元接收前一层的所有神经元的输出,并通过加权求和后应用一个激活函数来计算输出。每个隐藏层可以有不同数量的神经元,这是设计MLP时需要调整的一个重要超参数。
3.输出层(Output Layer):输出层生成MLP的最终输出。根据任务的不同,输出层可以有一个神经元(用于二元分类)、多个神经元(用于多类分类),或者连续值(用于回归任务)。
4.激活函数(Activation Functions):在每个神经元中,通常会使用非线性激活函数(如ReLU、sigmoid、tanh等)来引入非线性特性,使得网络能够学习更复杂的模式和特征。
5.反向传播算法(Backpropagation):MLP的训练通常使用反向传播算法。该算法通过计算损失函数对网络中所有参数(权重和偏置)的梯度,并利用梯度下降法或其变种来更新参数,以最小化损失函数。
应用和优势
1.广泛应用:MLP在各种机器学习任务中广泛应用,包括图像分类、语音识别、自然语言处理、推荐系统等。
2.非线性能力:多层结构和非线性激活函数使得MLP能够捕捉和表达复杂的数据模式,比如在自然语言处理中处理非线性关系和序列数据。
3.可扩展性:MLP的结构可以扩展到更深的层次和更大的规模,使其在处理大规模数据和复杂任务时仍然有效。
总之,多层感知器(MLP)作为前馈神经网络的一种典型形式,为解决各种机器学习问题提供了一种有效的方法和工具。
我们在一个二元分类任务中训练感知器和MLP:每个数据点是一个二维坐标。在不深入研究实现细节的情况下,最终的模型预测如图4-3所示。在这个图中,错误分类的数据点用黑色填充,而正确分类的数据点没有填充。在左边的面板中,从填充的形状可以看出,感知器在学习一个可以将星星和圆分开的决策边界方面有困难。然而,MLP(右面板)学习了一个更精确地对星星和圆进行分类的决策边界。
在PyTorch中,我们使用两个线性模块(通常被称为“完全连接层”或简称“fc层”)来构建一个简单的神经网络结构。这些线性层分别被命名为fc1和fc2。在fc1和fc2之间,我们引入了一个修正的线性单元(ReLU)作为非线性激活函数,它有助于模型学习复杂的非线性关系。为了保持网络结构的连贯性,必须确保fc1的输出特征数量与fc2的输入特征数量相匹配。这种层与层之间的连接确保了信息的有效传递。代码如下
- class MultilayerPerceptron(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- 初始化多层感知机模型,包括两个全连接层。
-
- Args:
- input_dim (int): 输入向量的大小
- hidden_dim (int): 第一个线性层的输出大小
- output_dim (int): 第二个线性层的输出大小
- """
- super(MultilayerPerceptron, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim) # 第一个全连接层
- self.fc2 = nn.Linear(hidden_dim, output_dim) # 第二个全连接层
-
- def forward(self, x_in, apply_softmax=False):
- """
- 多层感知机的前向传播过程。
-
- Args:
- x_in (torch.Tensor): 输入数据张量。
- x_in.shape 应该是 (batch, input_dim)
- apply_softmax (bool): softmax激活函数的标志
- 如果与交叉熵损失一起使用,应为false
- Returns:
- 结果张量。tensor.shape 应该是 (batch, output_dim)
- """
- intermediate = F.relu(self.fc1(x_in)) # 第一层全连接层经过ReLU激活函数处理
- output = self.fc2(intermediate) # 第二层全连接层
-
- if apply_softmax:
- output = F.softmax(output, dim=1) # 如果需要应用softmax激活函数,则进行softmax处理
- return output
- batch_size = 2 # number of samples input at once
- input_dim = 3
- hidden_dim = 100
- output_dim = 4
-
- # Initialize model
- mlp = MultilayerPerceptron(input_dim, hidden_dim, output_dim)
- print(mlp)
使用大小为3的输入维度、大小为4的输出维度和大小为100的隐藏维度。
结果如下:
在分类任务中,这些特征向量往往被解释为预测向量,它们表示了模型对于不同类别的预测概率分布。预测向量的后续处理取决于模型当前所处的阶段:训练或推理。在训练阶段,预测向量会与真实的目标类标签一起输入到损失函数中,以计算预测与实际之间的误差,从而指导模型的权重更新。而在推理或预测阶段,这些预测向量可能经过额外的处理(如选择最大概率的类别)以得到最终的预测结果。
带有多层感知器(MLP)的姓氏分类是一个利用多层前馈神经网络来解决分类问题的应用实例。姓氏分类是一个典型的分类问题,目标是将给定的姓氏数据正确归类到相应的类别中。多层感知器(MLP)作为神经网络的一种,具有强大的非线性映射能力和学习能力,适合用于解决此类问题。
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。该数据集将在本课程实验的几个示例中重用,并具有一些使其有趣的属性。第一个性质是它是相当不平衡的。排名前三的课程占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。
数据收集:收集包含不同姓氏的数据集,每个姓氏都与一些特征相关联。在姓氏分类问题中,姓氏本身可能就是唯一的特征,但为了演示,可以假设有其他的辅助特征。
数据预处理:对收集到的数据进行预处理,包括缺失值填充、数据归一化等。归一化是将数据缩放到同一尺度,有助于加快模型的训练速度和提高性能。
数据划分:将数据集划分为训练集、验证集和测试集。通常,训练集用于训练模型,验证集用于调整超参数和评估模型性能测试集用于最终评估模型的泛化能力。
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- 参数:
- surname_df (pandas.DataFrame): 数据集
- vectorizer (SurnameVectorizer): 从数据集实例化的向量化器
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- # 拆分数据集为训练集、验证集和测试集
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # 类别权重
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """加载数据集并从头创建一个新的向量化器
-
- 参数:
- surname_csv (str): 数据集的位置
- 返回:
- SurnameDataset的一个实例
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """加载数据集和相应的向量化器。用于向量化器已被缓存以供重用的情况
-
- 参数:
- surname_csv (str): 数据集的位置
- vectorizer_filepath (str): 已保存的向量化器的位置
- 返回:
- SurnameDataset的一个实例
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """从文件加载向量化器的静态方法
-
- 参数:
- vectorizer_filepath (str): 序列化向量化器的位置
- 返回:
- SurnameVectorizer的一个实例
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """使用json将向量化器保存到磁盘
-
- 参数:
- vectorizer_filepath (str): 保存向量化器的位置
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ 返回向量化器 """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ 使用数据框中的列选择数据集的拆分 """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """PyTorch数据集的主要入口点方法
-
- 参数:
- index (int): 数据点的索引
- 返回:
- 一个包含数据点的字典:
- 特征 (x_surname)
- 标签 (y_nationality)
- """
- row = self._target_df.iloc[index]
-
- surname_vector = self._vectorizer.vectorize(row.surname)
-
- nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """给定批量大小,返回数据集中的批次数量
-
- 参数:
- batch_size (int)
- 返回:
- 数据集中的批次数量
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- 一个包装PyTorch DataLoader的生成器函数。它将确保每个张量在正确的设备位置上。
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
- class Vocabulary(object):
- """用于处理文本并提取词汇以进行映射的类"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- 初始化Vocabulary实例。
- 参数:
- token_to_idx (dict): 一个现有的将标记映射到索引的字典
- add_unk (bool): 一个指示是否添加UNK标记的标志
- unk_token (str): 要添加到词汇表中的UNK标记
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- # 创建从索引到标记的反向映射
- self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- # 如果要求,将UNK标记添加到词汇表中
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
- def to_serializable(self):
- """返回一个可序列化的字典。"""
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """从序列化字典实例化Vocabulary。"""
- return cls(**contents)
-
- def add_token(self, token):
- """基于标记更新映射字典。
- 参数:
- token (str): 要添加到Vocabulary中的项
- 返回:
- index (int): 对应于标记的整数
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """将标记列表添加到Vocabulary中。
-
- 参数:
- tokens (list): 字符串标记列表
- 返回:
- indices (list): 与标记对应的索引列表
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """检索与标记相关联的索引,如果标记不存在,则使用UNK索引。
-
- 参数:
- token (str): 要查找的标记
- 返回:
- index (int): 与标记相关的索引
- 注意:
- UNK功能需要unk_index >= 0(已添加到Vocabulary中)
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """返回与索引相关联的标记。
-
- 参数:
- index (int): 要查找的索引
- 返回:
- token (str): 与索引相关的标记
- 引发:
- KeyError: 如果索引不在Vocabulary中
- """
- if index not in self._idx_to_token:
- raise KeyError("索引(%d)不在Vocabulary中" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- """返回Vocabulary的字符串表示形式。"""
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- """返回Vocabulary中唯一标记的数量。"""
- return len(self._token_to_idx)
- class SurnameVectorizer(object):
- """协调Vocabularies并将它们应用于实际用途的向量化器"""
-
- def __init__(self, surname_vocab, nationality_vocab):
- """
- 参数:
- surname_vocab (Vocabulary): 将字符映射到整数的词汇表
- nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
-
- def vectorize(self, surname):
- """
- 将姓氏向量化为一种折叠的one-hot编码。
- 参数:
- surname (str): 姓氏
- 返回:
- one_hot (np.ndarray): 一个折叠的one-hot编码
- """
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1
-
- return one_hot
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从数据集DataFrame实例化向量化器。
- 参数:
- surname_df (pandas.DataFrame): 姓氏数据集
- 返回:
- SurnameVectorizer的实例
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows():
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- def to_serializable(self):
- """返回一个可序列化的字典。"""
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}
构建一个多层感知器(MLP)模型涉及以下几个关键步骤:
1. 数据准备
首先,准备数据集,确保数据集包含输入特征和相应的标签。数据应该被分成训练集、验证集和测试集。
2. 定义模型结构
MLP是一种前馈神经网络,由多个层组成:输入层、多个隐藏层和输出层。每一层都由神经元组成,每个神经元与前一层的所有神经元连接,每个连接有一个权重。以下是构建MLP模型的关键组成部分:
1.输入层:输入层神经元数等于特征向量的维度。
2.隐藏层:可以有一个或多个隐藏层,每个隐藏层包含多个神经元。隐藏层的选择通常依赖于数据的复杂性和问题的特征。每个隐藏层通常使用激活函数来引入非线性,如ReLU(Rectified Linear Unit)或sigmoid函数。
3.输出层:输出层的神经元数通常等于分类问题中的类别数量。对于多类别分类任务,输出层通常使用softmax激活函数,以输出每个类别的概率分布;对于二分类任务,可以使用sigmoid激活函数。
3. 模型构建
在构建模型时,可以使用深度学习框架如TensorFlow、PyTorch或Keras来简化实现过程。以下是使用Keras(TensorFlow的高级API)构建一个简单的MLP模型的示例:
- from keras.models import Sequential
- from keras.layers import Dense
-
- # 定义模型
- model = Sequential()
-
- # 添加输入层和第一个隐藏层
- model.add(Dense(units=64, activation='relu', input_dim=input_dim))
-
- # 添加更多隐藏层
- model.add(Dense(units=64, activation='relu'))
-
- # 添加输出层
- model.add(Dense(units=num_classes, activation='softmax'))
-
- # 编译模型
- model.compile(loss='categorical_crossentropy',
- optimizer='adam',
- metrics=['accuracy'])
-
- # 输出模型的结构
- model.summary()
在这个例子中:
4.Sequential模型允许按顺序添加层。
5.Dense层定义全连接层,units参数指定神经元数量,activation参数指定激活函数。
6.input_dim是输入数据的特征维度。
7.compile方法配置模型的学习过程,包括损失函数、优化器和评估指标。
4. 训练模型
使用准备好的训练数据对模型进行训练:
model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_val, y_val))
5. 模型评估和预测
训练完成后,使用测试数据集评估模型的性能,并使用模型进行预测:
- score = model.evaluate(X_test, y_test, batch_size=32)
- predictions = model.predict(X_new_data)
学习率调度器初始化:使用ReduceLROnPlateau调度器,当验证集上的性能(如损失)不再提升时,减少学习率。
初始化训练状态:创建一个字典train_state来存储训练过程中的状态信息。
进度条设置:使用tqdm库设置训练和验证过程的进度条,以便于监控训练进度。
训练循环:循环迭代args.num_epochs次,每次迭代代表一个训练周期。
训练数据集迭代:设置数据集为训练模式,创建批处理生成器,初始化损失和准确率,设置模型为训练模式。对于每个批次,执行以下步骤:计算准确率并更新进度条,更新优化器,进行梯度下降,计算损失并反向传播,计算模型输出y_pred,清零梯度。
验证数据集迭代:设置数据集为验证模式,创建批处理生成器,初始化损失和准确率,设置模型为评估模式。类似于训练数据集迭代,但不对模型参数进行更新,只计算损失和准确率。
更新训练状态:根据验证损失更新训练状态,可能包括早停(early stopping)等策略。
学习率调整:使用scheduler.step()根据验证损失调整学习率。
早停检查:如果满足早停条件,则退出训练循环。
进度条重置:重置训练和验证进度条的计数器。
异常处理:使用try-except结构来捕获KeyboardInterrupt,允许用户通过中断信号(如Ctrl+C)安全退出训练循环。
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
- # 初始化训练状态
- train_state = make_train_state(args)
-
- epoch_bar = tqdm(desc='training routine', # 迭代轮次
- total=args.num_epochs,
- position=0)
-
- dataset.set_split('train')
- train_bar = tqdm(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- dataset.set_split('val')
- val_bar = tqdm(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
-
- # 设置: 批处理生成器, 将损失和准确率设为0, 设置训练模式
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # 步骤 1. 清零梯度
- optimizer.zero_grad()
-
- # 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- # 更新训练状态并根据验证损失调整学习率
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
训练过程展示:
使用训练好的模型来预测新姓氏的分类。
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- # 将名字向量化
- vectorized_name = vectorizer.vectorize(name)
- # 将向量转换为张量并调整形状
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- # 使用分类器对名字进行预测,并应用softmax函数
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- # 获取前k个最大概率值和对应的索引
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # returned size is 1,k
- # 将概率值和索引转换为numpy数组
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- # 初始化结果列表
- results = []
- # 遍历概率值和索引,将民族和概率值添加到结果列表中
- for prob_value, index in zip(probability_values, indices):
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- return results
- # 输入要分类的姓氏
- new_surname = input("Enter a surname to classify: ")
-
- # 将分类器移到CPU上
- classifier = classifier.to("cpu")
-
- # 询问用户要显示的前k个预测结果
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- # 调用函数进行预测
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- # 打印前k个预测结果
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
使用CNN对姓氏进行分类的优势在于其强大的特征提取能力,可以自动学习姓氏中的关键特征,而不需要手动设计特征。然而,对于文本数据,卷积神经网络可能不如循环神经网络(RNN)或Transformer等模型在处理序列数据方面灵活。不过,CNN在某些特定的文本分类任务中仍然表现出色。
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
- def compute_accuracy(y_pred, y_target):
- y_pred_indices = y_pred.max(dim=1)[1]
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
- args = Namespace(
- # 数据和路径信息
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/cnn",
- # 模型超参数
- hidden_dim=100,
- num_channels=256,
- # 训练超参数
- seed=1337,
- learning_rate=0.001,
- batch_size=128,
- num_epochs=100,
- early_stopping_criteria=5,
- dropout_p=0.1,
- # 运行时选项
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- catch_keyboard_interrupt=True
- )
-
- # 如果需要将文件路径扩展到保存目录,则进行扩展
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 检查CUDA是否可用
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
-
- # 设置随机种子以确保可重复性
- set_seed_everywhere(args.seed, args.cuda)
-
- # 处理目录
- handle_dirs(args.save_dir)
- if args.reload_from_files:
- # 如果从文件中加载模型,则从检查点恢复训练
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # 否则,创建数据集和向量化器
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- # 获取向量化器
- vectorizer = dataset.get_vectorizer()
-
- # 初始化分类器
- classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab),
- num_classes=len(vectorizer.nationality_vocab),
- num_channels=args.num_channels)
-
- # 将分类器移动到指定设备(CPU或GPU)
- classifer = classifier.to(args.device)
- # 将类别权重移动到指定设备(CPU或GPU)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 定义损失函数,使用加权交叉熵损失
- loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
- # 定义优化器,使用Adam算法
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- # 定义学习率调度器,当验证损失不再降低时,降低学习率
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- # 创建训练状态对象
- train_state = make_train_state(args)
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
训练过程展示:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。