赞
踩
目录
前馈网络是一种神经网络模型,其信息流向从输入层到输出层单向流动,没有反馈或循环连接。在这种网络中,每一层的神经元接收前一层神经元的输出,并计算其激活值,然后将这些激活值传递给下一层,直到到达输出层。这种层次化的结构使得前馈网络能够有效地处理具有复杂特征的数据,并通过逐层提取特征的方式来实现对数据的分类、回归或其他任务。
接下来将探索和使用两种前馈神经网络:多层感知器和卷积神经网络。并将它们应用到自然语言处理的实际问题上,实现对姓氏的分类。
实验使用数据保存在surnames.csv文件中,每个数据含有姓氏和国籍两个属性。首先使用pandas库中的read_csv函数读取文件,再根据国籍划分训练集并创建字典,最后为划分后的数据添加分割属性(“train”,“val”,“test”),并保存文件,作为后续训练模型使用的数据集。
- # 导入需要的库
- import collections
- import numpy as np
- import pandas as pd
- import re
- from argparse import Namespace
-
- # 定义参数(包括训练集、验证集、测试集的比例,输入输出的文件名)
- args = Namespace(
- raw_dataset_csv="surnames.csv",
- train_proportion=0.7,
- val_proportion=0.15,
- test_proportion=0.15,
- output_munged_csv="surnames_with_splits.csv",
- seed=1337
- )
-
- # 读取原始数据并查看前5条
- surnames = pd.read_csv(args.raw_dataset_csv, header=0)
- surnames.head()
-
- # 按国籍划分训练集并创建字典
- # 使用collections类中的defaultdict()方法来为字典提供默认值,防止引发‘KeyError’异常
- by_nationality = collections.defaultdict(list)
- for _, row in surnames.iterrows():
- by_nationality[row.nationality].append(row.to_dict())
-
- # 创建划分数据
- final_list = []
- np.random.seed(args.seed)
- for _, item_list in sorted(by_nationality.items()):
- np.random.shuffle(item_list)
- n = len(item_list)
- n_train = int(args.train_proportion*n)
- n_val = int(args.val_proportion*n)
- n_test = int(args.test_proportion*n)
-
- # 给数据集添加分割属性
- for item in item_list[:n_train]:
- item['split'] = 'train'
- for item in item_list[n_train:n_train+n_val]:
- item['split'] = 'val'
- for item in item_list[n_train+n_val:]:
- item['split'] = 'test'
-
- # 添加到最终列表
- final_list.extend(item_list)
-
-
- # 创建最终的姓氏数据集
- final_surnames = pd.DataFrame(final_list)
-
- # 统计各个分割后的数据样本数量
- final_surnames.split.value_counts()
-
- # 输出最终数据集的前5条
- final_surnames.head()
-
- # 将最终姓氏数据保存为CSV文件
- final_surnames.to_csv(args.output_munged_csv, index=False)
处理前的数据集 | 处理后的数据集 |
新保存的文件中数据增加了"spilt"属性,即完成了后续对训练集、测试集、验证集的划分。
在自然语言处理 (NLP) 中,定义类是一种重要的编程实践,它有助于组织和管理代码,同时简化复杂的任务。
例如,在这个实验中,对于姓氏的处理,可以定义Vocabulary类用于将文本转换为索引或索引转换为文本。这种类的存在有助于在NLP任务中创建和维护词汇表,使得文本数据能够被有效地处理和表示。通过这种方式,可以确保我们能够在模型训练和使用过程中对文本数据进行一致的处理。
定义一个Vocabulary类,用于管理词汇表,方便地将文本转换为索引或索引转换为文本,包含以下方法:
__init__
方法:初始化Vocabulary类对象,可指定词汇表的初始词-索引映射,是否添加未知词(add_unk),以及未知词的标记(unk_token)。
to_serializable
方法:返回一个可以被序列化(例如用于保存到文件或网络传输)的表示Vocabulary对象的字典。
from_serializable
类方法:从序列化内容中重构Vocabulary对象。
add_token
方法:向词汇表中添加一个新的词,并返回其索引。如果词已存在,则返回其现有索引。
add_many
方法:批量添加多个词到词汇表中,返回一个包含每个词对应索引的列表。
lookup_token
方法:查找给定词的索引,若未找到且允许添加未知词,则返回未知词索引。
lookup_index
方法:根据给定的索引返回对应的词,若索引不存在则触发KeyError异常。
__str__
方法:返回一个描述词汇表大小的字符串。
__len__
方法:返回词汇表中词的数量。
- class Vocabulary(object):
- """用于处理文本并提取词汇以进行映射的类"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- 参数:
- token_to_idx (dict): token到索引的预先存在的映射字典
- add_unk (bool): 指示是否添加UNK标记的标志
- unk_token (str): 要添加到词汇表中的UNK标记
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- """返回可序列化的字典"""
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """从序列化的字典实例化词汇表"""
- return cls(**contents)
-
- def add_token(self, token):
- """根据token更新映射字典。
- 参数:
- token (str): 要添加到词汇表中的项
- 返回:
- index (int): 对应于token的整数
- """
- if token in self._token_to_idx:
- index = self._token_to_idx[token]
- else:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """将单词列表添加到词汇表中
-
- Args:
- tokens (list): 单词列表
- Returns:
- indices (list): 单词对应的索引列表
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """查找单词对应的索引,如果单词不存在则返回UNK索引
-
- Args:
- token (str): 要查找的单词
- Returns:
- index (int): 单词对应的索引
- Notes:
- 为了使用UNK功能,unk_index需要>=0(已添加到词汇表中)
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """根据索引查找对应的单词
-
- Args:
- index (int): 要查找的索引
- Returns:
- token (str): 索引对应的单词
- Raises:
- KeyError: 如果索引不存在于词汇表中
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)
再定义一个SurnameVectorizer类,主要功能是在自然语言处理任务中用于对姓氏进行向量化,为机器学习算法提供可用的输入形式。包含以下方法:
__init__
方法:初始化SurnameVectorizer类对象,接受三个参数:surname_vocab(姓氏的词汇表)、nationality_vocab(国籍的词汇表)、max_surname_length(最大姓氏长度)。
vectorize
方法:接受一个姓氏作为参数,返回一个独热编码的矩阵,表示给定的姓氏。独热编码矩阵的大小为(surname_vocab的长度, max_surname_length),在矩阵中将姓氏的字符按照其在词汇表中的索引进行独热编码表示。
from_dataframe
类方法:从一个包含姓氏和国籍信息的DataFrame中构建SurnameVectorizer对象。在此方法中,首先创建一个针对姓氏的词汇表(surname_vocab),然后遍历数据框,找到最大的姓氏长度,将每个字符添加到姓氏词汇表中,同时将国籍添加到国籍的词汇表(nationality_vocab)中,并最终返回一个新的SurnameVectorizer对象。
from_serializable
类方法:从可序列化内容中重构SurnameVectorizer对象。该方法从内容中恢复surname_vocab、nationality_vocab和max_surname_length,并创建一个新的SurnameVectorizer对象。
to_serializable
方法:返回一个可以被序列化的字典,表示SurnameVectorizer对象的属性和状态。此字典包含了surname_vocab、nationality_vocab和max_surname_length的可序列化表示。
- class SurnameVectorizer(object):
- """ 向量化器,协调词汇表并将其应用于数据 """
- def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
- """
- 参数:
- surname_vocab (Vocabulary): 将字符映射到整数的词汇表
- nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
- max_surname_length (int): 姓氏的最大长度
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
- self._max_surname_length = max_surname_length
-
- def vectorize(self, surname):
- """
- 将姓氏向量化
- 参数:
- surname (str): 姓氏
- 返回:
- one_hot_matrix (np.ndarray): 一个独热向量矩阵
- """
-
- # 定义独热向量矩阵的大小并初始化独热向量矩阵
- one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- # 遍历姓氏的每个字符,并在独热编码矩阵中将对应位置的值设置为1
- for position_index, character in enumerate(surname):
- character_index = self.surname_vocab.lookup_token(character)
- one_hot_matrix[character_index][position_index] = 1
-
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """
- 从数据框实例化SurnameVectorizer对象。
- 参数:
- surname_df (pandas.DataFrame): 包含姓氏数据的数据框。
- 返回:
- SurnameVectorizer的一个实例。
- """
- # 初始化姓氏和国籍的词汇表以及最大姓氏长度
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
- max_surname_length = 0
-
- # 遍历数据框的每一行,更新最大姓氏长度和词汇表
- for index, row in surname_df.iterrows():
- max_surname_length = max(max_surname_length, len(row.surname))
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab, max_surname_length)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
- max_surname_length=contents['max_surname_length'])
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable(),
- 'max_surname_length': self._max_surname_length}
再定义一个SurnameDataset类,目的是创建一个数据集对象,对输入的数据进行分割和处理,并与向量化器一起使用,为后续的算法提供可用的数据集。包含以下方法:
__init__
方法:初始化SurnameDataset类对象,接受两个参数:surname_df(包含姓氏和国籍信息的DataFrame)和vectorizer(对姓氏进行向量化的实例)。在初始化过程中,它会基于DataFrame的'split'列进行数据拆分,并计算每个数据集(训练集、验证集和测试集)的大小。另外,根据国籍的数量计算了类权重(class weights)。
load_dataset_and_make_vectorizer
类方法:从姓氏的CSV文件中加载数据,并创建一个新的SurnameDataset对象。在此方法中,它会读取CSV文件并根据'split'列创建训练数据集的DataFrame,然后使用SurnameVectorizer的类方法from_dataframe创建一个向量化器,最终返回一个新的SurnameDataset对象。
load_dataset_and_load_vectorizer
类方法:从姓氏的CSV文件和向量化器的文件路径中加载数据和向量化器,并创建一个新的SurnameDataset对象。在这里,它会根据给定的CSV文件路径读取数据,并使用load_vectorizer_only方法加载向量化器,最终返回一个新的SurnameDataset对象。
load_vectorizer_only
静态方法:从向量化器的文件路径中加载向量化器实例。该方法打开向量化器的文件,加载其中的内容,并使用SurnameVectorizer的from_serializable方法创建一个向量化器实例。
save_vectorizer
方法:该方法将向量化器保存到文件中。它打开一个文件,将向量化器序列化为json格式,然后保存到文件中。
get_vectorizer
方法:能够获取当前实例的向量化器,直接返回向量化器实例。
set_split
方法:根据传入的参数选择数据集中的拆分部分,例如可以选择训练集、验证集或测试集。它设置目标拆分和对应的数据框以及大小。
__len__
方法:返回数据集的大小,即目标数据集的大小。
__getitem__
方法:根据索引获取数据集中的样本。它获取目标数据框中特定索引的行,然后使用向量化器将姓氏转换为矩阵,再查找国籍的索引。最后返回一个包含姓氏矩阵和国籍索引的字典。
get_num_batches
方法:接受一个批量大小,返回数据集中样本的批量数量。
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- 一个用于处理姓氏数据集的PyTorch数据集类。
- Args:
- surname_df (pandas.DataFrame): 包含数据集的数据框。
- vectorizer (SurnameVectorizer): 从数据集实例化的向量化器。
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- # 根据数据集中的划分标签,将数据集分为训练集、验证集和测试集
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- # 将数据集和对应的大小存储在字典中,便于后续索引
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- # 设置默认使用训练集
- self.set_split('train')
-
- # 计算类别权重,用于处理数据几种的类别不平衡问题
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """
- 加载数据集并从头创建一个新的向量化器。
- Args:
- surname_csv (str): 数据集的文件路径。
- Returns:
- SurnameDataset的一个实例。
- """
- # 从CSV文件中读取数据集
- surname_df = pd.read_csv(surname_csv)
-
- # 从训练集数据框实例化向量化器
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """加载数据集和对应的向量化器。
- 用于在向量化器已经被缓存以便重复使用的情况下。
- Args:
- surname_csv (str): 数据集的位置
- vectorizer_filepath (str): 已保存的向量化器的位置
- Returns:
- SurnameDataset的一个实例
- """
- # 读取数据集
- surname_df = pd.read_csv(surname_csv)
- # 加载向量化器
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- # 返回SurnameData的一个实例
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """从文件中加载向量化器的静态方法。
- Args:
- vectorizer_filepath (str): 序列化向量化器的位置
- Returns:
- SurnameVectorizer的一个实例
- """
- with open(vectorizer_filepath) as fp:
- # 使用json加载序列化向量化器
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """使用json将向量化器保存到磁盘。
- Args:
- vectorizer_filepath (str): 保存向量化器的位置
- """
- with open(vectorizer_filepath, "w") as fp:
- # 将向量化器序列化为json并保存到文件中
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ 返回向量化器 """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """使用数据框中的列选择数据集中的拆分部分"""
- self._target_split = split # 设置目标拆分
- self._target_df, self._target_size = self._lookup_dict[split] # 获取目标数据框和大小
-
- def __len__(self):
- # 返回数据集的大小
- return self._target_size
-
- def __getitem__(self, index):
- """PyTorch数据集的主要入口方法
-
- Args:
- index (int): 数据点的索引
- Returns:
- 一个字典,包含数据点的特征(x_data)和标签(y_target)
- """
- # 获得指定索引处的数据行
- row = self._target_df.iloc[index]
-
- # 使用向量化器将姓氏转换为矩阵表示
- surname_matrix = \
- self._vectorizer.vectorize(row.surname)
-
- # 查找国籍的索引
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- # 返回包含姓氏矩阵和国籍索引的字典
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """给定批次大小,返回数据集中的批次数量
-
- Args:
- batch_size (int)
- Returns:
- 数据集中的批次数量
- """
- # 返回数据集大小除以批次大小得到的结果
- return len(self) // batch_size
注:在Python中,类方法(class method)、实例方法(instance method)和静态方法(static method)是有区别的。
实例方法:实例方法是针对类的实例进行操作的方法,它们可以访问和修改实例的属性,并且使用 self
参数来表示对实例的引用。
类方法:类方法是针对整个类进行操作的方法,它们使用 cls
参数来表示对类本身的引用。它们可以对类的属性进行操作,并且可以在没有创建类的实例的情况下被调用。
静态方法:静态方法与类或实例无关,它们不需要 self
或 cls
参数,并且通常在一个类的作用域中,但是与类和实例无关。静态方法通常用于在类的作用域中组织相关的功能,但与类的实例无关。
总结:Vocabulary类用于将文本转换为索引或索引转换为文本,确保模型训练和使用过程中对文本数据进行一致的处理。SurnameVectorizer类的作用在于创建一个数据集对象,对输入的数据进行分割和处理,并与向量化器一起使用。这种类的定义使得针对文本数据的处理变得更加便捷和可重用,从而提高了代码的可维护性和可扩展性。同时,SurnameDataset类允许创建数据集对象,对输入的数据进行分割和处理,并与向量化器一起使用。通过将数据处理的任务封装到特定的类中,我们可以更加灵活地处理数据,同时保持代码的组织结构,从而方便地应用于不同的NLP任务。
多层感知器(MLP)是一种前向结构的人工神经网络,它的核心作用是将一组输入向量映射到一组输出向量。其结构可以被看作是一个有向图,由多个节点层组成,每一层的节点都全连接到下一层的节点。除了输入节点,每个节点都是一个带有非线性激活函数的神经元。
多层感知器一般有很多个层次,可以看成是神经网络的基本形式。其中,输入层接收原始数据,隐藏层则对数据进行处理,而输出层则产生网络的最终输出。隐藏层中的每一个节点都相当于一个感知器,并且每个感知器都具有一定的参数。
多层感知器的具有以下的特点:
感知器推广:多层感知器克服了单层感知器只能学习线性可分数据的弱点,通过引入隐藏层,使其能够学习并处理线性不可分的数据。
激活函数:为了处理分类问题中阈值函数不可微的问题,多层感知器引入了激活函数。激活函数为每个节点提供了非线性特性,使得多层感知器能够逼近任何非线性函数。
反向传播算法:多层感知器使用反向传播算法进行训练。在训练过程中,网络首先通过前向传播计算输出,然后比较输出与真实值之间的差异,接着通过反向传播算法调整网络的权重和偏置,以减小这种差异。这个过程反复进行,直到网络达到一定的精度要求或训练轮次达到预设上限。
通用估计器:多层感知器可以被视为一个通用估计器,理论上能够估计任何非线性函数。特别是具有一个隐藏层的MLP,只要隐藏节点的个数足够多,就能够学习输入的任意非线性函数。
多层感知器具有较强的拟合能力,但也可能出现“过拟合”现象。为了应对这个问题,可以采用如dropout等技术,人为地关闭掉一些节点,以防止网络过度拟合训练数据。
首先定义多层感知机类,该类继承自 PyTorch 的 nn.Module 。MLP 是一个前馈神经网络,由多个全连接层组成,每一层后面跟着一个激活函数。
- # 定义多层感知机类
- class MultilayerPerceptron(nn.Module):
-
- def __init__(self, input_size, hidden_size=2, output_size=3,
- num_hidden_layers=1, hidden_activation=nn.Sigmoid):
- """初始化权重。
- Args:
- input_size (int): 输入的大小
- hidden_size (int): 隐藏层的大小
- output_size (int): 输出的大小
- num_hidden_layers (int): 隐藏层的数量
- hidden_activation (torch.nn.*): 激活函数类
- """
- super(MultilayerPerceptron, self).__init__()
- self.module_list = nn.ModuleList()
-
- interim_input_size = input_size
- interim_output_size = hidden_size
-
- # 创建隐藏层模块列表
- for _ in range(num_hidden_layers):
- self.module_list.append(nn.Linear(interim_input_size, interim_output_size))
- self.module_list.append(hidden_activation())
- interim_input_size = interim_output_size
-
- # 定义最后的全连接层
- self.fc_final = nn.Linear(interim_input_size, output_size)
-
- # 记录前向传播过程中的信息
- self.last_forward_cache = []
-
- def forward(self, x, apply_softmax=False):
- """MLP的前向传播
- Args:
- x_in (torch.Tensor): 输入数据张量。
- x_in.shape 应该是 (batch, input_dim)
- apply_softmax (bool): softmax 激活函数的标志
- 如果与交叉熵损失一起使用,应该为 false
- Returns:
- 结果张量。tensor.shape 应该是 (batch, output_dim)
- """
- self.last_forward_cache = []
- self.last_forward_cache.append(x.to("cpu").numpy())
-
- # 遍历模块列表并进行前向传播
- for module in self.module_list:
- x = module(x)
- self.last_forward_cache.append(x.to("cpu").data.numpy())
-
- # 最终输出层
- output = self.fc_final(x)
- self.last_forward_cache.append(output.to("cpu").data.numpy())
-
- # 当使用softmax作为激活函数时
- if apply_softmax:
- output = F.softmax(output, dim=1)
-
- return output
单层感知机的缺陷主要在于其只能解决线性可分问题,对于非线性可分问题则无法进行分类。而多层感知机则解决了这一问题。为了直观的看出多层感知机的工作原理,我们可以通过可视化输入层、隐藏层、输出层的数据来进行观察。
下面的代码可视化了初始数据集,将不同标签类型的数据用不同形状的点绘制。
- # 设置随机数种子以确保结果的可重复性
- seed = 24
-
- torch.manual_seed(seed)
- torch.cuda.manual_seed_all(seed)
- np.random.seed(seed)
-
- # 获取数据
- x_data, y_truth = get_toy_data(batch_size=1000)
-
- # 将数据转换为Numpy数组
- x_data = x_data.data.numpy()
- y_truth = y_truth.data.numpy().astype(np.int64)
-
- # 确定类别数量
- n_classes = len(set(LABELS))
-
- # 初始化用于存储每个类别数据和颜色的列表
- all_x = [[] for _ in range(n_classes)]
- all_colors = [[] for _ in range(n_classes)]
-
- # 定义绘图所需的颜色和标记
- colors = ['black', 'white']
- markers = ['o', '*']
-
- # 将数据按类别分组
- for x_i, y_true_i in zip(x_data, y_truth):
- all_x[y_true_i].append(x_i)
- all_colors[y_true_i].append(colors[y_true_i])
-
- # 将数据转换为Numpy数组
- all_x = [np.stack(x_list) for x_list in all_x]
-
- # 绘制散点图
- _, ax = plt.subplots(1, 1, figsize=(10,5))
-
- for x_list, color_list, marker in zip(all_x, all_colors, markers):
- ax.scatter(x_list[:, 0], x_list[:, 1], edgecolor='black', marker=marker, facecolor="white", s=100)
-
- # 美化图形
- plt.tight_layout()
- plt.axis('off')
-
- plt.title("");
-
- # 保存图像
- plt.savefig("images/data.png") # 保存为PNG格式的图像
- plt.savefig("images/data.pdf") # 保存为PDF格式的图像
其中,生成数据的函数和可视化模型预测结果的函数定义如下:
- # 生成数据集
- def get_toy_data(batch_size):
- # 确保中心点和标签数量相等
- assert len(CENTERS) == len(LABELS), 'centers should have equal number labels'
-
- x_data = []
- y_targets = np.zeros(batch_size)
- n_centers = len(CENTERS)
-
- for batch_i in range(batch_size):
- center_idx = np.random.randint(0, n_centers)
-
- # 生成符合正态分布的随机数据点
- x_data.append(np.random.normal(loc=CENTERS[center_idx]))
- y_targets[batch_i] = LABELS[center_idx]
-
- return torch.tensor(x_data, dtype=torch.float32), torch.tensor(y_targets, dtype=torch.int64)
- # 可视化模型预测结果
- def visualize_results(perceptron, x_data, y_truth, n_samples=1000, ax=None, epoch=None,
- title='', levels=[0.3, 0.4, 0.5], linestyles=['--', '-', '--']):
- # 获取模型预测结果
- _, y_pred = perceptron(x_data, apply_softmax=True).max(dim=1)
-
- # 将预测结果、输入数据和真实标签都转化为数组的形式
- y_pred = y_pred.data.numpy()
- x_data = x_data.data.numpy()
- y_truth = y_truth.data.numpy()
-
- # 获取类别数量
- n_classes = len(set(LABELS))
-
- # 初始化存储各类别数据点和颜色的列表
- all_x = [[] for _ in range(n_classes)]
- all_colors = [[] for _ in range(n_classes)]
-
- # 定义数据点颜色和标记类型
- colors = ['black', 'white']
- markers = ['o', '*']
-
- # 遍历输入数据、预测结果和真实标签
- for x_i, y_pred_i, y_true_i in zip(x_data, y_pred, y_truth):
- # 将数据点按照真实标签分组
- all_x[y_true_i].append(x_i)
-
- #根据预测结果确定数据点颜色
- if y_pred_i == y_true_i:
- all_colors[y_true_i].append("white")
- else:
- all_colors[y_true_i].append("black")
- #all_colors[y_true_i].append(colors[y_pred_i])
-
- # 将数据点列表转换为NumPy数组
- all_x = [np.stack(x_list) for x_list in all_x]
-
- # 如果未提供坐标轴,则创建一个新的
- if ax is None:
- _, ax = plt.subplots(1, 1, figsize=(10,10))
-
- # 绘制数据点
- for x_list, color_list, marker in zip(all_x, all_colors, markers):
- ax.scatter(x_list[:, 0], x_list[:, 1], edgecolor="black", marker=marker, facecolor=color_list, s=100)
-
- # 设置坐标轴范围
- xlim = (min([x_list[:,0].min() for x_list in all_x]),
- max([x_list[:,0].max() for x_list in all_x]))
-
- ylim = (min([x_list[:,1].min() for x_list in all_x]),
- max([x_list[:,1].max() for x_list in all_x]))
-
- # 绘制超平面
- xx = np.linspace(xlim[0], xlim[1], 30)
- yy = np.linspace(ylim[0], ylim[1], 30)
- YY, XX = np.meshgrid(yy, xx)
- xy = np.vstack([XX.ravel(), YY.ravel()]).T
-
- for i in range(n_classes):
- # 将数据xy输入神经网络perceptron中进行分类
- Z = perceptron(torch.tensor(xy, dtype=torch.float32),
- apply_softmax=True)
-
- # 提取出当前分类的结果
- Z = Z[:, i].data.numpy().reshape(XX.shape)
-
- # 用不同颜色的等高线将不同类别的区域画出来
- ax.contour(XX, YY, Z, colors=colors[i], levels=levels, linestyles=linestyles)
-
- # 添加标题
- plt.suptitle(title)
-
- # 如果epoch不为空,将当前epoch数目展示在图像上
- if epoch is not None:
- plt.text(xlim[0], ylim[1], "Epoch = {}".format(str(epoch)))
定义单层感知机,设置感知机参数,包括输入层、隐藏层、输出层大小,并打印感知机结构。可视化多层感知机的初始状态。
- # 定义神经网络的参数
- input_size = 2
- output_size = len(set(LABELS))
- num_hidden_layers = 0
- hidden_size = 2 # 虽然未使用但仍然设置了值
-
- # 设置随机数种子以确保结果的可重复性
- seed = 2
-
- torch.manual_seed(seed)
- torch.cuda.manual_seed_all(seed)
- np.random.seed(seed)
-
- # 初始化多层感知机(定义了输入输出神经元个数,隐藏层个数)
- mlp1 = MultilayerPerceptron(input_size=input_size,
- hidden_size=hidden_size,
- num_hidden_layers=num_hidden_layers,
- output_size=output_size)
-
- # 打印多层感知机的结构
- print(mlp1)
-
- # 设置模型训练的批大小
- batch_size = 1000
-
- # 获取静态数据用于可视化
- x_data_static, y_truth_static = get_toy_data(batch_size)
- fig, ax = plt.subplots(1, 1, figsize=(10,5))
-
- # 可视化多层感知机的初始状态
- visualize_results(mlp1, x_data_static, y_truth_static,
- ax=ax, title='Initial Perceptron State', levels=[0.5])
-
- plt.axis('off')
-
- # 保存为PNG格式的图像
- plt.savefig('images/perceptron_initial.png')
这段代码首先定义了用于构建单层感知机的相关参数,包括输入层的大小、输出层的大小(由唯一标签数量决定)、隐藏层的数量和大小。这些参数为后续初始化感知机提供了必要的配置。
接着,代码通过设置随机数种子确保了PyTorch、CUDA和NumPy在生成随机数时的可重复性,这有助于在多次运行代码时获得一致的结果,便于实验和调试。
随后,利用定义好的参数初始化了一个两层感知机,其中MultilayerPerceptron
是在2.2节中定义的神经网络类。初始化完成后,通过打印语句输出了感知机的结构信息,以便了解模型的详细配置。
然后,代码设置了模型训练时使用的批大小用于后续的训练过程。
最后,代码通过调用get_toy_data
函数获取了一批用于可视化的数据,并利用visualize_results
函数绘制了多层感知机在初始化状态下的决策边界。这个可视化过程有助于直观地理解感知机在训练前的初始状态,以及它如何对输入数据进行分类。绘制完成后,代码关闭了坐标轴的显示,并将图像保存为PNG格式的文件,方便后续查看和分析。
接着,选择优化器、损失函数,定义学习率,定义用于判断是否提前停止训练的函数,然后对模型进行训练,并在每一轮迭代后使用visualize_results函数可视化当前训练结果。
- # 初始化损失列表
- losses = []
-
- # 每个批次的样本数量
- batch_size = 10000
-
- # 总共批次数量
- n_batches = 10
-
- # 最大迭代次数
- max_epochs = 10
-
- # 损失变化
- loss_change = 1.0
-
- # 上一次损失
- last_loss = 10.0
-
- # 损失变化阈值
- change_threshold = 1e-3
-
- # 当前迭代次数
- epoch = 0
-
- # 所有图像文件的列表
- all_imagefiles = []
-
- lr = 0.01 # 学习率
-
- # 使用Adam优化器和交叉熵损失函数
- optimizer = optim.Adam(params=mlp1.parameters(), lr=lr)
- cross_ent_loss = nn.CrossEntropyLoss()
-
- # 判断是否提前终止训练的函数
- def early_termination(loss_change, change_threshold, epoch, max_epochs):
- # 根据损失变化和最大迭代次数判断是否提前终止
- terminate_for_loss_change = loss_change < change_threshold
- terminate_for_epochs = epoch > max_epochs
-
- # 如果满足任一条件,则提前终止训练
- return terminate_for_epochs
-
- # 当尚未满足提前终止条件时,进行训练
- while not early_termination(loss_change, change_threshold, epoch, max_epochs):
- for _ in range(n_batches):
- # 步骤0:获取数据
- x_data, y_target = get_toy_data(batch_size)
-
- # 步骤1:梯度清零
- mlp1.zero_grad()
-
- # 步骤2:进行前向传播
- y_pred = mlp1(x_data).squeeze()
-
- # 步骤3:计算损失
- loss = cross_ent_loss(y_pred, y_target.long())
-
- # 步骤4:进行反向传播
- loss.backward()
-
- # 步骤5:优化器更新参数
- optimizer.step()
-
- # 辅助步骤:记录损失值
- loss_value = loss.item()
- losses.append(loss_value)
- loss_change = abs(last_loss - loss_value)
- last_loss = loss_value
-
- # 可视化当前训练结果
- fig, ax = plt.subplots(1, 1, figsize=(10,5))
- visualize_results(mlp1, x_data_static, y_truth_static, ax=ax, epoch=epoch,
- title=f"{loss_value:0.2f}; {loss_change:0.4f}")
- plt.axis('off')
- epoch += 1 # 迭代次数加一
-
- # 将图像保存为文件
- all_imagefiles.append(f'images/perceptron_epoch{epoch}_toylearning.png')
- plt.savefig(all_imagefiles[-1])
从结果中可以看出,单层感知机只能绘制一条直线对数据点进行分类,而不能解决异或问题,泛化性能较差。
对于两层感知机的初始化和训练过程与2.3.2节中类似,只需要将隐藏层数量设置为1即可。因此这里不再做代码的展示,仅展示结果。
通过与单层感知机最终状态的对比,不难发现两层感知机的性能明显优于单层感知机。两层感知机由于隐层的存在可以解决异或问题。
与2.3.3节相同,三层感知器初始化和训练的过程代码不做展示。定义三层感知器结构如下:
为了寻找多层感知器的分类结果优于单层感知器的原因,我们可以可视化感知器的中间表示。
定义函数来绘制多层感知机模型在处理输入数据时的中间表示。定义了一个名为plot_intermediate_representations
的函数,它接受三个参数:mlp_model
(多层感知机模型)、plot_title
(图的标题)和figsize
(图的大小)。
- # 定义函数以绘制中间表示
- def plot_intermediate_representations(mlp_model, plot_title, figsize=(10,2)):
- # 获取数据
- x_data, y_target = get_toy_data(batch_size)
-
- # 使用多层感知机模型进行预测
- y_pred = mlp_model(x_data, True).detach().numpy()
-
- # 将数据转换为NumPy数组
- x_data = x_data.numpy()
- y_target = y_target.numpy()
-
- # 定义颜色和标记
- colors = ['black', 'white']
- markers = ['o', '*']
-
- # 初始化绘图标记列表
- plot_markers = []
- class_zero_indices = []
- class_one_indices = []
-
- # 根据目标类别将数据索引分组
- for i in range(y_target.shape[0]):
- if y_target[i] == 0:
- class_zero_indices.append(i)
- else:
- class_one_indices.append(i)
- class_zero_indices = np.array(class_zero_indices)
- class_one_indices = np.array(class_one_indices)
- # plot_markers.append(markers[y_target[i]])
-
- # 创建子图
- fig, axes = plt.subplots(1, len(mlp_model.last_forward_cache), figsize=figsize)
-
- # 遍历类别和数据索引进行绘制
- for class_index, data_indices in enumerate([class_zero_indices, class_one_indices]):
- # 绘制输入数据点
- axes[0].scatter(x_data[data_indices,0], x_data[data_indices,1], edgecolor='black', facecolor="white",
- marker=markers[class_index], s=[200,400][class_index])
- axes[0].axis('off')
-
- # 遍历中间表示并绘制
- for i, activations in enumerate(mlp_model.last_forward_cache[1:], 1):
- axes[i].scatter(activations[data_indices,0], activations[data_indices,1], edgecolor='black', facecolor="white",
- marker=markers[class_index], s=[200,400][class_index])
- axes[i].axis('off')
-
- # 调整布局
- plt.tight_layout()
-
- # 添加总标题
- plt.suptitle(plot_title, size=15)
- plt.subplots_adjust(top=0.75)
在函数内部,首先通过get_toy_data
函数获取了输入数据x_data
和目标标签y_target
,这个数据是一个用于演示感知器分类效果的数据集。
接着,使用mlp_model
对输入数据进行预测,并将预测结果y_pred
转换为NumPy数组。同时,也将输入数据x_data
和目标标签y_target
转换为NumPy数组,以便后续处理。
然后,定义了两种颜色和两种标记,这些将用于在图中区分不同类别的数据点。
接下来,根据目标标签y_target
,将数据点的索引分为两组:一组属于类别0,另一组属于类别1。这是通过遍历y_target
数组并将索引分别添加到class_zero_indices
和class_one_indices
两个列表中实现的。
之后,使用plt.subplots
创建了一个子图数组axes
,子图的数量与mlp_model.last_forward_cache
的长度相同。
接着,开始绘制数据点。对于每个类别(类别0和类别1),在第一个子图(即输入数据的表示)上绘制对应类别的数据点,使用不同的标记和大小来区分不同类别的点;再遍历mlp_model.last_forward_cache
中的每一层激活值(中间表示),并在对应的子图上绘制属于当前类别的数据点的激活值。这样,可以直观地看到数据点在模型的不同层次上的表示。
最后,在所有子图绘制完成后,使用plt.tight_layout
调整子图之间的布局,确保它们不会重叠,使用函数添加标题并调整图之间的距离。
下面代码用于绘制三层感知器每一层的数据点。修改plot_intermediate_representations
函数中的参数,能得到不同层数的感知器的输入和中间表示。
- # 调用函数绘制三层感知机的输入和中间表示并保存图像
- plot_intermediate_representations(mlp3,
- "The 3-layer Multilayer Perceptron's Input and Intermediate Representation",
- figsize=(13, 3))
- plt.savefig("images/mlp3_intermediate.png")
- plt.savefig("images/mlp3_intermediate.pdf")
通过以上运行结果,可以发现:单层感知器由于其线性特性,只能处理线性可分问题。这意味着它只能识别输入数据中的线性模式,而无法捕捉到非线性关系。因此,在面对如异或问题等非线性问题时,单层感知器的表现会受限,无法有效地进行分类或决策。
两层感知器通过引入一个隐藏层(中间层),增加了网络的非线性处理能力。这使得两层感知器能够识别并处理输入数据中的非线性关系。通过可视化其输入和中间表示,可以观察到数据在隐藏层中的转换过程,以及最终如何被映射到输出层。这种转换过程是非线性的,使得两层感知器能够解决单层感知器无法处理的问题,如异或问题。
最后,三层感知器具有更复杂的网络结构,包括更多的隐藏层。这使得它能够捕获更高阶的非线性关系,并在输入空间中形成更复杂的决策边界。通过可视化其输入和中间表示,可以进一步观察到数据在多个隐藏层中的转换和表示过程。这种深度结构使得三层感知器在处理复杂问题和大型数据集时具有更强的泛化能力和更高的准确性。
这里我们将MLP应用于将姓氏分类到其原籍国的任务。在第一节中,已经完成了根据国籍对数据集分组的任务,并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集。同时也完成了对Vocabulary类、SurnameVectorizer类和SurnameDataset类的定义。
接下来,定义一个名为SurnameClassifier
的神经网络模型类,用于姓氏分类任务。这个模型是一个简单的多层感知机,通过前馈传播对输入数据进行分类。思路和具体代码如下:
初始化模型结构:在__init__
方法中,定义了两个线性层(self.fc1
和self.fc2
),分别用于将输入数据映射到隐藏层,以及将隐藏层映射到输出层。
前向传播:在forward
方法中,定义了模型的前向传播过程。输入数据x_in
首先通过第一个线性层self.fc1
,并应用ReLU激活函数得到中间向量。接着,中间向量通过第二个线性层self.fc2
得到最终的预测向量。
可选的Softmax激活:如果apply_softmax
参数为True,则在返回预测向量之前,会对其应用softmax激活函数,将输出转换为概率分布。这通常在模型输出层不使用softmax作为激活函数,而在计算损失函数(如交叉熵损失)时需要概率分布时使用。
- class SurnameClassifier(nn.Module):
- """ 一个能完成姓氏分类的两层感知机 """
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): 输入向量的大小
- hidden_dim (int): 第一个线性层的输出大小
- output_dim (int): 第二个线性层的输出大小
- """
- super(SurnameClassifier, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """分类器的前向传播
-
- Args:
- x_in (torch.Tensor): 输入数据张量.
- x_in 的形状应该是 (batch, input_dim)
- apply_softmax (bool): 是否使用softmax激活函数的标志
- 使用交叉熵损失时应设置为FALSE
- Returns:
- 结果张量。张量的形状应该为 (batch, output_dim)
- """
- intermediate_vector = F.relu(self.fc1(x_in))
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
这个模型是一个通用的神经网络结构,可以用于任何具有固定输入维度和输出类别的分类任务。通过调整隐藏层的大小(hidden_dim
),可以改变模型的复杂度,以适应不同的数据集和任务需求。
然后,设置并初始化多层感知机模型的训练过程,包括加载模型和数据集到指定设备、定义损失函数和优化器、创建训练进度条、设置数据集为训练集和验证集,以及进行训练循环。在训练循环中,代码迭代每个批次的数据,执行模型的前向传播、计算损失、反向传播和参数更新,以优化模型性能。通过监控训练过程中的损失和准确率,可以评估模型的训练效果,并在必要时调整学习率等超参数以加速收敛或提高性能。
- # 将分类器模型和数据集的类权重加载到指定设备
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 定义损失函数为交叉熵损失,同时考虑类权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 定义优化器为Adam,并传入分类器模型的参数和学习率
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
-
- # 定义学习率调度器,当监测的指标不再改善时,将学习率按给定因子减小
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- # 创建训练状态字典
- train_state = make_train_state(args)
-
- # 创建训练条用于显示训练过程
- epoch_bar = tqdm(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- # 设置数据集为训练集
- dataset.set_split('train')
-
- # 创建训练集进度条
- train_bar = tqdm(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- # 设置数据集为验证集
- dataset.set_split('val')
-
- # 创建验证集进度条
- val_bar = tqdm(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # 迭代训练数据集
-
- # 设置:批处理生成器,将损失和准确率设为0,设置为训练模式
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # 训练过程包括以下5个步骤:
-
- # --------------------------------------
- # step 1. 清零梯度
- optimizer.zero_grad()
-
- # step 2. 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. 使用损失计算梯度
- loss.backward()
-
- # step 5. 使用优化器执行梯度更新
- optimizer.step()
- # -----------------------------------------
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 更新训练进度条
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # 迭代验证数据集
-
- # 设置:批处理生成器,将损失和准确率设为0,设置为评估模式
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # 计算输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失值
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 更新验证进度条
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- # 更新训练状态
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- # 更新优化器学习率
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")
下面,使用最佳可用的模型在测试集上进行评估,计算模型在测试集上的损失和准确率,并将这些指标保存到训练状态字典中,以便后续分析和比较。通过评估模型在测试集上的性能,可以了解模型的泛化能力,即模型对未见过的数据的预测能力。
- # 使用最佳可用模型计算损失和准确率
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- # 将分类器和数据集的类别权重移动到指定的设备上
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- # 定义损失函数为交叉熵损失函数,考虑数据集的类别权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- # 设置数据集为测试集
- dataset.set_split('test')
-
- # 定义生成批次数据的生成器
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
-
- # 初始化运行损失和准确率
- running_loss = 0.
- running_acc = 0.
-
- # 将分类器设置为评估模式
- classifier.eval()
-
- # 迭代批次生成器中的每个批次
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算模型输出
- y_pred = classifier(batch_dict['x_surname'])
-
- # 计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- # 更新运行损失
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # 计算准确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- # 更新运行准确率
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # 保存测试集的损失和准确率到训练状态中
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
-
- # 输出测试集的损失和准确率
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
在完成对模型的训练以及模型指标计算后,使用模型实现对新姓氏国籍的预测。下面的代码表示:在用户输入一个新的姓氏后,使用训练好的分类器和相应的向量化器来预测该姓氏最可能的国籍及其概率。
- def predict_nationality(surname, classifier, vectorizer):
- """预测一个新姓氏的国籍
-
- Args:
- surname (str): 要分类的姓氏
- classifier (SurnameClassifer): 分类器的实例
- vectorizer (SurnameVectorizer): 相应的向量化器
- Returns:
- 一个字典,包含最可能的国籍及其概率
- """
- # 对姓氏进行向量化
- vectorized_surname = vectorizer.vectorize(surname)
- # 增加维度以符合模型的输入要求
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- # 使用分类器进行预测
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 获取最大概率值和对应的索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- # 获取预测的国籍和概率值
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}
-
-
- # 输入一个新的姓氏
- new_surname = input("Enter a surname to classify: ")
-
- # 使用分类器进行预测并输出预测结果
- classifier = classifier.to("cpu")
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
除了可以预测新姓氏的国籍,模型也可以输出新姓氏最可能的前几个国籍和概率。
2.4节中的代码只展示了应用多层感知机的主要框架,但是其中的一些函数没有定义。现补充如下:
- def make_train_state(args):
- # 创建训练状态字典并返回
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- def update_train_state(args, model, train_state):
- """
- 处理训练状态更新。
- Components:
- - 提前停止: 防止过拟合。
- - 模型检查点: 如果模型更好,则保存模型。
- :param args: 主要参数
- :param model: 待训练模型
- :param train_state: 表示训练状态值的字典
- :returns:
- 一个新的train_state
- """
-
- # 至少保存一个模型
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # 如果模型性能提高则保存模型
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # 如果损失变大
- if loss_t >= train_state['early_stopping_best_val']:
- # 更新步骤
- train_state['early_stopping_step'] += 1
- # 如果损失减少
- else:
- # 保存最佳模型
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # 重置提前停止步骤
- train_state['early_stopping_step'] = 0
-
- # 是否提前停止 ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- # 计算预测的准确率
- def compute_accuracy(y_pred, y_target):
- # 将预测分数最大的索引作为预测的输出
- _, y_pred_indices = y_pred.max(dim=1)
-
- # 统计预测值与真实值相等的样本个数
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
- # 设置随机种子
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- def handle_dirs(dirpath):
- # 判断一个文件夹是否存在,不存在则自动创建
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
设置模型参数及模型初始化:
- args = Namespace(
- # 数据和路径信息
- surname_csv="surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/surname_mlp",
- # 模型超参数设置
- hidden_dim=300,
- # 训练超参数设置
- seed=1337,
- num_epochs=100,
- early_stopping_criteria=5,
- learning_rate=0.001,
- batch_size=64,
- # 运行选择
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- )
-
- if args.expand_filepaths_to_save_dir:
- # 连接向量器的路径名
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- # 连接模型状态的路径名
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 查看CUDA是否可用
- if not torch.cuda.is_available():
- args.cuda = False
-
- # 使用合适的设备
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
-
- # 为结果的可再现性设置种子
- set_seed_everywhere(args.seed, args.cuda)
-
- # 处理存储路径
- handle_dirs(args.save_dir)
-
- if args.reload_from_files:
- # 从一个检查点开始训练
- print("Reloading!")
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # 创建一个数据集和向量器
- print("Creating fresh!")
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- # 获取数据集的向量化器
- vectorizer = dataset.get_vectorizer()
-
- # 初始化姓氏分类器模型
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
卷积神经网络(CNN)是一种特殊的神经网络结构,其特点在于能够自动提取输入数据的特征,并有效处理具有网格结构的数据。CNN通过卷积层、池化层和全连接层等组件的有机结合,实现了层次化的特征提取和抽象。其中,卷积层通过多个卷积核对输入数据进行局部感知和参数共享,有效降低了网络复杂度并提高了特征提取的鲁棒性;池化层则通过下采样操作减少了数据维度,进一步增强了特征的平移不变性。
构建SurnameClassifier
模型,该模型通过卷积神经网络提取输入数据的特征,并使用全连接层进行分类。它可以接收一定格式的输入数据张量,并输出每个类别的预测概率向量。
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- 初始化函数
-
- Args:
- initial_num_channels (int): 输入特征向量的大小
- num_classes (int): 输出预测向量的大小
- num_channels (int): 网络中使用的常数通道大小
- """
- super(SurnameClassifier, self).__init__()
-
- # 定义卷积神经网络的结构
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3), # 第一个卷积层
- nn.ELU(), # 激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2), # 第二个卷积层
- nn.ELU(), # 激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2), # 第三个卷积层
- nn.ELU(), # 激活函数
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3), # 第四个卷积层
- nn.ELU() # 激活函数
- )
-
- # 定义全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """分类器的前向传播
-
- Args:
- x_surname (torch.Tensor): 输入数据张量。
- x_surname.shape 应为 (batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): softmax 激活的标志
- 如果与交叉熵损失一起使用,应为 false
- Returns:
- 结果张量。tensor.shape 应为 (batch, num_classes)
- """
- # 卷积层后的特征提取
- features = self.convnet(x_surname).squeeze(dim=2)
-
- # 使用全连接层进行分类
- prediction_vector = self.fc(features)
-
- if apply_softmax:
- # 如果应用softmax激活函数
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
训练过程与训练所需的函数与2.4节和2.5节展示的类似。只有使用的模型不同。下面只展示运行结果:
从结果中可以看出,卷积神经网络模型的性能优于多层感知机。
批处理标准化是设计网络时经常使用的一种工具。BatchNorm对CNN的输出进行转换,方法是将激活量缩放为零均值和单位方差。它用于Z-transform的平均值和方差值每批更新一次,这样任何单个批中的波动都不会太大地移动或影响它。BatchNorm允许模型对参数的初始化不那么敏感,并且简化了学习速率的调整。
在SurnameClassifier模型中加入批处理规范化层。
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- 初始化函数
-
- Args:
- initial_num_channels (int): 输入特征向量的大小
- num_classes (int): 输出预测向量的大小
- num_channels (int): 网络中使用的常数通道大小
- """
- super(SurnameClassifier, self).__init__()
-
- # 定义卷积神经网络的结构
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels, out_channels=num_channels, kernel_size=3), # 第一个卷积层
- nn.BatchNorm1d(num_channels), # 批标准化层
- nn.ELU(), # 激活函数
-
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels, kernel_size=3, stride=2), # 第二个卷积层
- nn.BatchNorm1d(num_channels), # 批标准化层
- nn.ELU(), # 激活函数
-
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels, kernel_size=3, stride=2), # 第三个卷积层
- nn.BatchNorm1d(num_channels), # 批标准化层
- nn.ELU(), # 激活函数
-
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels, kernel_size=3), # 第四个卷积层
- nn.BatchNorm1d(num_channels), # 批标准化层
- nn.ELU() # 激活函数
- )
-
- # 定义全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """分类器的前向传播
-
- Args:
- x_surname (torch.Tensor): 输入数据张量。
- x_surname.shape 应为 (batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): softmax 激活的标志
- 如果与交叉熵损失一起使用,应为 false
- Returns:
- 结果张量。tensor.shape 应为 (batch, num_classes)
- """
- # 卷积层后的特征提取
- features = self.convnet(x_surname).squeeze(dim=2)
-
- # 使用全连接层进行分类
- prediction_vector = self.fc(features)
-
- if apply_softmax:
- # 如果应用softmax激活函数
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
从结果可以看出,使用CNN作为模型进行分类训练时,加入批规范化后模型在测试集上的性能提升。这是因为批规范化能够解决内部变量偏移问题,使每一层的输出都具有适当的尺度,从而加速模型的训练过程并提升稳定性。具体来说,批规范化通过标准化每个mini-batch的激活值,使得它们的均值接近0,方差接近1,这有助于模型更好地学习数据的特征表示。此外,批规范化还通过引入可学习的缩放和偏移参数,使得模型有能力还原原始的输入分布,从而保持模型的容量和灵活性。
在CNN中,卷积层通常会产生大量的参数,这些参数的分布可能会随着训练的进行而发生变化,导致模型训练的不稳定。批规范化的引入可以稳定这些参数的分布,使得模型在训练过程中更加鲁棒,更容易找到最优解。这也有助于减少梯度消失或爆炸问题,从而进一步提高模型的性能。但是会增加计算开销,使训练速度降低。
https://pytorch-cn.readthedocs.io/zh/latest/package_references/functional/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。