赞
踩
1.什么是前馈神经网络
2.前馈神经网络在NLP中的应用
(1)文本分类:将文本分为预定义的类别或标签。例如将新闻文章分为政治、经济、体育等类别
(2)情感分析:分析文本中的情感倾向,如分析社交媒体上用户对产品或事件的态度等
(3)命名实体识别:识别文本中提及的具有特定意义的实体,如人名、地名、组织机构名等。
定义:多层感知器是一种特殊类型的前馈网络,至少包含一个隐藏层,且每个神经元与下一层的每个神经元完全连接。
单层感知机是一个二分类模型,可以很好地解决线性可分问题。
多层感知机通过对一个问题多次划分,可以解决线性不可分问题
最简单的MLP,由三个表示阶段(输入向量、隐藏向量和输出向量)和两个线性层组成。
下面定义一个多层感知机模型:
我们用PyTorch的两个线性模块实例化了这个想法。线性对象被命名为fc1和fc2,它们遵循一个通用约定,即将线性模块称为“完全连接层”,简称为“fc层”。
import torch.nn as nn
import torch.nn.functional as F

class MultilayerPerceptron(nn.Module):
    """A two-layer perceptron: Linear -> ReLU -> Linear.

    The two linear modules are named fc1/fc2 following the common
    convention of calling linear layers "fully connected" ("fc") layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Args:
            input_dim: size of each input vector.
            hidden_dim: size of the hidden representation.
            output_dim: size of the output vector (e.g. number of classes).
        """
        super(MultilayerPerceptron, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)   # first fully connected layer
        self.fc2 = nn.Linear(hidden_dim, output_dim)  # second fully connected layer

    def forward(self, x_in, apply_softmax=False):
        """Forward pass.

        Args:
            x_in: input tensor of shape (batch, input_dim).
            apply_softmax: when True, apply softmax to the output; keep
                False when training with cross-entropy loss.
        Returns:
            Tensor of shape (batch, output_dim).
        """
        hidden = F.relu(self.fc1(x_in))
        logits = self.fc2(hidden)
        return F.softmax(logits, dim=1) if apply_softmax else logits
初始化多层感知机模型:
- batch_size = 2 # batch size (number of samples per input batch)
- input_dim = 3 # dimensionality of each input vector
- hidden_dim = 100 # output size of the first fully connected layer
- output_dim = 4 # output size of the second fully connected layer
-
- # Instantiate the multilayer perceptron defined above
- mlp = MultilayerPerceptron(input_dim, hidden_dim, output_dim)
-
- # Printing the module shows its layer structure
- print(mlp)
运行结果:
2.1数据集准备
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。
导入需要的库
- from torch.utils.data import Dataset, DataLoader
- import torch.nn as nn
- import torch.optim as optim
- import pandas as pd
- import numpy as np
- from collections import Counter
- import string
- import torch
定义 SurnameDataset 类,处理数据集并返回姓氏向量和国籍索引
from torch.utils.data import Dataset, DataLoader

class SurnameDataset(Dataset):
    """PyTorch dataset yielding (surname vector, nationality index) pairs.

    Expects ``surname_df`` to contain ``surname``, ``nationality`` and
    ``split`` ('train'/'val'/'test') columns.
    """

    def __init__(self, surname_df, vectorizer):
        """
        Args:
            surname_df (pandas.DataFrame): the dataset.
            vectorizer: object providing vectorize() and nationality_vocab.
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        # Bug fix: the original referenced self._lookup_dict in set_split()
        # without ever creating it, so any use raised AttributeError.
        # Build the per-split frames here and default to the train split.
        train_df = surname_df[surname_df.split == 'train']
        val_df = surname_df[surname_df.split == 'val']
        test_df = surname_df[surname_df.split == 'test']
        self._lookup_dict = {'train': (train_df, len(train_df)),
                             'val': (val_df, len(val_df)),
                             'test': (test_df, len(test_df))}
        self.set_split('train')

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        """Load the CSV and build a vectorizer from the full dataframe."""
        surname_df = pd.read_csv(surname_csv)
        return cls(surname_df, SurnameVectorizer.from_dataframe(surname_df))

    def get_vectorizer(self):
        """Return the vectorizer associated with this dataset."""
        return self._vectorizer

    def set_split(self, split="train"):
        """Select which split ('train'/'val'/'test') __len__/__getitem__ use."""
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """Return {'x_surname': surname vector, 'y_nationality': class index}."""
        row = self._target_df.iloc[index]
        surname_vector = self._vectorizer.vectorize(row.surname)
        nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
        return {'x_surname': surname_vector, 'y_nationality': nationality_index}
定义 SurnameVectorizer 类,将姓氏转化为向量,将国籍标签转化为索引
class SurnameVectorizer(object):
    """Maps surnames to one-hot vectors using two vocabularies."""

    def __init__(self, surname_vocab, nationality_vocab):
        # Vocabulary mapping characters to integer indices.
        self.surname_vocab = surname_vocab
        # Vocabulary mapping nationality labels to integer indices.
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        """Return a collapsed one-hot vector (length == vocab size) for *surname*."""
        encoding = np.zeros(len(self.surname_vocab), dtype=np.float32)
        # Mark every character of the surname in the encoding.
        for character in surname:
            encoding[self.surname_vocab.lookup_token(character)] = 1
        return encoding

    @classmethod
    def from_dataframe(cls, surname_df):
        """Build both vocabularies from a dataframe with 'surname'/'nationality' columns."""
        surname_vocab = Vocabulary(unk_token="@")      # '@' marks unseen characters
        nationality_vocab = Vocabulary(add_unk=False)  # labels need no UNK entry

        for _, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab)
分割数据集:
- # Load the full surname dataset
- surname_df = pd.read_csv('data/surnames/surnames.csv')
-
- # Manually add a 'split' column marking train/val/test membership
- surname_df['split'] = 'train'
- # Randomly mark 20% of the rows as the validation set
- surname_df.loc[surname_df.sample(frac=0.2).index, 'split'] = 'val'
- # Randomly mark 10% as test. NOTE(review): this samples from ALL rows, so it
- # can overwrite 'val' rows; roughly 70% remain 'train' only in expectation.
- surname_df.loc[surname_df.sample(frac=0.1).index, 'split'] = 'test'
构建模型:
import torch.nn as nn
import torch.nn.functional as F

class SurnameClassifier(nn.Module):
    """A two-layer MLP classifying surname vectors into nationalities."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Args:
            input_dim: size of the input one-hot surname vectors.
            hidden_dim: size of the hidden layer.
            output_dim: number of nationality classes.
        """
        # Bug fix: the original placed super().__init__() and the layer
        # definitions directly in the class body with no __init__ method,
        # which fails at class-definition time. Wrap them properly.
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)   # first fully connected layer
        self.fc2 = nn.Linear(hidden_dim, output_dim)  # second fully connected layer

    def forward(self, x_in, apply_softmax=False):
        """
        Args:
            x_in (torch.Tensor): input of shape (batch, input_dim).
            apply_softmax (bool): apply softmax to the output; keep False
                when training with cross-entropy loss.
        Returns:
            Tensor of shape (batch, output_dim).
        """
        intermediate_vector = F.relu(self.fc1(x_in))   # hidden representation
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector
DROPOUT
Dropout的基本思想是在训练过程中,随机地将一部分神经元的输出设置为零。这样可以防止神经元之间形成过强的依赖关系,从而增强模型的泛化能力。
训练模型时,我们可以使用dropout层解决过拟合问题。注意,dropout不适用于评估期间
import torch.nn as nn
import torch.nn.functional as F

class MultilayerPerceptron(nn.Module):
    """Two-layer MLP with dropout (p=0.5) on the hidden representation."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MultilayerPerceptron, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        intermediate = F.relu(self.fc1(x_in))
        # Bug fix: F.dropout defaults to training=True, so the original
        # applied dropout even in eval mode. Tie it to the module's
        # training flag so dropout is active only during training, as the
        # surrounding text intends.
        output = self.fc2(F.dropout(intermediate, p=0.5,
                                    training=self.training))

        if apply_softmax:
            output = F.softmax(output, dim=1)
        return output
训练模型:(1)定义所需参数args
from argparse import Namespace

args = Namespace(
    # Data and serialization paths
    surname_csv="data/surnames/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",          # vectorizer JSON file
    model_state_file="model.pth",               # trained-model save path
    save_dir="model_storage/ch4/surname_mlp",   # root directory for artifacts
    # Model hyperparameters
    hidden_dim=300,        # hidden-layer size of the MLP
    # Bug fix: args.num_channels is read later when building the CNN
    # classifier but was never defined here.
    num_channels=256,      # channel count for the CNN classifier
    # Training hyperparameters
    seed=1337,             # RNG seed so experiments are reproducible
    num_epochs=100,        # number of training epochs
    # Early stopping: stop after this many epochs without val improvement
    early_stopping_criteria=5,
    learning_rate=0.001,   # optimizer learning rate
    batch_size=64,         # samples per training batch
    # Bug fix: args.cuda is read later but was only assigned when CUDA is
    # unavailable; define it here (later code falls back to CPU if needed).
    cuda=True,
)
(2)定义损失函数和优化器
- # Load the dataset from CSV and create its vectorizer
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
-
- # Fetch the vectorizer
- vectorizer = dataset.get_vectorizer()
-
- # Initialize the classifier: input size = character-vocab size,
- # output size = number of nationality classes
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
-
- # Move the classifier to the configured device (CPU or GPU)
- # NOTE(review): args.device is assigned in a later setup cell; run it first.
- classifier = classifier.to(args.device)
-
- # Cross-entropy loss with uniform class weights (imbalance is handled
- # via dataset.class_weights in the later CNN section)
- loss_func = nn.CrossEntropyLoss(weight=torch.ones(len(vectorizer.nationality_vocab)))
-
- # Adam optimizer over the classifier parameters
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
(3)训练循环:训练过程主要分为5个步骤
# Training parameters
num_epochs = 50
running_loss = 0.0

# NOTE(review): train_loader is not defined in this file; it must be a
# DataLoader over the SurnameDataset — confirm before running.
for epoch in range(num_epochs):
    for batch_index, batch_dict in enumerate(train_loader):
        # Step 1: zero the gradients
        optimizer.zero_grad()

        # Step 2: forward pass to compute the outputs.
        # Bug fix: the surrounding code defines `classifier`, not `model`;
        # the original called an undefined name here.
        y_pred = classifier(batch_dict['x_surname'])

        # Step 3: compute the loss and fold it into a running average
        loss = loss_func(y_pred, batch_dict['y_nationality'])
        loss_batch = loss.to("cpu").item()
        running_loss += (loss_batch - running_loss) / (batch_index + 1)

        # Step 4: backpropagate to compute gradients
        loss.backward()

        # Step 5: apply the optimizer update
        optimizer.step()

    print(f'Epoch {epoch+1}, Loss: {running_loss:.4f}')
评估模型:
我们使用 PyTorch 张量的 max 函数来得到预测概率最高的那个类别,作为最优预测结果。
def predict_nationality(name, classifier, vectorizer):
    """Predict the most likely nationality for *name*.

    Returns:
        dict with keys 'nationality' and 'probability'.
    """
    # Vectorize the name and reshape it into a (1, features) batch.
    x = torch.tensor(vectorizer.vectorize(name)).view(1, -1)
    # Softmax output so the maximum value is a probability.
    probs = classifier(x, apply_softmax=True)

    # Best class index and its probability.
    best_prob, best_index = probs.max(dim=1)
    nationality = vectorizer.nationality_vocab.lookup_index(best_index.item())

    return {'nationality': nationality,
            'probability': best_prob.item()}
不仅要看最好的预测,还要看更多的预测。例如,NLP中的标准实践是采用k-best预测并使用另一个模型对它们重新排序。PyTorch提供了一个torch.topk函数,它提供了一种方便的方法来获得这些预测
def predict_topk_nationality(name, classifier, vectorizer, k=5):
    """Return the top-*k* nationality predictions for *name*.

    Returns:
        list of {'nationality': ..., 'probability': ...} dicts,
        ordered from most to least probable.
    """
    # Vectorize the name and reshape into a (1, features) batch.
    x = torch.tensor(vectorizer.vectorize(name)).view(1, -1)
    prediction_vector = classifier(x, apply_softmax=True)

    # Top-k probabilities and their class indices, flattened to numpy.
    top_probs, top_indices = torch.topk(prediction_vector, k=k)
    top_probs = top_probs.detach().numpy()[0]
    top_indices = top_indices.detach().numpy()[0]

    return [{'nationality': vectorizer.nationality_vocab.lookup_index(idx),
             'probability': prob}
            for prob, idx in zip(top_probs, top_indices)]
卷积神经网络介绍:
卷积神经网络(Convolutional Neural Networks, CNNs)是一种专门用于处理具有类似网格结构数据(如图像)的深度学习模型。CNN在图像分类、目标检测、图像分割等任务中表现出色。
卷积神经网络的基本组成包括卷积层、激活函数、池化层和全连接层,其中卷积层使用卷积核扫描输入数据提取局部特征,激活函数(如ReLU)引入非线性使模型能够学习复杂特征,池化层对特征图进行降采样以减少数据量,全连接层则将前面提取的特征映射到样本的类别空间。
3.1数据处理
导入需要的库
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
-
- import numpy as np
- import pandas as pd
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
数据处理
向量化数据:创建Vocabulary、SurnameVectorizer、SurnameDataset类
(1)Vocabulary :用于处理文本并提取词汇以进行映射。主要是将词汇表中的词语(tokens)与它们对应的索引(indices)相互映射。
class Vocabulary(object):
    """Bidirectional mapping between tokens and integer indices.

    Optionally registers a special UNK token so lookups of unknown
    tokens do not fail.
    """

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        Args:
            token_to_idx (dict): pre-existing token-to-index mapping.
            add_unk (bool): whether to register the UNK token.
            unk_token (str): the UNK token to register.
        """
        self._token_to_idx = {} if token_to_idx is None else token_to_idx
        # Inverse mapping from index back to token.
        self._idx_to_token = {i: t for t, i in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        # unk_index stays -1 when no UNK token is registered.
        self.unk_index = self.add_token(unk_token) if add_unk else -1

    def to_serializable(self):
        """Return a dict from which this vocabulary can be re-created."""
        return {'token_to_idx': self._token_to_idx,
                'add_unk': self._add_unk,
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        """Instantiate a Vocabulary from the output of to_serializable()."""
        return cls(**contents)

    def add_token(self, token):
        """Add *token* if new; return its (existing or fresh) index."""
        if token in self._token_to_idx:
            return self._token_to_idx[token]
        index = len(self._token_to_idx)
        self._token_to_idx[token] = index
        self._idx_to_token[index] = token
        return index

    def add_many(self, tokens):
        """Add several tokens; return their indices in order."""
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """Return the index of *token*; fall back to unk_index when registered."""
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        return self._token_to_idx[token]

    def lookup_index(self, index):
        """Return the token stored at *index*; raise KeyError if absent."""
        if index not in self._idx_to_token:
            raise KeyError("索引 (%d) 不在词汇表中" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)
(2)SurnameVectorizer:姓氏向量化即将姓氏转换为one-hot编码,并处理国籍标签。
# Coordinates the vocabularies and performs vectorization.
class SurnameVectorizer(object):
    """Turns surnames into one-hot matrices and handles (de)serialization."""

    def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
        """
        Args:
            surname_vocab (Vocabulary): maps characters to integers.
            nationality_vocab (Vocabulary): maps nationalities to integers.
            max_surname_length (int): length of the longest surname.
        """
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab
        self._max_surname_length = max_surname_length

    def vectorize(self, surname):
        """Return a (vocab_size, max_surname_length) one-hot matrix for *surname*."""
        matrix = np.zeros((len(self.surname_vocab), self._max_surname_length),
                          dtype=np.float32)
        # One column per character position; one row per vocabulary entry.
        for position, character in enumerate(surname):
            matrix[self.surname_vocab.lookup_token(character)][position] = 1
        return matrix

    @classmethod
    def from_dataframe(cls, surname_df):
        """Build a vectorizer from a dataframe with 'surname'/'nationality' columns."""
        surname_vocab = Vocabulary(unk_token="@")      # '@' marks unseen characters
        nationality_vocab = Vocabulary(add_unk=False)  # labels need no UNK entry
        max_surname_length = 0

        for _, row in surname_df.iterrows():
            max_surname_length = max(max_surname_length, len(row.surname))
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab, max_surname_length)

    @classmethod
    def from_serializable(cls, contents):
        """Re-create a vectorizer from the dict produced by to_serializable()."""
        return cls(surname_vocab=Vocabulary.from_serializable(contents['surname_vocab']),
                   nationality_vocab=Vocabulary.from_serializable(contents['nationality_vocab']),
                   max_surname_length=contents['max_surname_length'])

    def to_serializable(self):
        """Return a serializable dict of both vocabularies and the max length."""
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable(),
                'max_surname_length': self._max_surname_length}
(3)SurnameDataset:初始化数据集,划分训练、验证和测试集,并使用 SurnameVectorizer
将姓氏和国籍标签向量化,并计算类别权重,用于处理类别不平衡问题。
- # NOTE(review): indentation was lost in this listing; the methods below
- # belong inside the SurnameDataset class body.
- class SurnameDataset(Dataset):
-
- def __init__(self, surname_df, vectorizer): # initialize the dataset
- self.surname_df = surname_df
- self._vectorizer = vectorizer
- self.train_df = self.surname_df[self.surname_df.split == 'train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split == 'val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split == 'test']
- self.test_size = len(self.test_df)
-
- # Map split name -> (dataframe, size) for set_split()
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Compute per-class weights (inverse frequency) for the loss function
- class_counts = surname_df.nationality.value_counts().to_dict()
-
- # Sort counts so weights line up with the nationality-vocab indices
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
-
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- # Load the dataset and build a new vectorizer from the training split
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split == 'train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- # Load the dataset together with a previously saved vectorizer
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- # Static method: load only the vectorizer from disk
- def load_vectorizer_only(vectorizer_filepath):
- # vectorizer_filepath (str): path to the serialized vectorizer JSON
-
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- # Persist the vectorizer to disk as JSON
-
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- return self._vectorizer
-
- # Select the active split ('train'/'val'/'test')
- def set_split(self, split="train"):
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- # Fetch the row at the given index of the active split
- row = self._target_df.iloc[index]
-
- # Vectorize the surname into a one-hot matrix
- surname_matrix = self._vectorizer.vectorize(row.surname)
-
- # Convert the nationality label to its class index
- nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- # Return the surname matrix together with the nationality index
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
-
- # Number of whole batches available in the active split
- def get_num_batches(self, batch_size):
- return len(self) // batch_size
-
def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"):
    """Yield batches from *dataset* with every tensor moved to *device*.

    Args:
        dataset (Dataset): the source dataset.
        batch_size (int): number of samples per batch.
        shuffle (bool): shuffle the data each pass.
        drop_last (bool): drop the final incomplete batch.
        device (str): target device ('cpu' or 'cuda').
    """
    loader = DataLoader(dataset=dataset, batch_size=batch_size,
                        shuffle=shuffle, drop_last=drop_last)
    for batch in loader:
        # Move every named tensor onto the requested device before yielding.
        yield {name: tensor.to(device) for name, tensor in batch.items()}
3.2创建模型
SurnameClassifier定义了一个用于姓氏分类的卷积神经网络。它包含卷积层和全连接层,用于处理输入姓氏的特征并进行国籍分类。
class SurnameClassifier(nn.Module):
    """1-D CNN over character one-hot matrices, plus a linear classifier.

    Args:
        initial_num_channels (int): channels of the input feature matrix
            (the character-vocabulary size).
        num_classes (int): size of the output prediction vector.
        num_channels (int): channel count used by every convolutional layer.
    """

    def __init__(self, initial_num_channels, num_classes, num_channels):
        super(SurnameClassifier, self).__init__()

        # Four Conv1d + ELU stages; the two strided convolutions shrink
        # the sequence length toward a single feature column.
        self.convnet = nn.Sequential(
            nn.Conv1d(initial_num_channels, num_channels, kernel_size=3),
            nn.ELU(),
            nn.Conv1d(num_channels, num_channels, kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(num_channels, num_channels, kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(num_channels, num_channels, kernel_size=3),
            nn.ELU(),
        )
        # Final projection from channels to class scores.
        self.fc = nn.Linear(num_channels, num_classes)

    def forward(self, x_surname, apply_softmax=False):
        """Forward pass of the classifier.

        Args:
            x_surname (torch.Tensor): input of shape
                (batch, initial_num_channels, max_surname_length).
            apply_softmax (bool): apply softmax to the output; keep False
                when training with cross-entropy loss.
        Returns:
            Tensor of shape (batch, num_classes).
        """
        # The convnet is expected to reduce the length dimension to 1;
        # squeeze(dim=2) then removes it.
        features = self.convnet(x_surname).squeeze(dim=2)
        scores = self.fc(features)
        return F.softmax(scores, dim=1) if apply_softmax else scores
3.3训练模型
(1)初始化训练状态参数
def make_train_state(args):
    """Create the dictionary that tracks progress across a training run.

    Args:
        args: configuration object providing learning_rate and
            model_state_file.
    Returns:
        dict holding early-stopping bookkeeping, per-epoch metric lists,
        and final test metrics (initialized to -1).
    """
    state = {
        'stop_early': False,              # whether to stop training early
        'early_stopping_step': 0,         # epochs since the last improvement
        'early_stopping_best_val': 1e8,   # best validation loss seen so far
        'learning_rate': args.learning_rate,
        'epoch_index': 0,                 # current training epoch
        'train_loss': [],                 # per-epoch training losses
        'train_acc': [],                  # per-epoch training accuracies
        'val_loss': [],                   # per-epoch validation losses
        'val_acc': [],                    # per-epoch validation accuracies
        'test_loss': -1,                  # filled in after evaluation
        'test_acc': -1,                   # filled in after evaluation
        'model_filename': args.model_state_file,  # checkpoint path
    }
    return state
准备工作:设置好设备,设置随机种子保证实验可重复性,设置好文件保存路径
- # Fall back to CPU when no CUDA device is available
- if not torch.cuda.is_available():
- args.cuda = False
-
- # Select the torch device accordingly
- # NOTE(review): args.cuda must already exist on args — it is only assigned
- # above when CUDA is unavailable.
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
# Seed every RNG so experiments are reproducible.
def set_seed_everywhere(seed, cuda):
    """Seed numpy and torch; also seed all CUDA devices when *cuda* is True."""
    for seeder in (np.random.seed, torch.manual_seed):
        seeder(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)
-
# Create the directory path if it does not already exist.
def handle_dirs(dirpath):
    """Ensure *dirpath* exists, creating intermediate directories as needed.

    Uses exist_ok=True instead of the original check-then-create pair,
    which avoids a race when another process creates the directory in
    between the check and the makedirs call.
    """
    os.makedirs(dirpath, exist_ok=True)
-
- # Seed all RNGs for reproducibility
- set_seed_everywhere(args.seed, args.cuda)
-
- # Ensure the model save directory exists
- handle_dirs(args.save_dir)
# Update training bookkeeping after each epoch: checkpointing + early stop.
def update_train_state(args, model, train_state):
    """Handle model checkpointing and early stopping.

    Args:
        args: must provide early_stopping_criteria.
        model: the model whose state_dict is checkpointed.
        train_state: dict produced by make_train_state(); mutated in place.
    Returns:
        The updated train_state dict.
    """
    # Always save the model once at the very start of training.
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    elif train_state['epoch_index'] >= 1:
        # Bug fix: the original unpacked val_loss[-2:] into two names, which
        # crashes when only one entry exists; only the latest loss is needed.
        loss_t = train_state['val_loss'][-1]

        if loss_t >= train_state['early_stopping_best_val']:
            # Validation loss did not improve: advance the patience counter.
            train_state['early_stopping_step'] += 1
        else:
            # Validation loss improved: checkpoint and reset patience.
            torch.save(model.state_dict(), train_state['model_filename'])
            # Bug fix: the original never updated early_stopping_best_val, so
            # it stayed at its 1e8 sentinel, the patience counter could never
            # advance, and early stopping never triggered.
            train_state['early_stopping_best_val'] = loss_t
            train_state['early_stopping_step'] = 0

        # Stop once patience is exhausted.
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state
(3)计算准确率
def compute_accuracy(y_pred, y_target):
    """Return the percentage of rows where argmax(y_pred) equals y_target."""
    predicted_classes = y_pred.argmax(dim=1)
    correct = (predicted_classes == y_target).sum().item()
    return correct / len(predicted_classes) * 100
(4)训练模型并输出结果
- # Fetch the vectorizer from the dataset
- vectorizer = dataset.get_vectorizer()
-
- # Initialize the CNN classifier
- classifier = SurnameClassifier(
- initial_num_channels=len(vectorizer.surname_vocab), # size of the character vocabulary
- num_classes=len(vectorizer.nationality_vocab), # number of nationality classes
- num_channels=args.num_channels # channel count used inside the network
- )
-
- # Move the classifier to the configured device (GPU or CPU)
- classifier = classifier.to(args.device)
-
- # Move the class weights to the same device as the model
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # Cross-entropy loss weighted by inverse class frequency
- loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
-
- # Adam optimizer with the configured learning rate
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
-
- # Learning-rate scheduler:
- # halve the learning rate when validation loss stops improving
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(
- optimizer=optimizer,
- mode='min', # lower validation loss is better
- factor=0.5, # multiply the LR by this factor
- patience=1 # epochs to wait before reducing
- )
-
- # Create the training-state bookkeeping dictionary
- train_state = make_train_state(args)
用tqdm创建进度条
- # Epoch-level progress bar
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- # Batch-level progress bar for the training split
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- # Batch-level progress bar for the validation split
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
训练:在每个训练周期内,迭代训练数据集,计算损失和准确率,并使用优化器更新模型参数。
在每个训练周期后,迭代验证数据集,计算损失和准确率,以评估模型在验证集上的性能。
stop_early:通过检查验证集的损失来决定是否提前停止训练,以防止模型过拟合。
最后根据验证集的损失调整学习率,帮助模型更好地收敛。
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Setup: batch generator, reset loss/accuracy, switch to train mode
- dataset.set_split('train')
- batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # The five-step training routine:
-
- # Step 1: zero the gradients
- optimizer.zero_grad()
-
- # Step 2: compute the outputs
- y_pred = classifier(batch_dict['x_surname'])
-
- # Step 3: compute the loss (running average across batches)
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # Step 4: backpropagate the loss
- loss.backward()
-
- # Step 5: apply the optimizer update
- optimizer.step()
-
- # Track the running accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # Refresh the training progress bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
- train_bar.update()
-
- # Record training loss and accuracy for this epoch
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over the validation split
-
- # Setup: batch generator, reset loss/accuracy, switch to eval mode
- # NOTE(review): consider wrapping this loop in torch.no_grad() to skip
- # gradient tracking during validation.
- dataset.set_split('val')
- batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # Compute the outputs
- y_pred = classifier(batch_dict['x_surname'])
-
- # Compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # Compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # Refresh the validation progress bar
- val_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
- val_bar.update()
-
- # Record validation loss and accuracy for this epoch
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- # Update the training state (checkpointing and early stopping)
- train_state = update_train_state(args=args, model=classifier, train_state=train_state)
-
- # Step the LR scheduler on the latest validation loss
- scheduler.step(train_state['val_loss'][-1])
-
- # Leave the training loop early when requested
- if train_state['stop_early']:
- break
-
- # Reset the per-epoch progress bars
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
-
- except KeyboardInterrupt:
- print("Exiting loop")
3.4评估模型
def predict_nationality(surname, classifier, vectorizer):
    """Predict the most probable nationality for *surname*.

    Returns:
        dict with keys 'nationality' and 'probability'.
    """
    # Vectorize the surname and add a leading batch dimension.
    x = torch.tensor(vectorizer.vectorize(surname)).unsqueeze(0)
    # Softmax output so the maximum value is a probability.
    probs = classifier(x, apply_softmax=True)

    # Highest-probability class and its probability.
    best_prob, best_idx = probs.max(dim=1)
    nationality = vectorizer.nationality_vocab.lookup_index(best_idx.item())

    return {'nationality': nationality, 'probability': best_prob.item()}
- # Read a surname from the user
- new_surname = input("Enter a surname to classify: ")
- # Run inference on CPU
- classifier = classifier.cpu()
- # Predict the nationality with the classifier and vectorizer
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- # Print the prediction and its probability
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
def predict_topk_nationality(surname, classifier, vectorizer, k=5):
    """Return the *k* most probable nationalities for *surname*.

    Returns:
        list of {'nationality': ..., 'probability': ...} dicts, ordered
        from most to least probable.
    """
    # Vectorize the surname and add a leading batch dimension.
    x = torch.tensor(vectorizer.vectorize(surname)).unsqueeze(dim=0)
    prediction_vector = classifier(x, apply_softmax=True)

    # topk returns tensors of shape (1, k); flatten to numpy arrays.
    top_probs, top_indices = torch.topk(prediction_vector, k=k)
    top_probs = top_probs[0].detach().numpy()
    top_indices = top_indices[0].detach().numpy()

    results = []
    for prob, idx in zip(top_probs, top_indices):
        results.append({'nationality': vectorizer.nationality_vocab.lookup_index(idx),
                        'probability': prob})
    return results
-
- # Read a surname from the user
- new_surname = input("Enter a surname to classify: ")
-
- # Ask how many of the top predictions to display
- k = int(input("How many of the top predictions to see? "))
-
- # Clamp k to the number of known nationalities
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- # Predict the top-k nationalities
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- # Print the predictions
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
-
-
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。