赞
踩
引入周志华老师在西瓜书中的定义:“感知机由两层神经元组成,输入层接收外界输入信号后传递给输出层,输出层是M-P神经元,也称为阈值逻辑单元(threshold logic unit)”。
感知机(Perceptron)是一个简单的二分类的单层网络模型,是由美国学者FrankRosenblatt在1957年提出来的。其神经元模型如下图所示:
(x1,...,xn)是输入信号,y是输出信号,( w1,...wn)是权重。图中的○称为“神经元”或者“节点”。
输入信号被送往神经元时,会被分别乘以固定的权重( w1,...wn)。神经元会计算传送过来的信号的总和,只有当这个总和超过了阈值θ时,才会输出1。f(x)被称为激活函数 。
单层感知机能够经过简单地学习实现输入值得“与”、“或”和“非”运算。但是,这种单层的感知机有一个致命的问题,本质上是一个二分类模型的它只能分割线性空间。如果样本按照XOR(异或)函数的样子进行分布,单层感知机是无法实现分类的。(假设样本有二维的特征,那么下图中的样本分布,感知机是无法进行分类的。无论怎么分割都无法用一条线将五角星和圆形分开)
多层感知机(Multilayer Perceptron,简称MLP),是一种基于前馈神经网络(Feedforward Neural Network)的深度学习模型,由一个或多个神经元层组成,其中每个神经元层与前一层全连接。多层感知机可以用于解决分类、回归和聚类等各种机器学习问题。
多层感知机(MLP)的出现,则成功解决了单层感知机不能拟合XOR函数的问题。一个MLP简单示例如下,含有两个隐藏层,经过三次线性映射。
多层感知机的工作过程借助上图中的示例很容易理解:
输入层:多层感知机的输入层接受来自数据集的特征向量作为输入。
隐藏层:输入层的数据经过线性变换(通过权重矩阵和偏置向量)的洗礼,到达隐藏层然后应用非线性激活函数。隐藏层的输出成为下一层(可以是另一个隐藏层或输出层)的输入。
输出层:输出层接收最后一个隐藏层的输出,并应用线性变换,产生最终的输出结果。
数据从输入层传播到输出层的过程称为前向传播。在前向传播过程中,每一层的输出作为下一层的输入,并经过相应的线性变换和激活函数处理。
那么,了解了MLP的工作过程,可以知道解决异或的非线性可分问题其实很简单,只需要两层感知机就能解决,如下图所示:
单层感知机VS多层感知机:
1.多层感知机在输入层与输出层之间多了隐藏层;
2.每层神经元与下一层神经元全互连;
3.隐藏层也有激活功能的神经元。
Que1:为什么引入隐藏层?
增加模型表达能力:多层感知机通过引入一个或多个隐藏层,使得模型能够学习和表示更复杂的非线性函数关系。如果没有隐藏层,模型只能进行线性分类或回归,无法有效地处理复杂的输入数据分布。
Que2: 隐层中的激活函数是有必要的吗?
毋庸置疑的,激活函数在隐藏层中是必不可少的。因为神经网络中每一层的输入输出都是一个线性求和的过程,下一层的输出只是承接了输入的线性变换,所以如果没有激活函数,那么无论你构造的神经网络多么复杂,有多少层,最后的输出都是输入的线性组合。而在我们生活中,大多数实际问题中的数据分布和关系都是非线性的,纯粹的线性组合并不能够捕捉和表达这些复杂的关系。
那么,最重要的一点,激活函数用来引入非线性特性:隐藏层的线性变换后跟非线性激活函数的组合,使得多层感知机能够学习复杂的非线性模式和特征(有了“万有逼近性”,神经网络可以任意逼近任何非线性函数),这样神经网络就可以利用到更多的非线性模型中。
同时,神经网络中经典的问题就是梯度弥散和梯度爆炸的问题。在一点上,激活函数在训练神经网络中也起到了重要作用,它们能够帮助梯度在网络中传播,避免梯度传播时出现的问题。
常见的激活函数:
1.Sigmoid函数:将输入压缩到(0, 1)之间,常用于二分类问题的输出层。
2.tanh函数:将输入压缩到(-1, 1)之间,比sigmoid函数具有更强的非线性特性。
3.ReLU函数:非常简单且效果良好,有助于解决传统sigmoid和tanh激活函数的梯度消失问题。
一个二元分类任务中分别训练感知器和MLP,应用上文提及的线性不可分样本:星和圆,将每一层输出都进行可视化。
感知机输出表示如下:只具有线性映射的它无法成功分割星和圆。
MLP输出表示如下:左边是第一次线性映射输出,中间是经过隐藏层的非线性输出,右边则是再一次线性映射输出。第一个线性模块的输出将圆和星分组,而第二个线性模块的输出将数据点重新组织为线性可分的。
在这里我们介绍PyTorch中的一个实现。定义了一个简单的多层感知机模型 MultilayerPerceptron,适用于分类任务。它具有两个线性层,可以处理具有 input_dim
大小的输入数据,并生成 output_dim
大小的输出。
MultilayerPerceptron
类定义了一个包含两个线性层的神经网络模型。
初始化方法__init__
:第一个线性层()将输入数据的维度从映射到,并使用ReLU激活函数;第二个线性层()接收来自第一个线性层的输出,将其映射到self.fc1
input_dim
hidden_dim
self.fc2
output_dim;
forward
方法:定义了数据在模型中的前向传播流程。
- import torch.nn as nn
- import torch.nn.functional as F
-
- '''继承自nn.Module的Python类,表示了一个多层感知机神经网络模型'''
- class MultilayerPerceptron(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- #调用了父类nn.Module的构造函数,确保正确地初始化了继承自nn.Module的部分
- super(MultilayerPerceptron, self).__init__()
- #定义了第一个线性层,输入维度为input_dim,输出维度为hidden_dim,并将其存储在fc1属性中
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- #定义了第二个线性层,输入维度为hidden_dim,输出维度为output_dim,并将其存储在fc2属性中
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the MLP
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- #首先将输入张量通过第一个线性层和ReLU激活函数传递
- intermediate = F.relu(self.fc1(x_in))
- output = self.fc2(intermediate)
-
- #如果apply_softmax为True,则对输出应用softmax激活函数;
- #否则,直接返回未经处理的输出
- if apply_softmax:
- output = F.softmax(output, dim=1)
- return output
我们实例化了MLP,使用大小为3的输入维度、大小为4的输出维度和大小为100的隐藏维度。
- batch_size = 2 # number of samples input at once
- input_dim = 3 # 大小为3的输入维度
- hidden_dim = 100 # 大小为100的隐藏维度
- output_dim = 4 # 大小为4的输出维度
-
- # 初始化模型
- mlp = MultilayerPerceptron(input_dim, hidden_dim, output_dim)
- print(mlp)
可以看到我们所创建的多层感知机:
在这里我们做一个有用的完整性检查。通过传递一些随机输入来快速测试模型的“连接”,如下所示。因为模型还没有经过训练,所以输出结果是随机的。
- import torch
- #定义函数describe,用于打印张量的类型、形状和数值
- def describe(x):
- print("Type: {}".format(x.type()))
- print("Shape/size: {}".format(x.shape))
- print("Values: \n{}".format(x))
-
- #创建了一个随机张量x_input,其形状为(batch_size, input_dim)
- #其中batch_size=2,input_dim=3
- x_input = torch.rand(batch_size, input_dim)
- describe(x_input)#调用函数describe
输出结果:
Softmax函数
softmax有许多源头。在物理学中,它被称为玻尔兹曼或吉布斯分布;在统计学中,它是多项式逻辑回归;在自然语言处理(NLP)社区,它是最大熵(MaxEnt)分类器。在这里,Softmax 函数作为一种常用的激活函数,通常用于多类别分类问题中。
Softmax函数的作用是将原始的线性得分转换为概率分布,使得模型能够对不同类别进行概率预测。在多类别分类问题中,Softmax 函数可以帮助模型选择概率最大的类别作为预测结果。它的原理是将一个向量的元素转化为概率分布,使得每个元素的取值范围在0到1之间,并且所有元素的和为1。
在这里,我们使用上文的MLP模型对输入数据进行前向传播,其中apply_softmax=False表示不对输出应用softmax激活函数。
- y_output = mlp(x_input, apply_softmax=False)
- describe(y_output)
输出结果:有正有负,且和不为1。
将apply_softmax设置为True,则表示对输出应用softmax激活函数。
输出结果:转换为概率,全为正数且和为1。
姓氏分类是指通过机器学习方法将来自不同文化和语言背景的姓氏进行分类或预测其所属的文化或语言类别。在本实验中,我们将MLP应用于将姓氏分类到其原籍国的任务。
数据集格式:
列名: nationality,split,surname
对应值:Arabic,train,Totah
French,val,Albert
French,test,Augustin
特点:
观察数据集,我们可以发现国籍和姓氏之间存在有效和直观的关系。这表明姓氏在很大程度上反映了其所属国家或语言文化的特征。这也反映出在姓氏数据集上进行分类任务的可行性。
实验需要的库如下所示:
- #导入必要的库
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
-
- import numpy as np
- import pandas as pd
-
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
为了使用字符对姓氏进行分类,我们使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。
我们根据国籍对姓氏数据集Dataset进行分组(等比例采样),并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集,以便跨这些部分的类标签分布具有可比性。
SurnameDataset类:
self._vectorizer.to_serializable()
是 SurnameVectorizer
类的一个方法,它将向量化器对象序列化为一个可以保存为 JSON 的字典格式。
json.dump()
函数用于将字典数据写入文件 vectorizer_filepath
中。这样,一旦调用了 save_vectorizer
方法,当前 SurnameDataset
实例所使用的向量化器就会被保存到指定的文件中,以便之后可以通过 load_vectorizer_only
或者 load_dataset_and_load_vectorizer
方法重新加载使用。generate_batches
函数的作用是利用 PyTorch 的 DataLoader
功能,生成可以直接在训练循环中使用的批次数据。
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- 参数:
- surname_df (pandas.DataFrame):数据集
- 向量化器(SurnameVectorizer):从数据集实例化的向量化器
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """加载数据集并从头开始创建一个新的向量化器
-
- 参数:
- surname_csv (str):数据集的位置
- 返回:
- SurnameDataset 的一个实例
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """加载数据集和相应的向量化器。
- 在向量化器中使用的情况已被缓存以供重复使用
-
- 参数:
- surname_csv (str):数据集的位置
- vectorizer_filepath (str):保存的向量化器的位置
- 返回:
- SurnameDataset 的一个实例
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """从文件加载向量化器的静态方法
-
- 参数:
- vectorizer_filepath (str):序列化向量化器的位置
- 返回:
- SurnameVectorizer 的一个实例
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """使用 json 将向量化器保存到磁盘
-
- 参数:
- vectorizer_filepath (str):保存向量化器的位置
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ 返回向量化器 """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ 使用dataframe中的列选择数据集中的拆分 """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """ PyTorch数据集的主要入口点
-
- 参数:
- index(int):数据点的索引
- 返回:
- 包含数据点的:
- features (x_surname)
- label (y_nationality)
- """
- # 获取数据集中给定索引处的行数据
- row = self._target_df.iloc[index]
- # 利用 SurnameVectorizer 将姓氏转换为向量表示
- surname_vector = \
- self._vectorizer.vectorize(row.surname)
- # 使用 SurnameVectorizer 中的 nationality_vocab 属性,将国籍转换为对应的索引
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- # 返回一个字典,包含姓氏向量表示和国籍索引
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """给定批大小,返回数据集中的批次数
-
- 参数:
- 批大小 (int)
- 返回:
- 数据集中的批数
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- PyTorch DataLoader的生成器函数。它将确保每个张量都在写入设备的位置上。
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict
这里我们定义了一个 SurnameVectorizer
类,主要用于将姓氏数据转换为MLP可以处理的向量化表示。它通过构建姓氏和国籍的词汇表,能够将姓氏字符串转换为收缩的one-hot编码,方便后续的分类。
__init__
方法:初始化 SurnameVectorizer
类的实例,存储姓氏、国籍的词汇表。
- class SurnameVectorizer(object):
- """ 定义了初始化方法,用于初始化类的实例属性"""
- def __init__(self, surname_vocab, nationality_vocab):
- # 初始化 surname_vocab 属性,用于存储姓氏的词汇表
- self.surname_vocab = surname_vocab
- # 初始化 nationality_vocab 属性,用于存储国籍的词汇表
- self.nationality_vocab = nationality_vocab
vectorize
方法:接受一个姓氏字符串作为输入,将其转换为向量表示。使用姓氏词汇表 surname_vocab
中的 lookup_token
方法找到每个字符在词汇表中的索引,然后将对应位置的值设为1,实现了一种简单的one-hot编码;返回一个长度为词汇表大小的浮点型数组,表示姓氏的向量化表示。
- '''用于将姓氏转换为向量表示'''
- def vectorize(self, surname):
- """
- 参数:
- surname (str): 姓氏
- Returns:
- one_hot (np.ndarray): 独热编码
- """
- vocab = self.surname_vocab # 将姓氏词汇表存储到局部变量 vocab 中
- # 创建一个全零的数组,用于存储姓氏的 one-hot 编码表示
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:# 遍历姓氏中的每个字符
- # 将字符的 one-hot 编码置为 1
- one_hot[vocab.lookup_token(token)] = 1
-
- # 返回姓氏的 one-hot 编码表示
- return one_hot
from_dataframe
类方法:从包含姓氏和国籍数据的DataFrame中构建 SurnameVectorizer
的实例。遍历DataFrame中的每一行,将姓氏中的每个字符添加到 surname_vocab
中,将国籍添加到 nationality_vocab
中;返回一个 SurnameVectorizer
实例,包含了从数据中构建的词汇表信息。
- '''实例化姓氏数据集'''
- @classmethod
- def from_dataframe(cls, surname_df):
- """
- Args:
- surname_df (pandas.DataFrame): 姓氏数据集
- Returns:
- SurnameVectorizer实例
- """
- # 初始化姓氏词汇表对象,其中 unk_token 参数指定了未知字符的标记
- surname_vocab = Vocabulary(unk_token="@")
- # 初始化国籍词汇表对象,add_unk 参数设为 False,表示不添加未知国籍的标记
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows(): # 遍历数据集中的每一行
- for letter in row.surname: # 遍历当前行中的每个字符
- surname_vocab.add_token(letter) # 将字符添加到姓氏词汇表中
- nationality_vocab.add_token(row.nationality) # 将国籍添加到国籍词汇表中
-
- # 返回一个 SurnameVectorizer 的实例,使用姓氏词汇表和国籍词汇表作为参数
- return cls(surname_vocab, nationality_vocab)
from_serializable
方法:从序列化内容中重建一个 SurnameVectorizer
实例。
to_serializable
方法:将 SurnameVectorizer
实例序列化为一个字典,包含姓氏词汇表和国籍词汇表的序列化表示。
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}
这两个方法用于在不同的环境中保存和加载 SurnameVectorizer
实例。
在这里我们定义了一个 Vocabulary
类,在姓氏分类任务中可以使用它来构建一个词汇表,将所有姓氏(或者其他相关的词汇,如字符级别的标记)映射到唯一的索引。
构建一个词汇表,我们可以得到什么?
将文本数据转换为数字形式,以便输入处理。
统一管理标记到索引的映射,避免在数据处理和模型训练过程中出现不一致的情况。
对于未见过的标记,可以选择添加未知标记并为其分配一个预定义的索引<UNK>,从而增强模型的泛化能力。
- class Vocabulary(object):
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- 初始化方法,用于初始化Vocabulary类的实例
- """
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- # 用于通过索引查找标记的便利数据结构
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- # 设置unknown标记,用于处理未登录词(out-of-vocabulary words)
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- # 如果需要添加unknown标记,则调用add_token方法进行添加
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- # 用于序列化Vocabulary对象的方法,返回一个包含词汇表信息的字典
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """ 从序列化的内容中重建 Vocabulary 类的实例 """
- return cls(**contents)
-
- def add_token(self, token):
- '''
- 将标记添加到词汇表中的方法,若标记已经存在,则返回其索引;
- 若不存在,则将其添加并返回索引
- '''
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- '''
- 接受一个标记列表,通过调用 add_token 方法将每个标记添加到词汇表中
- 并返回每个标记对应的索引列表
- '''
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """
- # 查找标记对应的索引,若标记不存在则使用unknown标记进行返回
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- '''
- 给定一个索引,返回该索引对应的标记。如果索引不存在于词汇表中,则抛出 KeyError
- '''
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- '''
- 返回一个字符串,表示词汇表的大小
- '''
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- '''
- 返回词汇表中标记的数量,即 _token_to_idx 的长度
- '''
- return len(self._token_to_idx)
SurnameClassifier是在上文我们介绍的MLP在Pytorch中的一个实现。第一个线性层将输入向量映射到中间向量,并对该向量应用非线性。第二线性层将中间向量映射到预测向量。在最后一步中,我们通过设置apply_softmax的值,可选地应用softmax操作,以确保输出和为1。
这就是所谓的“概率”。它是可选的原因与我们使用的损失函数的数学公式有关——交叉熵损失。我们研究了“损失函数”中的交叉熵损失。回想一下,交叉熵损失对于多类分类是最理想的,但是在训练过程中软最大值的计算不仅浪费而且在很多情况下并不稳定。
- import torch.nn as nn
- import torch.nn.functional as F
-
- class SurnameClassifier(nn.Module):
- def __init__(self, input_dim, hidden_dim, output_dim):
- # 调用父类的初始化方法
- super(SurnameClassifier, self).__init__()
- # 定义第一个全连接层,输入维度为input_dim,输出维度为hidden_dim
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- # 定义第二个全连接层,输入维度为hidden_dim,输出维度为output_dim
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- # 使用ReLU激活函数对第一个全连接层的输出进行非线性变换
- intermediate_vector = F.relu(self.fc1(x_in))
- # 第二个全连接层的输出
- prediction_vector = self.fc2(intermediate_vector)
- # prediction_vector = self.fc2(F.dropout(intermediate_vector, p=0.5))#带有dropout
-
- # 如果apply_softmax为True,则对输出进行softmax激活
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector
以下是实验所需要的一些帮助实现训练的函数。
make_train_state函数:设置我们训练时的一些状态值,比如是否早停、早停条件值、训练损失、训练正确率等等。
- '''设置训练状态'''
- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
update_train_state函数:处理训练状态更新。
- def update_train_state(args, model, train_state):
- """处理训练状态更新
- Components:
- -早期停止:防止过拟合。
- -模型检查点:如果模型更好,则保存模型
- :param args: 主要参数
- :param model: 要训练的模型
- :param train_state: 表示训练状态值的字典
- :returns:
- 新的train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
compute_accuracy函数:用于计算模型的准确率,输入预测值和目标值,返回准确率;
set_seed_everywhere函数:用于设置Numpy、Pytorch或者CUDA随机种子以确保实验的可重复性;
def handle_dirs函数:用于处理目录,如果目录不存在,则创建目录。
- '''定义函数用于计算模型的准确率'''
- def compute_accuracy(y_pred, y_target):
- _, y_pred_indices = y_pred.max(dim=1)# max函数计算出预测值中最大值的索引
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()#使用 eq 函数比较预测值的索引和目标值,然后计算匹配的数量
- return n_correct / len(y_pred_indices) * 100#返回准确率
-
- '''定义函数用于设置随机种子以确保实验的可重复性'''
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)#设置NumPy的随机种子
- torch.manual_seed(seed)#设置PyTorch的随机种子
- if cuda:#检查是否使用CUDA,如果是,则设置CUDA相关的随机种子
- torch.cuda.manual_seed_all(seed)
-
- '''定义函数,用于处理目录。如果目录不存在,则创建目录'''
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
以下是本次实验使用MLP对姓氏进行分类设置的一些参数:
- args = Namespace(
- surname_csv="data/surnames/surnames_with_splits.csv",# 姓氏数据集的CSV文件路径
- vectorizer_file="vectorizer.json",# 向量化器的JSON文件路径
- model_state_file="model.pth", # 模型状态文件路径
- save_dir="model_storage/ch4/surname_mlp",# 模型保存目录路径
- # 模型参数
- hidden_dim=300,
- # 训练参数
- seed=1337,
- num_epochs=100,
- early_stopping_criteria=5,
- learning_rate=0.001,# 提前停止的标准
- batch_size=64,
- # 训练时段选择
- cuda=False,# 是否使用CUDA加速
- reload_from_files=False,# 是否从文件中重新加载模型
- expand_filepaths_to_save_dir=True,# 是否扩展文件路径以保存目录
- )
-
- if args.expand_filepaths_to_save_dir:#检查是否设置了expand_filepaths_to_save_dir参数
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)#将向量化器文件的路径设为保存目录中
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)#将模型状态文件的路径设为保存目录中
-
- #打印向量化器文件、模型状态文件的路径
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # 检查 CUDA
- if not torch.cuda.is_available():
- args.cuda = False
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
- # 设置全局随机种子,以确保整个实验的可重复性
- set_seed_everywhere(args.seed, args.cuda)
-
- # 处理保存目录
- handle_dirs(args.save_dir)
查看我们保存下来的向量化器文件和模型状态:
训练中最显著的差异与模型中输出的种类和使用的损失函数有关。在分类任务中,输出是一个多类预测向量,可以转换为概率。我们使用了交叉熵损失CrossEntropyLoss。
交叉熵损失函数衡量了两个概率分布之间的相似度,或者可以理解为信息论中的信息量。在分类任务中,通常我们有一个真实的概率分布(实际标签的分布)和一个预测的概率分布(模型预测的标签的分布)。交叉熵损失越小,表示两个概率分布越接近,即模型的预测越准确。
假设有 N 个样本,每个样本有 C 个类别,交叉熵损失函数的一般形式为:
其中:是第 i 个样本的第 j 个类别的真实标签(0或1)。 是模型对第 i 个样本的第 j 个类别的预测概率。
二分类任务中的交叉熵损失函数:
特殊得,在二分类任务中,假设每个样本只有两个类别(通常标记为类别 0 和类别 1):
其中: 是第 i 个样本的真实类别标签,通常为 0 或 1。 是模型对第 i 个样本属于类别 1 的预测概率。
以上是对交叉熵损失函数的简单介绍,可以看出它的公式直接反映了概率分布之间的距离,通过最小化交叉熵损失,可以使模型更好地进行分类预测。
接下来是我们实验中数据集、模型、损失函数和优化器的实例化:
- if args.reload_from_files:
- # 如果设置了reload_from_files参数,从检查点训练
- print("Reloading!")
- # 加载数据集和向量化器
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # 否则,创建新的数据集和向量化器
- print("Creating fresh!")# 打印消息,提示创建新的数据集
- # 创建数据集和向量化器
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)# 保存向量化器
-
- vectorizer = dataset.get_vectorizer()# 获取向量化器
- # 创建姓氏分类器
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
- classifier = classifier.to(args.device)# 将分类器放到指定设备上
- dataset.class_weights = dataset.class_weights.to(args.device)# 将数据集的类权重放到指定设备上
-
- # 设置损失函数为交叉熵损失函数,并传入类权重
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- # 设置优化器为Adam,并传入分类器参数和学习率
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
-
- # 设置学习率调度器,根据验证集的表现动态调整学习率
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
利用训练数据,计算模型输出、损失和梯度。然后,使用梯度来更新模型。下面是我们进行模型训练的详细过程:
- train_state = make_train_state(args)#调用函数 make_train_state,创建训练状态
-
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,# 总共的训练轮数
- position=0)# 进度条位置设为0
-
- dataset.set_split('train')# 设置数据集为训练集
- train_bar = tqdm_notebook(desc='split=train',# 显示为训练集进度条
- total=dataset.get_num_batches(args.batch_size),# 计算训练集批次数
- position=1, # 进度条位置设为1
- leave=True) # 训练结束后保留进度条
-
- dataset.set_split('val') # 设置数据集为验证集
- val_bar = tqdm_notebook(desc='split=val', # 显示为验证集进度条
- total=dataset.get_num_batches(args.batch_size), # 计算验证集批次数
- position=1, # 进度条位置设为1
- leave=True) # 验证结束后保留进度条
-
- try:
- for epoch_index in range(args.num_epochs): # 遍历每个训练轮次
- train_state['epoch_index'] = epoch_index # 记录当前训练轮次的索引
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset, # 生成批次数据
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0 # 初始化运行损失为0
- running_acc = 0.0 # 初始化运行准确率为0
- classifier.train() # 设置分类器为训练模式
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # --------------------------------------
- optimizer.zero_grad()
-
- y_pred = classifier(batch_dict['x_surname'])
-
- loss = loss_func(y_pred, batch_dict['y_nationality']) # 计算损失
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- loss.backward()
-
- optimizer.step()
- # -----------------------------------------
-
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)# 更新训练进度条,显示当前损失、准确率和轮次信息
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)# 将当前训练损失记录到训练状态中
- train_state['train_acc'].append(running_acc)# 将当前训练准确率记录到训练状态中
-
- dataset.set_split('val')
- batch_generator = generate_batches(dataset, # 生成验证集批次数据
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval() # 设置分类器为评估模式
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- y_pred = classifier(batch_dict['x_surname'])
-
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)# 更新验证进度条,显示当前损失、准确率和轮次信息
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)# 将当前验证损失记录到训练状态中
- train_state['val_acc'].append(running_acc)# 将当前验证准确率记录到训练状态中
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)#调用函数 更新训练状态
-
- scheduler.step(train_state['val_loss'][-1])# 根据验证损失调整学习率
-
- if train_state['stop_early']: # 如果达到早停条件,则跳出训练循环
- break
-
- train_bar.n = 0 # 重置训练进度条
- val_bar.n = 0 # 重置验证进度条
- epoch_bar.update() # 更新总体进度条
- except KeyboardInterrupt:
- print("Exiting loop") # 如果捕获到键盘中断,则打印退出循环的消息
num_epoch=100,采用不带有dropout的SurnameClassifier模型,从下图的结果中可以看到训练集的损失loss=1.26,准确acc=52.3;验证集的损失loss=1.79,准确acc=46.1。
计算在测试集和训练集上的损失:
- #从磁盘中加载之前训练好的模型参数
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- #将数据集设置为测试模式
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()# 设置分类器为评估模式
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # 计算预测值
- y_pred = classifier(batch_dict['x_surname'])
-
- #计算损失
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- #计算正确率
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
-
- #打印测试结果
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))
可以看到我们在测试集上的损失和正确率分别是损失loss=1.819,准确acc=46.687。
在这里我们定义了一个通过姓氏预测国籍的函数。
predict_nationality函数:接受姓名、一个分类器和一个向量化器。首先使用提供的向量化器对姓名进行向量化;然后应用分类器以获取预测概率,预测的国籍是具有最高概率的国籍; 返回一个包含预测国籍及其概率的字典。
- def predict_nationality(name, classifier, vectorizer):
- vectorized_name = vectorizer.vectorize(name)# 将姓氏向量化
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)# 将向量化的姓氏转换为 PyTorch 张量
- # 使用分类器进行预测,设置 apply_softmax=True 对输出进行 softmax 处理
- result = classifier(vectorized_name, apply_softmax=True)
-
- # 在预测结果中找到最大概率值和对应的索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()# 获取最大概率值对应的索引值
-
- # 使用向量化器的 nationality_vocab 查找索引对应的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()# 获取最大概率值
-
- return {'nationality': predicted_nationality,
- 'probability': probability_value}
- new_surname = input("Enter a surname to classify: ")#输入一个姓氏 new_surname
- classifier = classifier.to("cpu")
- #predict_nationality 函数基于加载的分类器模型 classifier 和特征转换器 vectorizer 进行预测
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- #预测结果包括预测的国籍和相应的概率,然后将其打印输出
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
输入我们测试的姓名样例Wu,得到Wu的最大概率预测结果是Chinese,并且对应概率是0.64。
上面的姓氏分类预测结果以概率形式输出,仅仅只输出概率最大的可能。如果我们不仅要看最好的预测,还要看更多的预测。那么,我们就可以采用k-best预测并使用另一个模型对它们重新排序。
predict_topk_nationality函数:与predict_nationality类似,但它返回前 k 个预测国籍及其概率。
同样,首先对姓名进行向量化,然后应用分类器以获取预测概率, 并选择前 k 个概率最高的国籍;返回一个包含多个字典的列表,每个字典包含一个预测的国籍及其概率。
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- vectorized_name = vectorizer.vectorize(name)
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # returned size is 1,k
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- results = []
- for prob_value, index in zip(probability_values, indices):
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- return results
- #输入另一个姓氏 new_surname
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
- #指定希望看到的前 k 个预测结果
- k = int(input("How many of the top predictions to see? "))
- #如果指定的 k 大于可用的国籍数量,则输出一条消息并将 k 设置为最大可用的国籍数量
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
- #使用 predict_topk_nationality 函数来获取给定姓氏的前 k 个预测结果
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- #每个预测结果包括预测的国籍和相应的概率。然后,将这些结果打印输出
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
再次输入我们测试的姓名样例Wu,这次还需要输入一个K值(3),得到Wu的最大概率预测结果是Chinese,并且对应概率是0.64,排列第二、第三的结果分别是Wu->korean、Wu->Vietnamese,对应的概率分别是0.27 和0.04。
随机失活(Dropout)是一种在神经网络训练中用来减少过拟合现象的技术。在训练过程中,Dropout以一定的概率随机地去除神经网络中的某些神经元及其相互连接。这个概率被称为“丢弃概率”(drop probability),通常设置为0.5。
我们从一段直观(且幽默)的解释开始:“Dropout,简单地说,是指如果你能在喝醉的时候反复学习如何做一件事,那么你应该能够在清醒的时候做得更好。这一见解产生了许多最先进的结果和一个新兴的领域。”
问题:神经网络在训练时很容易过度拟合训练数据,即学习到了训练数据中的噪声和特定的例外,而无法泛化到新的数据上。
解决:通过随机丢弃神经网络中的部分神经元及其连接,Dropout可以防止神经元之间过度拟合训练数据,从而使模型更具泛化能力。
实现:在每个训练批次中,每个神经元(包括输入神经元和隐藏层神经元)都有一定的概率被丢弃,即其输出被设为零。这样一来,每个神经元都不再依赖于特定的其它神经元的存在,因此无法专门对某些特定的输入“做出反应”。
我们尝试带有dropout的SurnameClassifier模型。这里将第二个全连接层的输出采用随机失活,且随机失活率设置为为0.5。
- # 第二个全连接层的输出
- # 采用dropout 随机失活概率=0.5
- output = self.fc2(F.dropout(intermediate, p=0.5))
看看它如何更改结果?验证集的损失loss=1.81,准确acc=45.9,与不带dropout的模型相比仅在验证集上有细微差别,这可能是数据集小,模型简单,没有过大差别。
卷积神经网络(Convolutional Neural Network,CNN)是一种主要用于处理图像和视频等二维数据的深度学习模型。它通过卷积层(Convolutional Layer)、池化层(Pooling Layer)和全连接层(Fully Connected Layer)等组件构成,以有效地提取和学习数据的空间层次特征。
卷积神经网络CNN因其在处理图像和视频数据时的有效性和高效性而广泛应用于计算机视觉任务,如图像分类、目标检测、图像分割等。在这里,我们应用CNN在进行姓氏的分类任务。在本节中,我们将介绍卷积神经网络(CNN),这是一种非常适合检测空间子结构(并因此创建有意义的空间子结构)的神经网络。
卷积神经网络CNN通常由多个卷积层和池化层交替堆叠而成,以及最后的全连接层用于分类。每个卷积层通过卷积操作和非线性激活函数提取特征,池化层则降低特征图的空间尺寸,最后的全连接层将高层次特征映射到最终的输出空间(例如类别概率)。
下图是识别数字7的CNN网络简单结构示例,可以看到输入特征图经过两次的卷积和池化,最后经过全连接层后采用softmax将最终结果输出。
“各司其职”的CNN结构层:
输入层(Input layer):输入数据;
卷积层(Convolutional Layer):卷积层是CNN的核心部分,用于提取输入数据中的特征;
池化层(Pooling Layer):即降采样层,用于减少卷积层输出的空间尺寸,减少参数数量和计算复杂度,同时保留重要特征;
全连接层(Fully Connected Layer):全连接层将卷积层和池化层提取的特征转换为最终的输出(例如分类结果);每个神经元与前一层的所有神经元相连,形成完全连接的结构;
输出层(Output layer):输出结果。
卷积层是CNN的核心部分,用于提取输入数据中的特征。
卷积层进行卷积操作,通过滤波器(或称为卷积核),在输入数据上进行局部感知,计算出每个滤波器在数据上滑动时的内积,生成特征图(Feature Map)。
卷积操作示例如下:可以看到以一定间隔(Stride=1)滑动卷积核(Kernel Size=3*3)的窗口,将各个位置上卷积核的元素和输入的对应元素相乘,然后再求和(有时将这个计算称为乘积累加运算),将这个结果保存到输出的对应位置。
虽然经典卷积是通过指定核的具体值来设计的,但是CNN是通过指定控制CNN行为的超参数来设计的,然后使用梯度下降来为给定数据集找到最佳参数。两个主要的超参数控制卷积的形状(称为kernel_size)和卷积将在输入数据张量(称为stride)中相乘的位置。还有一些额外的超参数控制输入数据张量被0填充了多少(称为padding),以及当应用到输入数据张量(称为dilation)时,乘法应该相隔多远。
1.卷积核(Kernel Size)
定义了每个卷积滤波器的空间大小,通常是一个正方形或矩形。卷积将输入中的空间(或时间)本地信息组合在一起,每个卷积的本地信息量由内核大小控制。然而,通过增加核的大小,也会减少输出的大小。
2.步长(Stride)
定义了滤波器在输入上移动的步长,影响输出特征图的尺寸。如果步长与核相同,则内核计算不会重叠。另一方面,如果跨度为1,则内核重叠最大。输出张量可以通过增加步幅的方式被有意的压缩来总结信息。
3.填充(Padding)
在输入数据的边缘周围添加额外的值(通常是0),用于控制输出特征图的尺寸和边缘像素的处理。
选择填充操作往往是因为stride和kernel_size允许控制每个计算出的特征值有多大范围,它们会无意识地缩小特征映射的总大小(卷积的输出)。为了抵消这一点,输入数据张量被人为地增加了长度(如果是一维、二维或三维)、高度(如果是二维或三维)和深度(如果是三维),方法是在每个维度上附加和前置0。这意味着CNN将执行更多的卷积,但是输出形状可以控制,而不会影响所需的核大小、步幅或扩展。
5.频道(channel)
非正式地,通道(channel)是指沿输入中的每个点的特征维度。
例如,在图像中,对应于RGB组件的图像中的每个像素有三个通道。在使用卷积时,文本数据也可以采用类似的概念。从概念上讲,如果文本文档中的“像素”是单词,那么通道的数量就是词汇表的大小。如果我们更细粒度地考虑字符的卷积,通道的数量就是字符集的大小(在本例中刚好是词汇表)。
对应地,我们需要了解卷积核数量(Number of Kernals),决定了卷积层输出的特征图数量,每个滤波器学习到不同的特征。有几个卷积核(滤波器)就会输出几张特征图。
6.膨胀(DILATION)
膨胀控制卷积核如何应用于输入矩阵。这个概念结合下图会更容易理解,膨胀相当于在核中跨跃——在核中的元素或核的应用之间存在一个step size,即存在“holes”。
图中的膨胀值dilation=1(默认值)增加到2,意味着当应用于输入矩阵时kernel_size=2的卷积核的元素彼此之间是两个空格。从默认值开始膨胀的增加意味着核矩阵的元素在与输入矩阵相乘时进一步分散开来。进一步增大扩张会加剧这种扩散。
Que:刚才我们提到,有几个卷积核就有多少个特征图。我们采用更多的卷积核,就会有更多的特征图,也意味着提取到的特征也非常多。这么多特征都是我们所需要的么?
解决:当然不是,多余的特征会给我们带来问题:过拟合和维度过高。这时我们可以利用池化层。
池化又称为下采样,当我们进行卷积操作后,再将得到的特征图进行特征提取,将其中最具有代表性的特征提取出来,可以起到减小过拟合和降低维度的作用。
方法:池化通常由两种方法
1.最大池化(Max Pooling):在每个池化窗口中选取最大值作为池化结果。
2.平均池化(Average Pooling):在每个池化窗口中取平均值作为池化结果。
第一步是将PyTorch的Conv1d类的一个实例应用到三维数据张量。通过检查输出的大小可以知道张量减少了多少。
在这里我们首先创建输入张量,data
的大小为 (2, 10, 7)
,
其中,batch_size = 2
:表示批量大小为 2;
one_hot_size = 10
:表示每个时间步上的输入特征维度为 10;
sequence_width = 7
:表示序列长度为 7。
再设置1D卷积层,其相关参数(in_channels=10, out_channels=16, kernel_size=3)。
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
-
- batch_size = 2
- one_hot_size = 10
- sequence_width = 7
- # 创建输入张量,大小为(2,10,7)
- data = torch.randn(batch_size, one_hot_size, sequence_width)
- conv1 = nn.Conv1d(in_channels=one_hot_size, out_channels=16,
- kernel_size=3)# 创建1D卷积层
- # 对输入进行第一个卷积操作得到中间结果
- intermediate1 = conv1(data)
- print(data.size())# 打印输入数据的大小
- print(intermediate1.size()) # 打印第一个中间结果的大小
对输入进行第一个卷积操作得到中间结果如下,可以看到输入张量是[2,10,7],输入通道是10,经过核大小为3的一维卷积,输出通道是16。而输出特征图的长度计算=输入序列长度-核大小+1,即 7-3+1=5。
所以输出张量是[2,16,5]。
进一步减小输出张量的主要方法有三种,第一种方法是创建额外的卷积并按顺序应用它们。
在这里,我们创建了另外两层卷积。最终,对应的sequence_width (dim=2)维度的大小将为1。
- conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3)# 创建第二个1D卷积层
- conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3)# 创建第三个1D卷积层
-
- # 对第一个中间结果进行第二个卷积操作得到第二个中间结果
- intermediate2 = conv2(intermediate1)
- # 对第二个中间结果进行第三个卷积操作得到最终的中间结果
- intermediate3 = conv3(intermediate2)
-
- print(intermediate2.size())# 打印第二个中间结果的大小
- print(intermediate3.size())# 打印最终中间结果的大小
-
- # 压缩最终中间结果的尺寸
- y_output = intermediate3.squeeze()
- print(y_output.size())
经过三次卷积之后,最终的输出在最终维度上的大小为1,压缩后结果[2,64]:
可以看到在每次卷积中,通道维数的大小都会增加,因为通道维数是每个数据点的特征向量。张量实际上是一个特征向量的最后一步是去掉讨厌的尺寸1维,我们使用squeeze()方法来实现这一点。该方法将删除size=1的所有维度并返回结果。
另外还有两种方法可以将张量简化为每个数据点的一个特征向量。第一种方法,只需使用PyTorch的view()方法将所有向量平展成单个向量。第二种方法使用一些数学运算来总结向量中的信息。最常见的操作是算术平均值,但沿feature map维数求和和使用最大值也是常见的。
- # 方法 2
- print(intermediate1.view(batch_size, -1).size())# 查看第一个中间结果reshape后的大小
-
- # 方法 3
- print(torch.mean(intermediate1, dim=2).size())# 使用平均值对第一个中间结果进行降维操作
- print(torch.max(intermediate1, dim=2)[0].size())# 使用最大值对第一个中间结果进行降维操作
- print(torch.sum(intermediate1, dim=2).size())# 对第一个中间结果进行求和操作
输出结果如下:
上面一节我们探索了张量经过卷积后如何变化成为特征向量。接下来我们可以来用CNN实战,进行姓氏分类任务。
尽管我们使用了来自“基于多层感知器的姓氏分类”中的相同数据集,但在实现上有一个不同之处:数据集由onehot向量矩阵组成,而不是一个收缩的onehot向量。为此,我们实现了一个数据集类,它跟踪最长的姓氏,并将其作为矩阵中包含的行数提供给矢量化器。下面是我们对`SurnameDataset.__getitem__`的更改:
- class SurnameDataset(Dataset):
- def __getitem__(self, index):
- """
- 从数据集中获取单个样本。
- 参数:
- index (int): 样本索引。
- 返回:
- dict: 包含姓氏矩阵和国籍索引的字典。
- """
- row = self._target_df.iloc[index]
-
- # 使用Vectorizer将姓氏向量化为矩阵
- surname_matrix = \
- self._vectorizer.vectorize(row.surname, self._max_seq_length)
-
- # 获取国籍在Vocabulary中的索引
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
除了更改为使用onehot矩阵之外,我们还修改了向量化器,以便计算姓氏的最大长度并将其保存为max_surname_length:
- class SurnameVectorizer(object):
- def vectorize(self, surname):
- """
- Args:
- surname (str): 姓氏
- Returns:
- one_hot_matrix (np.ndarray): one-hot矩阵
- """
-
- one_hot_matrix_size = (len(self.character_vocab), self.max_surname_length)
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- # 遍历姓氏的每个字符,将其转换为独热向量
- for position_index, character in enumerate(surname):
- character_index = self.character_vocab.lookup_token(character)
- one_hot_matrix[character_index][position_index] = 1
-
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """从数据集中初始化向量化器
- Args:
- surname_df (pandas.DataFrame): 姓氏数据集
- Returns:
- 姓氏向量化实例
- """
- character_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
- max_surname_length = 0
-
- # 遍历数据帧中的每一行,构建character_vocab和nationality_vocab
- for index, row in surname_df.iterrows():
- max_surname_length = max(max_surname_length, len(row.surname))
- for letter in row.surname:# 遍历字符串每个字符
- character_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(character_vocab, nationality_vocab, max_surname_length)
这里我们定义了一个卷积神经网络模型的分类器SurnameClassifier类,使用sequence和ELU PyTorch模块。序列模块是封装线性操作序列的方便包装器。在这种情况下,我们使用它来封装Conv1d序列的应用程序。ELU是类似于上文介绍的ReLU的非线性函数,但是它不是将值裁剪到0以下,而是对它们求幂。
- import torch.nn as nn
- import torch.nn.functional as F
- # 定义一个用于姓氏分类的卷积神经网络模型
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- super(SurnameClassifier, self).__init__()
- # 定义卷积神经网络的序列
- self.convnet = nn.Sequential(
- # 第一层卷积
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3),
- # ELU 激活函数
- nn.ELU(),
- # 第二层卷积
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- # ELU 激活函数
- nn.ELU(),
- # 第三层卷积
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- # ELU 激活函数
- nn.ELU(),
- # 第四层卷积
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3),
- # ELU 激活函数
- nn.ELU()
- )
- # 全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- # 通过卷积神经网络提取特征
- features = self.convnet(x_surname).squeeze(dim=2)
- # 通过全连接层得到预测向量
- prediction_vector = self.fc(features)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- # 返回模型的输出
- return prediction_vector
我们也可以像MLP一样使用带有dropout的CNN模型,失活率设为0.5:
- import torch.nn.functional as F
- # 定义一个用于姓氏分类的卷积神经网络模型
- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels, dropout_p=0.5):
- # 调用基类的构造函数
- super(SurnameClassifier, self).__init__()
- # 定义卷积神经网络的序列
- self.convnet = nn.Sequential(
- ......
- nn.ELU()
- )
- #dropout层
- self.dropout = nn.Dropout(p=dropout_p)
- # 全连接层
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False, training=False):
- # 通过卷积神经网络提取特征
- features = self.convnet(x_surname).squeeze(dim=2)
- if training:
- features = self.dropout(features)
- # 通过全连接层得到预测向量
- prediction_vector = self.fc(features)
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
- # 返回模型的输出
- return prediction_vector
批处理标准化(Batch Normalization)是设计网络时经常使用的一种工具。BatchNorm对CNN的输出进行转换,方法是将激活量缩放为零均值和单位方差。它用于Z-transform的平均值和方差值每批更新一次,这样任何单个批中的波动都不会太大地移动或影响它。
在这里我们使用了如何用卷积和线性层实例化和使用批处理规范:
- # ...
- self.conv1 = nn.Conv1d(in_channels=1, out_channels=10,
- kernel_size=5,
- stride=1)# 卷积层
- self.conv1_bn = nn.BatchNorm1d(num_features=10)# 批归一化层
- # ...
-
- def forward(self, x):
- # ...
- x = F.relu(self.conv1(x))# 应用卷积层
- x = self.conv1_bn(x)# 应用批归一化
- # ...
训练过程如下所示,训练集上的损失loss=1.03,正确率57.4,而验证集上损失loss=1.93,正确率54.3:
可以看到我们在测试集上的损失和正确率分别是:损失loss=1.784,准确acc=53.971;
而在我们使用MLP训练模型时在测试集上的损失loss=1.819,准确acc=46.687。从两个模型的测试效果中可以看出CNN模型在本次姓氏分类任务中较优于MLP模型。
我们使用训练好的CNN模型进行形式类别的预测,predict_nationality()函数的一部分发生了更改,我们没有使用视图方法重塑新创建的数据张量以添加批处理维度,而是使用PyTorch的unsqueeze()函数在批处理应该在的位置添加大小为1的维度。
- def predict_nationality(surname, classifier, vectorizer):
- # 将姓氏向量化
- vectorized_surname = vectorizer.vectorize(surname)
- # 将向量化的姓氏转换为 PyTorch 张量,并在第 0 维增加一个维度
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- # 使用分类器进行预测,设置 apply_softmax=True 对输出进行 softmax 处理
- result = classifier(vectorized_surname, apply_softmax=True)
-
- # 在预测结果中找到最大概率值和对应的索引
- probability_values, indices = result.max(dim=1)
- index = indices.item()# 获取最大概率值对应的索引值
-
- # 使用向量化器的 nationality_vocab 查找索引对应的国籍
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()# 获取最大概率值
-
- # 返回预测的国籍和其对应的概率值
- return {'nationality': predicted_nationality, 'probability': probability_value}
相同的更改反映在predict_topk_nationality()函数中:
- def predict_topk_nationality(surname, classifier, vectorizer, k=5):
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
- prediction_vector = classifier(vectorized_surname, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- probability_values = probability_values[0].detach().numpy()
- indices = indices[0].detach().numpy()
-
- results = []
- for kth_index in range(k):
- nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
- probability_value = probability_values[kth_index]
- results.append({'nationality': nationality,
- 'probability': probability_value})
- return results
-
- new_surname = input("Enter a surname to classify: ")
-
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
那么我们使用CNN模型测试的预测结果如何呢?
仅输出最大概率类别:
输出概率排名前三的预测结果:
从结果中可以看到,输入我们测试的姓名样例Wu,得到Wu的最大概率预测结果是Chinese,并且对应概率是0.64,排列第二、第三的结果分别是Wu->Scottish、Wu->Korean,对应的概率分别是0.09 和0.08。
下图是我们使用MLP模型进行姓氏分类的预测结果:
从两个预测结果比较可知,CNN模型预测正确类别的概率更大,且与不正确类别的概率差距同样更大,这正是我们所期望看到的,正确类别能够明显区分于其他类别。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。