当前位置:   article > 正文

基于MLP与CNN的前馈神经网络处理姓氏分类问题_前馈神经网络的实现与应用mlpclassifier代码实例

前馈神经网络的实现与应用mlpclassifier代码实例

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

  前馈神经网络(feedforward neural network,FNN),简称前馈网络,是人工神经网络的一种。前馈神经网络采用一种单向多层结构。其中每一层包含若干个神经元。在此种神经网络中,各神经元可以接收前一层神经元的信号,并产生输出到下一层。第0层叫输入层,最后一层叫输出层,其他中间层叫做隐含层(或隐藏层、隐层)。隐层可以是一层。也可以是多层。

  整个网络中无反馈,信号从输入层向输出层单向传播,可用一个有向无环图表示。

在这里插入图片描述

一个典型的多层前馈神经网络

一、The Multilayer Perceptron(多层感知器)

  多层感知器(MLP)被认为是最基本的神经网络构建模块之一。多层感知器将数据向量作为输入,计算出一个输出值。在MLP中,许多感知器被分组,以便单个层的输出是一个新的向量,而不是单个输出值。同时,它可以将多个层与每个层之间的非线性结合在一起。

  最简单的MLP,如图1-1所示,由三个表示阶段和两个线性层组成。第一阶段是输入向量。这是给定给模型的向量。给定输入向量,第一个线性层计算一个隐藏向量——表示的第二阶段。隐藏向量之所以这样被调用,是因为它是位于输入和输出之间的层的输出。使用这个隐藏的向量,第二个线性层计算一个输出向量。虽然在这个例子中,只展示了一个隐藏的向量,但是有可能有多个中间阶段,每个阶段产生自己的隐藏向量。最终的隐藏向量总是通过线性层和非线性的组合映射到输出向量。

在这里插入图片描述

图1-1一种具有两个线性层和三个表示阶段(输入向量、隐藏向量和输出向量)的MLP的可视化表示

  mlp的力量来自于添加第二个线性层和允许模型学习一个线性分割的的中间表示——该属性能表示一个直线(或更一般的,一个超平面)可以用来区分数据点落在线(或超平面)的哪一边的。学习具有特定属性的中间表示,如分类任务是线性可分的,这是使用神经网络的最深刻后果之一,也是其建模能力的精髓。

1.1 A Simple Example: XOR

  在一个二元分类任务中训练感知器和MLP:星和圆。每个数据点是一个二维坐标。在不深入研究实现细节的情况下,最终的模型预测如图1-2所示。在这个图中,错误分类的数据点用黑色填充,而正确分类的数据点没有填充。在左边的面板中,从填充的形状可以看出,感知器在学习一个可以将星星和圆分开的决策边界方面有困难。然而,MLP(右面板)学习了一个更精确地对恒星和圆进行分类的决策边界。

在这里插入图片描述

图1-2 从感知器(左)和MLP(右)学习的XOR问题的解决方案显示

  图1-2中,每个数据点的真正类是该点的形状:星形或圆形。错误的分类用块填充,正确的分类没有填充。这些线是每个模型的决策边界。在边的面板中,感知器学习一个不能正确地将圆与星分开的决策边界。事实上,没有一条线可以。在右动的面板中,MLP学会了从圆中分离星。

  虽然在图中显示MLP有两个决策边界,这是它的优点,但它实际上只是一个决策边界。决策边界是因为中间表示法改变了空间,使一个超平面同时出现在这两个位置上。在图1-3中,我们可以看到MLP计算的中间值。这些点的形状表示类(星形或圆形)。我们所看到的是,MLP已经学会了“扭曲”数据所处的空间,以便在数据通过最后一层时,用一线来分割它们。

在这里插入图片描述

图1-3 MLP的输入和中间表示是可视化的。从左到右:(1)网络的输入;(2)第一个线性模块的输出;(3)第一个非线性模块的输出;(4)第二个线性模块的输出。第一个线性模块的输出将圆和星分组,而第二个线性模块的输出将数据点重新组织为线性可分的。

  相反,如图1-4所示,感知器没有额外的一层来处理数据的形状,直到数据变成线性可分的。
在这里插入图片描述

图1-4 感知器的输入和输出表示。因为它没有像MLP那样的中间表示来分组和重新组织,所以它不能将圆和星分开。

1.2 Implementing MLPs in PyTorch

  MLP除了简单的感知器之外,还有一个额外的计算层。在例1-1中给出的实现中,使用PyTorch的两个线性模块实例化了这个想法。线性对象被命名为fc1和fc2,它们遵循一个通用约定,即将线性模块称为“完全连接层”,简称为“fc层”。除了这两个线性层外,还有一个修正的线性单元(ReLU),它在被输入到第二个线性层之前应用于第一个线性层的输出。由于层的顺序性,必须确保层中的输出数量等于下一层的输入数量。使用两个线性层之间的非线性是必要的,因为没有它,两个线性层在数学上等价于一个线性层,因此不能建模复杂的模式。MLP只实现反向传播的前向传递。这是因为PyTorch根据模型的定义和向前传递的实现,自动计算出如何进行向后传递和梯度更新。

Example 1-1. Multilayer Perceptron

import torch.nn as nn
import torch.nn.functional as F

class MultilayerPerceptron(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        构造函数,初始化多层感知机的参数
        
        参数:
            input_dim (int): 输入向量的维度
            hidden_dim (int): 第一个线性层的输出维度(隐藏层神经元数量)
            output_dim (int): 第二个线性层的输出维度(输出层神经元数量)
        """
        super(MultilayerPerceptron, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)  # 定义第一个全连接层
        self.fc2 = nn.Linear(hidden_dim, output_dim)  # 定义第二个全连接层

    def forward(self, x_in, apply_softmax=False):
        """
        前向传播函数
        
        参数:
            x_in (torch.Tensor): 输入数据张量
                x_in 的形状应为 (batch_size, input_dim)
            apply_softmax (bool): 是否应用 softmax 激活函数
                如果与交叉熵损失函数一起使用,应设置为 False
        
        返回值:
            结果张量,形状应为 (batch_size, output_dim)
        """
        intermediate = F.relu(self.fc1(x_in))  # 在第一个全连接层后应用 ReLU 激活函数
        output = self.fc2(intermediate)  # 通过第二个全连接层得到输出

        if apply_softmax:
            output = F.softmax(output, dim=1)  # 如果需要,应用 softmax 激活函数
        return output

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

1.3 Surname Classification with a Multilayer Perceptron

  将MLP应用于将姓氏分类到其原籍国的任务流程如下:

首先对每个姓氏的字符进行拆分。

然后,使用词汇表、向量化器和DataLoader类逐步完成从姓氏字符串到向量化小批处理的管道。同时引入了多类输出及其对应的损失函数。

1.3.1 The Surname Dataset

  姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,该数据集具有以下属性——
1,它是相当不平衡的。排名前三的语言占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。
2,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密(比如“O ‘Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。

Example 1-2. Implementing SurnameDataset

import json
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader

class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        """
        参数:
            surname_df (pandas.DataFrame): 数据集
            vectorizer (SurnameVectorizer): 从数据集实例化的矢量化器
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')
        
        # 类别权重
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        """加载数据集并从头创建一个新的矢量化器
        
        参数:
            surname_csv (str): 数据集的位置
        返回:
            SurnameDataset 的一个实例
        """
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        """加载数据集和相应的矢量化器。
        在矢量化器已缓存以便重用的情况下使用
        
        参数:
            surname_csv (str): 数据集的位置
            vectorizer_filepath (str): 保存的矢量化器的位置
        返回:
            SurnameDataset 的一个实例
        """
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """从文件加载矢量化器的静态方法
        
        参数:
            vectorizer_filepath (str): 序列化矢量化器的位置
        返回:
            SurnameVectorizer 的一个实例
        """
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        """使用 json 将矢量化器保存到磁盘
        
        参数:
            vectorizer_filepath (str): 保存矢量化器的位置
        """
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        """ 返回矢量化器 """
        return self._vectorizer

    def set_split(self, split="train"):
        """ 使用数据框中的列选择数据集中的分割 """
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """PyTorch 数据集的主要入口方法
        
        参数:
            index (int): 数据点的索引
        返回:
            包含数据点的字典:
                特征 (x_surname)
                标签 (y_nationality)
        """
        row = self._target_df.iloc[index]

        surname_vector = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_vector,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):
        """给定批大小,返回数据集中批的数量
        
        参数:
            batch_size (int)
        返回:
            数据集中批的数量
        """
        return len(self) // batch_size

    
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"): 
    """
    一个包装了 PyTorch DataLoader 的生成器函数。
    它将确保每个张量位于正确的设备位置。
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145

1.3.2 Vocabulary, Vectorizer, and DataLoader

  为了使用字符对姓氏进行分类,使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。

Example 1-3. Implementing Vocabulary

class Vocabulary(object):
    """处理文本并提取用于映射的词汇的类"""

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        参数:
            token_to_idx (dict): 预先存在的从token到索引的映射
            add_unk (bool): 是否添加UNK标记的标志
            unk_token (str): 要添加到词汇表中的UNK标记
        """

        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token 
                              for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token
        
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token) 
        
        
    def to_serializable(self):
        """ 返回可以序列化的字典 """
        return {'token_to_idx': self._token_to_idx, 
                'add_unk': self._add_unk, 
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        """ 从序列化字典实例化词汇表 """
        return cls(**contents)

    def add_token(self, token):
        """根据token更新映射字典

        参数:
            token (str): 要添加到词汇表中的项目
        返回:
            index (int): 对应于token的整数
        """
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def add_many(self, tokens):
        """将一系列tokens添加到词汇表中
        
        参数:
            tokens (list): 字符串token列表
        返回:
            indices (list): 对应于tokens的索引列表
        """
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """检索与token关联的索引
           或者如果token不存在,则返回UNK索引
        
        参数:
            token (str): 要查找的token
        返回:
            index (int): 对应于token的索引
        注意:
            `unk_index`需要>=0(已添加到词汇表中)
            才能使用UNK功能
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """返回与索引关联的token
        
        参数: 
            index (int): 要查找的索引
        返回:
            token (str): 对应于索引的token
        异常:
            KeyError: 如果索引不在词汇表中
        """
        if index not in self._idx_to_token:
            raise KeyError("索引 (%d) 不在词汇表中" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

Example 1-4. Implementing SurnameVectorizer

import numpy as np

class SurnameVectorizer(object):
    """ 矢量化器,负责协调词汇表并使用它们 """
    def __init__(self, surname_vocab, nationality_vocab):
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        """将提供的姓氏矢量化

        参数:
            surname (str): 姓氏
        返回:
            one_hot (np.ndarray): 一个压缩的一热编码
        """
        vocab = self.surname_vocab
        one_hot = np.zeros(len(vocab), dtype=np.float32)
        for token in surname:
            one_hot[vocab.lookup_token(token)] = 1
        return one_hot

    @classmethod
    def from_dataframe(cls, surname_df):
        """从数据框实例化矢量化器

        参数:
            surname_df (pandas.DataFrame): 姓氏数据集
        返回:
            SurnameVectorizer 的一个实例
        """
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)

        for index, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

1.3.3 The Surname Classifier Model

  SurnameClassifier是本实验的MLP的实现。第一个线性层将输入向量映射到中间向量,并对该向量应用非线性。第二线性层将中间向量映射到预测向量。在最后一步中,可选地应用softmax操作,以确保输出和为1;这就是所谓的“概率”。

  交叉熵损失对于多类分类是最理想的,但是在训练过程中软最大值的计算不仅浪费而且在很多情况下并不稳定。

Example 1-5. The SurnameClassifier as an MLP

import torch.nn as nn
import torch.nn.functional as F

class SurnameClassifier(nn.Module):
    """ A 2-layer Multilayer Perceptron for classifying surnames """
    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Args:
            input_dim (int): the size of the input vectors
            hidden_dim (int): the output size of the first Linear layer
            output_dim (int): the output size of the second Linear layer
        """
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        """The forward pass of the classifier

        Args:
            x_in (torch.Tensor): an input data tensor.
                x_in.shape should be (batch, input_dim)
            apply_softmax (bool): a flag for the softmax activation
                should be false if used with the Cross Entropy losses
        Returns:
            the resulting tensor. tensor.shape should be (batch, output_dim)
        """
        intermediate_vector = F.relu(self.fc1(x_in))
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

1.3.4 The Training Routine

Example 1-6. The Training

import os
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import json
from argparse import Namespace


def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}

def update_train_state(args, model, train_state):
    """Handle the training state updates.

    Components:
     - Early Stopping: Prevent overfitting.
     - Model Checkpoint: Model is saved if the model is better

    :param args: main arguments
    :param model: model to train
    :param train_state: a dictionary representing the training state values
    :returns:
        a new train_state
    """

    # Save one model at least
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # Save model if performance improved
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # If loss worsened
        if loss_t >= train_state['early_stopping_best_val']:
            # Update step
            train_state['early_stopping_step'] += 1
        # Loss decreased
        else:
            # Save the best model
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # Reset early stopping step
            train_state['early_stopping_step'] = 0

        # Stop early ?
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

def compute_accuracy(y_pred, y_target):
    y_pred_indices = y_pred.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100


def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)



# 加载初始数据集
surname_csv = 'surnames.csv'
df = pd.read_csv(surname_csv)

# 将数据集划分为训练集、验证集和测试集
train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['nationality'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['nationality'], random_state=42)

# 添加 split 列
train_df = train_df.copy()
train_df.loc[:, 'split'] = 'train'
val_df = val_df.copy()
val_df.loc[:, 'split'] = 'val'
test_df = test_df.copy()
test_df.loc[:, 'split'] = 'test'

# 合并所有数据集
final_df = pd.concat([train_df, val_df, test_df])

# 保存为 CSV 文件
final_df.to_csv("data/surnames/surnames_with_splits.csv", index=False)


# 参数定义
args = Namespace(
    # 数据和路径信息
    surname_csv="surnames_with_splits.csv",  # 包含拆分信息的数据集路径
    vectorizer_file="vectorizer.json",  # 向量化器的保存路径
    model_state_file="model.pth",  # 模型状态的保存路径
    save_dir="model_storage/ch4/surname_mlp",  # 模型保存目录
    # 模型超参数
    hidden_dim=300,  # 隐藏层维度
    # 训练超参数
    seed=1337,  # 随机种子
    num_epochs=10,  # 训练的迭代次数
    early_stopping_criteria=5,  # 提前停止的标准
    learning_rate=0.001,  # 学习率
    batch_size=64,  # 批处理大小
    # 运行时选项
    cuda=False,  # 是否使用CUDA
    reload_from_files=False,  # 是否从文件重载
    expand_filepaths_to_save_dir=True,  # 是否扩展文件保存路径
)

# 扩展保存路径
if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file)
    args.model_state_file = os.path.join(args.save_dir, args.model_state_file)
    
    print("扩展后的文件路径: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    
# 检查 CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
    
print("使用 CUDA: {}".format(args.cuda))

# 设置随机种子以确保可复现性
def set_seed_everywhere(seed, cuda):
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

set_seed_everywhere(args.seed, args.cuda)

# 处理目录
def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)

handle_dirs(args.save_dir)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157

在这里插入图片描述
  训练中最显著的差异与模型中输出的种类和使用的损失函数有关。在这个例子中,输出是一个多类预测向量,可以转换为概率。正如在模型描述中所描述的,这种输出的损失类型仅限于CrossEntropyLoss和NLLLoss。由于它的简化,我们使用了CrossEntropyLoss。

Example 1-7. Instantiating the dataset, model, loss, and optimizer

# 加载数据集并创建矢量化器
dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
vectorizer = dataset.get_vectorizer()

# 初始化分类器
classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
                               hidden_dim=args.hidden_dim,
                               output_dim=len(vectorizer.nationality_vocab))

# 将分类器转移到设备(CPU或CUDA)
classifier = classifier.to(args.device)    

# 设置损失函数,使用加权的交叉熵损失
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
# 设置优化器,使用Adam优化器
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Example 1-8. A snippet of the training loop

import numpy as np
from tqdm.notebook import tqdm as tqdm_notebook



# 将分类器和类权重转移到设备
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

# 定义损失函数、优化器和学习率调度器
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                 mode='min', factor=0.5,
                                                 patience=1)

# 创建训练状态字典
train_state = make_train_state(args)

# 创建进度条
epoch_bar = tqdm_notebook(desc='training routine', 
                          total=args.num_epochs,
                          position=0)

dataset.set_split('train')
train_bar = tqdm_notebook(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size), 
                          position=1, 
                          leave=True)
dataset.set_split('val')
val_bar = tqdm_notebook(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size), 
                        position=1, 
                        leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # 在训练数据集上迭代
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            optimizer.zero_grad()
            y_pred = classifier(batch_dict['x_surname'])
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            loss.backward()
            optimizer.step()
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            train_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # 在验证数据集上迭代
        dataset.set_split('val')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):
            y_pred =  classifier(batch_dict['x_surname'])
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.to("cpu").item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
            val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        # 更新训练状态
        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)

        scheduler.step(train_state['val_loss'][-1])

        if train_state['stop_early']:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("循环中断")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

在这里插入图片描述

# 加载最佳模型
classifier.load_state_dict(torch.load(train_state['model_filename']))

# 将模型和类权重转移到设备
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

# 定义损失函数
loss_func = nn.CrossEntropyLoss(dataset.class_weights)

# 设置数据集为测试集
dataset.set_split('test')

# 生成测试集批次数据
batch_generator = generate_batches(dataset, 
                                   batch_size=args.batch_size, 
                                   device=args.device)

# 初始化运行损失和准确率
running_loss = 0.
running_acc = 0.

# 将模型设置为评估模式
classifier.eval()

# 遍历测试集批次
for batch_index, batch_dict in enumerate(batch_generator):
    # 计算输出
    y_pred =  classifier(batch_dict['x_surname'])
    
    # 计算损失
    loss = loss_func(y_pred, batch_dict['y_nationality'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # 计算准确率
    acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

# 更新训练状态中的测试集损失和准确率
train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc


#打印结果
print("Test loss: {};".format(train_state['test_loss']))
print("Test Accuracy: {}".format(train_state['test_acc']))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

在这里插入图片描述

1.3.5 Model Evaluation and Prediction

1.3.5.1 Classifying A New Surname

  通过查看分类器的top-k预测来为一个新示例开发模型所了解的内容的直觉。
Example 1-9. A function for performing nationality prediction

def predict_nationality(name, classifier, vectorizer):
    # 将名字矢量化
    vectorized_name = vectorizer.vectorize(name)
    vectorized_name = torch.tensor(vectorized_name).view(1, -1)

    # 使用分类器进行预测
    result = classifier(vectorized_name, apply_softmax=True)

    # 获取最大概率值及其对应的索引
    probability_values, indices = result.max(dim=1)
    index = indices.item()

    # 查找预测的国籍及其概率
    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality,
            'probability': probability_value}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
new_surname = input("请输入要分类的姓氏: ")
# 将分类器移至CPU
classifier = classifier.cpu()
# 预测国籍
prediction = predict_nationality(new_surname, classifier, vectorizer)
# 输出预测结果
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述

1.3.5.2 Retrieving The Top-k Predictions For A New Surname

  不仅要看最好的预测,还要看更多的预测。PyTorch提供了一个torch.topk函数,它提供了一种方便的方法来获得这些预测。

Example 1-10. Predicting the top-k nationalities

def predict_topk_nationality(surname, classifier, vectorizer, k=5):
    """预测姓氏的前K个国籍
    
    参数:
        surname (str): 要分类的姓氏
        classifier (SurnameClassifer): 分类器实例
        vectorizer (SurnameVectorizer): 对应的矢量化器
        k (int): 要返回的前几个国籍
    返回:
        由字典组成的列表,每个字典包含一个国籍和一个概率
    """
    
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
    prediction_vector = classifier(vectorized_surname, apply_softmax=True)
    probability_values, indices = torch.topk(prediction_vector, k=k)
    
    # 返回的大小是 1,k
    probability_values = probability_values[0].detach().numpy()
    indices = indices[0].detach().numpy()
    
    results = []
    for kth_index in range(k):
        nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
        probability_value = probability_values[kth_index]
        results.append({'nationality': nationality, 
                        'probability': probability_value})
    return results

new_surname = input("请输入要分类的姓氏: ")

k = int(input("您希望看到前多少个预测结果? "))
if k > len(vectorizer.nationality_vocab):
    print("对不起! 这是超过我们国籍数量的请求.. 默认最大数量 :)")
    k = len(vectorizer.nationality_vocab)
    
predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)

print("前 {} 个预测结果:".format(k))
print("===================")
for prediction in predictions:
    print("{} -> {} (p={:0.2f})".format(new_surname,
                                        prediction['nationality'],
                                        prediction['probability']))

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

在这里插入图片描述

二、Convolutional Neural Networks(卷积神经网络)

  卷积神经网络(Convolutional Neural Networks, CNN)是一类包含卷积计算且具有深度结构的前馈神经网络(Feedforward Neural Networks),是深度学习(deep learning)的代表算法之一 。卷积神经网络具有表征学习(representation learning)能力,能够按其阶层结构对输入信息进行平移不变分类(shift-invariant classification),因此也被称为“平移不变人工神经网络(Shift-Invariant Artificial Neural Networks, SIANN)” 。

  为了理解不同的设计决策对CNN意味着什么,在本例中,单个“核”应用于输入矩阵。卷积运算(线性算子)的精确数学表达式对于理解这一节并不重要,但是从这个图中可以直观地看出,核是一个小的方阵,它被系统地应用于输入矩阵的不同位置。
在这里插入图片描述

二维卷积运算

  输入矩阵与单个产生输出矩阵的卷积核(也称为特征映射)在输入矩阵的每个位置应用内核。在每个应用程序中,内核乘以输入矩阵的值及其自身的值,然后将这些乘法相加kernel具有以下超参数配置:kernel_size=2,stride=1,padding=0,以及dilation=1。

2.1 Implementing CNNs in PyTorch

Example 2-1. Artificial data and using a Conv1d class

import torch
import torch.nn as nn

batch_size = 2
one_hot_size = 10
sequence_width = 7

# 随机生成数据
data = torch.randn(batch_size, one_hot_size, sequence_width)

# 定义1D卷积层
conv1 = nn.Conv1d(in_channels=one_hot_size, out_channels=16, kernel_size=3)

# 通过卷积层传递数据
intermediate1 = conv1(data)

# 打印输入数据和卷积输出的尺寸
print(data.size())            # 输出: torch.Size([2, 10, 7])
print(intermediate1.size())   # 输出: torch.Size([2, 16, 5])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

在这里插入图片描述

2.2 Classifying Surnames by Using a CNN

2.2.1 The SurnameDataset

    虽然姓氏数据集之前在“示例:带有多层感知器的姓氏分类”中进行了描述,但建议参考“姓氏数据集”来了解它的描述。尽管使用了相同数据集,但在实现上有一个不同之处:数据集由onehot向量矩阵组成,而不是一个收缩的onehot向量。为此,我们实现了一个数据集类,它跟踪最长的姓氏,并将其作为矩阵中包含的行数提供给矢量化器。列的数量是onehot向量的大小(词汇表的大小)。示例2-2对SurnameDataset.__getitem__进行了更改;同时对SurnameVectorizer的进行了更改。

    使用数据集中最长的姓氏来控制onehot矩阵的大小有两个原因。首先,将每一小批姓氏矩阵组合成一个三维张量,要求它们的大小相同。其次,使用数据集中最长的姓氏意味着可以以相同的方式处理每个小批处理。

Example 2-2. SurnameDataset modified for passing the maximum surname length

import torch
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import json

class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        """
        Args:
            surname_df (pandas.DataFrame): 姓氏数据集
            vectorizer (SurnameVectorizer): 从数据集实例化的向量化器
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer
        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')
        
        # 类别权重
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)


    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        """加载数据集并创建一个新的向量化器
        
        Args:
            surname_csv (str): 数据集的位置
        Returns:
            SurnameDataset 的实例
        """
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        """加载数据集及其对应的向量化器。
        用于在向量化器已缓存以便重复使用的情况下
        
        Args:
            surname_csv (str): 数据集的位置
            vectorizer_filepath (str): 保存的向量化器的位置
        Returns:
            SurnameDataset 的实例
        """
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """从文件加载向量化器的静态方法
        
        Args:
            vectorizer_filepath (str): 序列化向量化器的位置
        Returns:
            SurnameDataset 的实例
        """
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        """使用 json 将向量化器保存到磁盘
        
        Args:
            vectorizer_filepath (str): 保存向量化器的位置
        """
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        """返回向量化器"""
        return self._vectorizer

    def set_split(self, split="train"):
        """使用数据框中的列选择数据集的拆分"""
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """用于 PyTorch 数据集的主要入口方法
        
        Args:
            index (int): 数据点的索引 
        Returns:
            一个字典,包含数据点的特征 (x_data) 和标签 (y_target)
        """
        row = self._target_df.iloc[index]

        surname_matrix = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_matrix,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):
        """给定批次大小,返回数据集中的批次数
        
        Args:
            batch_size (int)
        Returns:
            数据集中的批次数
        """
        return len(self) // batch_size

    
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    """
    包装 PyTorch DataLoader 的生成器函数。它确保每个张量都位于正确的设备位置上。
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142

2.2.2 Vocabulary, Vectorizer, and DataLoader

   尽管词汇表和DataLoader的实现方式与“示例:带有多层感知器的姓氏分类”中的示例相同,但Vectorizer的vectorize()方法已经更改,以适应CNN模型的需要。我们将使用的Conv1d层要求数据张量在第0维上具有批处理,在第1维上具有通道,在第2维上具有特性。

   除了更改为使用onehot矩阵之外,我们还修改了矢量化器,以便计算姓氏的最大长度并将其保存为max_surname_length

Example 2-3. Implementing the Surname Vectorizer for CNNs

class SurnameVectorizer(object):
    """ 姓氏向量化器,用于协调词汇表并将其用于转换 """
    def vectorize(self, surname):
        """
        将姓氏转换为独热编码矩阵形式

        Args:
            surname (str): 姓氏
        Returns:
            one_hot_matrix (np.ndarray): 一个独热编码向量的矩阵
        """

        one_hot_matrix_size = (len(self.character_vocab), self.max_surname_length)
        one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)

        for position_index, character in enumerate(surname):
            character_index = self.character_vocab.lookup_token(character)
            one_hot_matrix[character_index][position_index] = 1

        return one_hot_matrix

    @classmethod
    def from_dataframe(cls, surname_df):
        """ 从数据集 dataframe 实例化向量化器

        Args:
            surname_df (pandas.DataFrame): 姓氏数据集
        Returns:
            SurnameVectorizer 的一个实例
        """
        character_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)
        max_surname_length = 0

        for index, row in surname_df.iterrows():
            max_surname_length = max(max_surname_length, len(row.surname))
            for letter in row.surname:
                character_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(character_vocab, nationality_vocab, max_surname_length)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

2.2.3 Reimplementing the SurnameClassifier with Convolutional Networks

  本例中的新内容是使用sequence和ELU PyTorch模块。序列模块是封装线性操作序列的方便包装器。在这种情况下,我们使用它来封装Conv1d序列的应用程序。

  在本例中,我们将每个卷积的通道数与num_channels超参数绑定。我们可以选择不同数量的通道分别进行卷积运算。这样做需要优化更多的超参数。我们发现256足够大,可以使模型达到合理的性能。

Example 2-3. The CNN-based SurnameClassifier

import torch.nn as nn
import torch.nn.functional as F

class SurnameClassifier(nn.Module):
    def __init__(self, initial_num_channels, num_classes, num_channels):
        """
        姓氏分类器模型的定义
        
        Args:
            initial_num_channels (int): 输入特征向量的大小
            num_classes (int): 输出预测向量的大小
            num_channels (int): 网络中保持不变的通道大小
        """
        super(SurnameClassifier, self).__init__()

        self.convnet = nn.Sequential(
            nn.Conv1d(in_channels=initial_num_channels,
                      out_channels=num_channels, kernel_size=3),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                      kernel_size=3),
            nn.ELU()
        )
        self.fc = nn.Linear(num_channels, num_classes)

    def forward(self, x_surname, apply_softmax=False):
        """分类器的前向传播

        Args:
            x_surname (torch.Tensor): 输入数据张量。
                x_surname.shape 应为 (batch, initial_num_channels, max_surname_length)
            apply_softmax (bool): softmax 激活的标志
                如果与交叉熵损失一起使用,则应为 false
        Returns:
            结果张量。张量.shape 应为 (batch, num_classes)
        """
        features = self.convnet(x_surname).squeeze(dim=2)
        prediction_vector = self.fc(features)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

2.2.4 The Training Routine

Example 2-4. Input arguments to the CNN surname classifier

args = Namespace(
    # 数据和路径信息
    surname_csv="/home/jovyan/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/cnn",
    # 模型超参数
    hidden_dim=100,
    num_channels=256,
    # 训练超参数
    seed=1337,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=10,
    early_stopping_criteria=5,
    dropout_p=0.1,
    # 运行时选项
    cuda=False,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
    catch_keyboard_interrupt=True
)


if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    
# 检查 CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))

def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)
        
def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)
        
# 设置随机种子以确保可重现性
set_seed_everywhere(args.seed, args.cuda)

# 处理目录
handle_dirs(args.save_dir)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

在这里插入图片描述

2.2.5 Model Evaluation and Prediction

Example 2-5. Using the trained model to make predictions

def predict_nationality(surname, classifier, vectorizer):
    """预测一个新姓氏的国籍
    
    Args:
        surname (str): 要分类的姓氏
        classifier (SurnameClassifer): 分类器的实例
        vectorizer (SurnameVectorizer): 相应的矢量化器
    Returns:
        一个字典,包含最有可能的国籍及其概率
    """
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
    result = classifier(vectorized_surname, apply_softmax=True)

    probability_values, indices = result.max(dim=1)
    index = indices.item()

    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality, 'probability': probability_value}
new_surname = input("Enter a surname to classify: ")
classifier = classifier.cpu()
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

在这里插入图片描述

总结

  本文主要讨论了多层感知机(MLP)和卷积神经网络(CNN)在处理姓氏分类问题的应用与表现。

  感知机是一个单层神经网络,感知机作为最基本的神经网络模型,由输入层、权重和输出层组成;适用于线性可分的数据集。能够对线性可分的数据进行分类。其输出是输入特征的线性组合经过一个激活函数处理后的结果。

  相比于感知机,MLP包含一个或多个隐藏层,使用非线性激活函数,使其能够解决更复杂的分类问题。然而,对于具有空间结构的数据(如图像)效率不高。

  CNN是一种专门处理具有空间结构数据的神经网络,通过卷积操作提取局部特征,并通过池化操作减少特征维度,最终使用全连接层进行分类或回归。相比于MLP,能有效捕捉数据中的空间特征,参数共享和局部连接减少了模型的计算复杂度,在图像分类、目标检测等任务中表现优异。

  总的来看,感知机适合线性数据,MLP适合复杂但结构化数据,CNN适合具有空间结构的数据。希望本文的介绍能让读者对感知机、MLP和CNN有更深入的理解,并能够在实际项目中灵活应用这些模型,解决实际问题。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/813006
推荐阅读
相关标签
  

闽ICP备14008679号