当前位置:   article > 正文

Pytorch 之 前馈神经网络 NLP姓氏分类 —— 从MLP到CNN

Pytorch 之 前馈神经网络 NLP姓氏分类 —— 从MLP到CNN

一. 感知机部分

1.1 perceptron (感知机)

感知机是现存最简单的神经网络,用于二分类任务的线性分类模型,只有简单的输入层和输出层,通过线性组合与激活函数输出分类结果。感知器的一个历史性的缺点是它不能学习数据中存在的一些非常重要的模式。例如,图1-1中绘制的数据点。在异或(XOR)情况下,决策边界不能是一条直线(也称为线性可分)。下图中,感知器不能完成分类任务。


图1-1 XOR数据集中的两个类绘制为圆形和星形。请注意,没有任何一行可以分隔这两个类。

1.2 The Multilayer Perceptron(多层感知器)

多层感知器(MLP)是对感知器的扩展。感知器将数据向量作为输入,计算出一个输出值。在MLP中,许多感知器被分组,以便单个层的输出是一个新的向量,而不是单个输出值,从而将多个层与每个层之间的非线性结合在一起。。在PyTorch中,只需设置线性层中的输出特性的数量即可实现。MLP的另一个方面是,它将多个层与每个层之间的非线性结合在一起。

最简单的MLP(三层感知机),如图1-2所示,由三个表示层和两个线性层组成。第一层(输入层)是输入向量。如在餐馆评论的情绪进行分类任务中,输入向量可以是Yelp评论的一个one-hot表示。给定输入向量,第一个线性层计算出了隐藏向量(第二层)。使用这个隐藏向量,第二个线性层计算一个输出向量(第三层 or 输出层)。在像Yelp评论分类这样的二分类任务中,输出向量仍然可以是0 or 1。在多类设置中,即本blog后面的“示例:带有多层感知器的姓氏分类”一节中看到,输出向量同类数量大小相同。虽然在这个例子中,我们只展示了一个隐藏的向量,但是实际中MLP也可能会有多个中间层(隐藏层),每个阶段产生自己的隐藏向量。最终的隐藏向量总是通过线性层和非线性的组合映射到输出向量。

图1-2 一种具有两个线性层和三个表示阶段(输入向量、隐藏向量和输出向量)的MLP的可视化表示

多层感知机(MLP)的进步在于添加了第二个线性层,并允许模型学习一个线性分割的中间表示。这种表示的特性在于能够通过一条直线(or 一个超平面)来区分数据点位于直线(or 超平面)的哪一侧。学习到特定属性的中间表示使得分类任务变得线性可分,是其建模能力的精髓所在。

如图1-3所示。在XOR示例中,错误分类的数据点用黑色填充,而正确分类的数据点没有填充。在左边的面板中,从填充的形状可以看出,感知器在学习一个可以将星星和圆分开的决策边界方面有困难。然而,MLP(右面板)学习了一个更精确的决策边界。


图1-3 从感知器(左)和MLP(右)学习的XOR问题的解决方案显示

尽管图中显示MLP具有两个决策边界,但实际上它只有一个决策边界!由于中间表示法改变了空间,使得一个超平面同时出现在这两个位置上,才会出现这样的效果。在图1-4中,我们可以看到MLP计算的中间值。这些点的形状表示类别(星形或圆形)。从图中可以看出,神经网络(在本例中为MLP)已经学会了“扭曲”数据所处的空间,使得数据通过最后一层时,可以用一条直线将其分割开来。

在这里插入图片描述

图1-4 MLP的输入和中间表示是可视化的。从左到右:(1)网络的输入;(2)第一个线性模块的输出;(3)第一个非线性模块的输出;(4)第二个线性模块的输出。

相反,如图1-5所示,感知器没有额外的一层来处理数据的形状来让数据线性可分


图1-5 感知器的输入和输出表示

1.3 Surname Classification with MLP(姓氏分类之MLP)

1.3.1 任务介绍

The Surname Dataset(姓氏数据集)收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。数据集的结构并不平衡。排名前三的种类占据了数据的60%以上,分别是英语姓氏(27%),俄语姓氏(21%),和阿拉伯语姓氏(14%)。此外,该数据集的另一个显著特点是在国籍和姓氏的正字法(拼写)之间存在一种有效且直观的关系。有些拼写变体与原籍国联系非常紧密(例如“O’Neill”、“Antonopoulos”、“Nagasawa”或“Zhu”)。

1.3.2 代码实现与运行结果

考虑神经网络需要经常性的调试,推荐配合jupyter食用

Vocabulary, Vectorizer, and DataLoader

为了使用字符对姓氏进行分类,我们使用词汇表、向量化器和DataLoader将姓氏字符串转换为向量化的minibatches。

class Vocabulary(object):
    """处理文本并提取用于映射的词汇表的类"""
    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        初始化词汇表
        参数:
            token_to_idx (dict): 预先存在的词到索引的映射字典
            add_unk (bool): 是否添加 UNK 令牌的标志
            unk_token (str): 要添加到词汇表中的 UNK 令牌
        """
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        # 创建索引到词的反向映射字典
        self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)  # 添加 UNK 令牌并获取其索引
        
    def to_serializable(self):
        """返回可序列化的字典"""
        return {
            'token_to_idx': self._token_to_idx, 
            'add_unk': self._add_unk, 
            'unk_token': self._unk_token
        }

    @classmethod
    def from_serializable(cls, contents):
        """从序列化字典实例化词汇表"""
        return cls(**contents)

    def add_token(self, token):
        """根据令牌更新映射字典
        参数:
            token (str): 要添加到词汇表中的令牌
        返回:
            index (int): 与令牌对应的整数索引
        """
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def add_many(self, tokens):
        """将多个令牌添加到词汇表中
        参数:
            tokens (list): 字符串令牌列表
        返回:
            indices (list): 与这些令牌对应的索引列表
        """
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """检索与令牌关联的索引,如果令牌不存在则返回 UNK 索引
        参数:
            token (str): 要查找的令牌
        返回:
            index (int): 与令牌对应的索引
        备注:
            `unk_index` 需要 >= 0(已添加到词汇表)才能启用 UNK 功能
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """返回与索引关联的令牌
        参数: 
            index (int): 要查找的索引
        返回:
            token (str): 与索引对应的令牌
        抛出:
            KeyError: 如果索引不在词汇表中
        """
        if index not in self._idx_to_token:
            raise KeyError("索引 (%d) 不在词汇表中" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
class SurnameVectorizer(object):
    """用于协调词汇表并将其用于向量化的矢量化器"""
    def __init__(self, surname_vocab, nationality_vocab):
        """
        初始化矢量化器
        参数:
            surname_vocab (Vocabulary): 将字符映射到整数的词汇表
            nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
        """
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        """
        将姓氏转换为一维的独热编码向量
        参数:
            surname (str): 姓氏
        返回:
            one_hot (np.ndarray): 独热编码后的向量
        """
        vocab = self.surname_vocab
        one_hot = np.zeros(len(vocab), dtype=np.float32)  # 创建长度为词汇表大小的零向量
        for token in surname:
            one_hot[vocab.lookup_token(token)] = 1  # 对姓氏中的每个字符进行独热编码
        return one_hot

    @classmethod
    def from_dataframe(cls, surname_df):
        """
        从数据集的 DataFrame 实例化矢量化器
        参数:
            surname_df (pandas.DataFrame): 姓氏数据集
        返回:
            SurnameVectorizer 的实例
        """
        surname_vocab = Vocabulary(unk_token="@")  # 初始化带有UNK令牌的姓氏词汇表
        nationality_vocab = Vocabulary(add_unk=False)  # 初始化不带UNK令牌的国籍词汇表

        # 遍历数据集,添加所有姓氏中的字符到姓氏词汇表中,并添加所有国籍到国籍词汇表中
        for index, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)
        return cls(surname_vocab, nationality_vocab)

    @classmethod
    def from_serializable(cls, contents):
        """
        从可序列化的字典中实例化矢量化器
        参数:
            contents (dict): 序列化的词汇表内容
        返回:
            SurnameVectorizer 的实例
        """
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)

    def to_serializable(self):
        """
        返回可序列化的字典
        """
        return {
            'surname_vocab': self.surname_vocab.to_serializable(),
            'nationality_vocab': self.nationality_vocab.to_serializable()
        }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        """
        初始化姓氏数据集
        参数:
            surname_df (pandas.DataFrame): 数据集
            vectorizer (SurnameVectorizer): 从数据集中实例化的矢量化器
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        self.train_df = self.surname_df[self.surname_df.split == 'train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split == 'val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split == 'test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {
            'train': (self.train_df, self.train_size),
            'val': (self.val_df, self.validation_size),
            'test': (self.test_df, self.test_size)
        }
        self.set_split('train')
        
        # 类别权重
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        """从头加载数据集并创建新的矢量化器
        参数:
            surname_csv (str): 数据集的位置
        返回:
            SurnameDataset 的实例
        """
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split == 'train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        """加载数据集和相应的矢量化器。
        在矢量化器已缓存以供重用的情况下使用
        参数:
            surname_csv (str): 数据集的位置
            vectorizer_filepath (str): 已保存的矢量化器的位置
        返回:
            SurnameDataset 的实例
        """
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """静态方法从文件加载矢量化器
        参数:
            vectorizer_filepath (str): 序列化矢量化器的位置
        返回:
            SurnameVectorizer 的实例
        """
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        """使用 json 将矢量化器保存到磁盘
        参数:
            vectorizer_filepath (str): 保存矢量化器的位置
        """
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        """返回矢量化器"""
        return self._vectorizer

    def set_split(self, split="train"):
        """使用数据帧中的列选择数据集中的拆分"""
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """PyTorch 数据集的主要入口方法
        参数:
            index (int): 数据点的索引
        返回:
            包含数据点特征(x_surname)和标签(y_nationality)的字典
        """
        row = self._target_df.iloc[index]

        surname_vector = self._vectorizer.vectorize(row.surname)

        nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_vector, 'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):
        """给定批量大小,返回数据集中的批次数量
        参数:
            batch_size (int)
        返回:
            数据集中的批次数量
        """
        return len(self) // batch_size

    
def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"): 
    """
    包装 PyTorch DataLoader 的生成器函数。
    确保每个张量在正确的设备位置上。
    参数:
        dataset (Dataset): 数据集
        batch_size (int): 批量大小
        shuffle (bool): 是否打乱数据
        drop_last (bool): 是否丢弃最后一个不完整的批次
        device (str): 设备("cpu" 或 "cuda")
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136

Surname Classifier Model

SurnameClassifier是本实验中实现的多层感知机(MLP)。第一个线性层将输入向量映射到中间向量,并对该向量应用非线性激活函数。第二个线性层将中间向量映射到预测向量。

在最后一步中,可以选择应用softmax操作,以确保输出的和为1,这样就可以解释为“概率”。之所以说是可选的,是因为我们使用的损失函数的数学公式——交叉熵损失。我们在“损失函数”中研究了交叉熵损失。回想一下,交叉熵损失是多类分类的理想选择,但在训练过程中计算softmax不仅浪费计算资源,而且在许多情况下并不稳定。

import torch.nn as nn
import torch.nn.functional as F

class SurnameClassifier(nn.Module):
    """用于姓氏分类的两层多层感知器"""
    
    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        初始化分类器

        参数:
            input_dim (int): 输入向量的维度
            hidden_dim (int): 第一个全连接层的输出维度
            output_dim (int): 第二个全连接层的输出维度
        """
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)  # 定义第一个全连接层
        self.fc2 = nn.Linear(hidden_dim, output_dim)  # 定义第二个全连接层

    def forward(self, x_in, apply_softmax=False):
        """分类器的前向传播过程

        参数:
            x_in (torch.Tensor): 输入数据张量,形状应为 (batch, input_dim)
            apply_softmax (bool): 是否应用 softmax 激活函数的标志
                如果与交叉熵损失函数一起使用,应设置为 False
        返回:
            结果张量,形状应为 (batch, output_dim)
        """
        intermediate_vector = F.relu(self.fc1(x_in))  # 第一个全连接层并应用 ReLU 激活函数
        prediction_vector = self.fc2(intermediate_vector)  # 第二个全连接层

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)  # 如果需要,应用 softmax 激活函数

        return prediction_vector  # 返回预测向量
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

Training

from argparse import Namespace
from collections import Counter
import json
import os
import string
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook
def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)
args = Namespace(
    # Data and path information
    surname_csv="surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/surname_mlp",
    # Model hyper parameters
    hidden_dim=300,
    # Training  hyper parameters
    seed=1337,
    num_epochs=100,
    early_stopping_criteria=5,
    learning_rate=0.001,
    batch_size=64,
    # Runtime options
    cuda=False,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
)

if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    
# Check CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
    
print("Using CUDA: {}".format(args.cuda))

# # Set seed for reproducibility
set_seed_everywhere(args.seed, args.cuda)
# # handle dirs
handle_dirs(args.save_dir)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

在这里插入图片描述

#-----------------------------------------------------------------------------------# 

# 从数据集中加载并创建矢量化器
dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
vectorizer = dataset.get_vectorizer()

# 初始化姓氏分类器
classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
                               hidden_dim=args.hidden_dim,
                               output_dim=len(vectorizer.nationality_vocab))

# 将分类器模型移到指定设备(例如:CPU 或 GPU)
classifier = classifier.to(args.device)    

# 初始化损失函数,并使用类别权重来处理类别不平衡问题
loss_func = nn.CrossEntropyLoss(dataset.class_weights)

# 初始化优化器,使用 Adam 优化算法并设置学习率
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)

if args.reload_from_files:
    # training from a checkpoint
    print("Reloading!")
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # create dataset and vectorizer
    print("Creating fresh!")
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)
    
vectorizer = dataset.get_vectorizer()
classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab), 
                               hidden_dim=args.hidden_dim, 
                               output_dim=len(vectorizer.nationality_vocab))


def make_train_state(args):
    """
    创建并初始化训练状态字典
    参数:
        args: 主参数
    返回:
        包含训练状态值的字典
    """
    return {
        'stop_early': False,  # 是否早停
        'early_stopping_step': 0,  # 早停步数
        'early_stopping_best_val': 1e8,  # 早停最佳验证损失
        'learning_rate': args.learning_rate,  # 学习率
        'epoch_index': 0,  # 当前周期索引
        'train_loss': [],  # 训练损失列表
        'train_acc': [],  # 训练准确率列表
        'val_loss': [],  # 验证损失列表
        'val_acc': [],  # 验证准确率列表
        'test_loss': -1,  # 测试损失
        'test_acc': -1,  # 测试准确率
        'model_filename': args.model_state_file  # 模型文件名
    }

def update_train_state(args, model, train_state):
    """
    处理训练状态更新
    组件:
     - 早停:防止过拟合
     - 模型检查点:如果模型表现更好,则保存模型
    参数:
        args: 主参数
        model: 训练的模型
        train_state: 表示训练状态值的字典
    返回:
        更新后的训练状态字典
    """
    # 至少保存一个模型
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # 如果模型性能改善,则保存模型
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # 如果损失变差
        if loss_t >= train_state['early_stopping_best_val']:
            # 更新步数
            train_state['early_stopping_step'] += 1
        # 损失减少
        else:
            # 保存最佳模型
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # 重置早停步数
            train_state['early_stopping_step'] = 0

        # 是否早停?
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

def compute_accuracy(y_pred, y_target):
    """
    计算预测结果的准确率
    参数:
        y_pred (torch.Tensor): 预测的张量
        y_target (torch.Tensor): 目标的张量
    返回:
        准确率百分比
    """
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
# 将分类器模型和类别权重移到指定设备(例如:CPU 或 GPU)
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

# 初始化损失函数,并使用类别权重来处理类别不平衡问题
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
# 初始化优化器,使用 Adam 优化算法并设置学习率
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
# 初始化学习率调度器,在损失不再降低时调整学习率
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                 mode='min', factor=0.5,
                                                 patience=1)

# 初始化训练状态
train_state = make_train_state(args)

# 初始化进度条
epoch_bar = tqdm_notebook(desc='training routine', 
                          total=args.num_epochs,
                          position=0)

# 设置数据集的拆分方式为训练集,并初始化训练进度条
dataset.set_split('train')
train_bar = tqdm_notebook(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size), 
                          position=1, 
                          leave=True)
# 设置数据集的拆分方式为验证集,并初始化验证进度条
dataset.set_split('val')
val_bar = tqdm_notebook(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size), 
                        position=1, 
                        leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # 迭代训练数据集

        # 设置:批量生成器,初始化损失和准确率为 0,设置训练模式
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # 训练过程的五个步骤:

            # --------------------------------------
            # 步骤 1. 将梯度归零
            optimizer.zero_grad()

            # 步骤 2. 计算输出
            y_pred = classifier(batch_dict['x_surname'])

            # 步骤 3. 计算损失
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 步骤 4. 使用损失计算梯度
            loss.backward()

            # 步骤 5. 使用优化器更新权重
            optimizer.step()
            # -----------------------------------------
            # 计算准确率
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            # 更新进度条
            train_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # 迭代验证数据集

        # 设置:批量生成器,初始化损失和准确率为 0,设置评估模式
        dataset.set_split('val')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):

            # 计算输出
            y_pred =  classifier(batch_dict['x_surname'])

            # 步骤 3. 计算损失
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.to("cpu").item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 计算准确率
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        # 更新训练状态
        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)

        # 调整学习率
        scheduler.step(train_state['val_loss'][-1])

        # 早停
        if train_state['stop_early']:
            break

        # 重置进度条
        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131

在这里插入图片描述

简单的测试环节

def predict_nationality(name, classifier, vectorizer):
    """
    预测姓氏的国籍

    参数:
        name (str): 输入的姓氏
        classifier (SurnameClassifier): 已训练的分类器模型
        vectorizer (SurnameVectorizer): 矢量化器,用于将姓氏和国籍映射到整数

    返回:
        dict: 包含预测的国籍和相应的概率值
    """
    # 将姓氏矢量化
    vectorized_name = vectorizer.vectorize(name)
    # 将矢量化后的姓氏转换为张量,并调整形状为 (1, -1)
    vectorized_name = torch.tensor(vectorized_name).view(1, -1)
    # 使用分类器进行预测,并应用 softmax 激活函数
    result = classifier(vectorized_name, apply_softmax=True)

    # 获取最大概率值和相应的索引
    probability_values, indices = result.max(dim=1)
    index = indices.item()

    # 根据索引查找预测的国籍
    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality,
            'probability': probability_value}


new_surname = input("Enter a surname to classify: ")
classifier = classifier.to("cpu")
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在这里插入图片描述

----------------------------------------------------------手动分割-------------------------------------------------------------

二. 卷积神经网络部分(CNN)

2.1 CNN介绍

卷积神经网络(Convolutional Neural Network, CNN)是一种专为处理具有网格状拓扑数据(如图像)的深度学习模型。它在图像识别和分类任务中表现出色,广泛应用于计算机视觉领域。

MLP不能利用字段中的顺序。例如在姓氏数据集中,姓氏可以有(不同长度的)段,这些段可以显示出相当多关于其起源国家的信息(如“O’Neill”中的“O”、“Antonopoulos”中的“opoulos”、“Nagasawa”中的“sawa”或“Zhu”中的“Zh”)。这些段的长度可以是可变的,但是很难在不显式编码的情况下捕获它们,故而利用MLP模型进行学习会损失这部分信息

CNN是一种非常适合检测空间子结构(并因此创建有意义的空间子结构)的神经网络。CNNs通过使用少量的权重来扫描输入数据张量,从而产生表示子结构检测的输出张量。

2.1.1 CNN Hyperparameters

为了理解不同的设计对CNN意味着什么,我们在图2-1中展示了一个示例。在本例中,单个“核(kernel)”应用于输入矩阵。卷积运算(线性算子)的精确数学表达式对于理解这一节并不重要,但是从这个图中可以直观地看出,核是一个小的方阵,它被系统地应用于输入矩阵的不同位置。


图2-1 二维卷积运算。

DIMENSION OF THE CONVOLUTION OPERATION

在PyTorch中,卷积可以是一维、二维或三维的,分别由Conv1d、Conv2d和Conv3d模块实现。一维卷积适用于时间序列数据,二维卷积用于图像处理,而三维卷积则处理视频数据中的时空信息。我们使用二维卷积进行说明

CHANNELS

通道(channel)是指输入中每个点的特征维度。例如,图像的每个像素有三个通道(RGB)。在卷积操作中,文本数据也可以采用类似的概念,将单词视为“像素”,通道数量为词汇表大小。PyTorch的卷积实现中,in_channels参数表示输入通道数,out_channels表示输出通道数。合理选择输出通道数对模型性能至关重要。

图2-2 卷积运算用两个输入矩阵(两个输入通道)表示相应的核也有两层,它将每层分别相乘,然后对结果求和。参数配置:input_channels=2, output_channels=1, kernel_size=2, tride=1, padding=0, and dilation=1.


图2-3 一种具有一个输入矩阵(一个输入通道)和两个卷积的卷积运算核(两个输出通道)。这些核分别应用于输入矩阵,并堆叠在输出张量。参数配置:input_channels=1, output_channels=2, kernel_size=2, tride=1, padding=0, and dilation=1.

KERNEL SIZE

核矩阵的宽度称为核大小(PyTorch中的kernel_size)。核大小控制卷积操作中本地信息的组合范围。较小的核大小捕捉细粒度特性,而较大的核大小则捕捉较大范围的模式。核大小的选择会影响输出的大小,较大的核会减少输出矩阵的尺寸。

图2-4 将kernel_size=3的卷积应用于输入矩阵。结果是一个折衷的结果:在每次将内核应用于矩阵时,都会使用更多的局部信息,但输出的大小会更小.

STRIDE

步幅(stride)控制卷积核在输入上的移动步长。如果步幅与核大小相同,卷积计算不会重叠;如果步幅为1,卷积核最大程度重叠。增加步幅可以压缩输出张量,减少计算量,同时总结更多信息。

图2-5 应用于具有超参数步长的输入的kernel_size=2的卷积核等于2。这会导致内核采取更大的步骤,从而产生更小的输出矩阵。对于更稀疏地对输入矩阵进行二次采样非常有用。

PADDING

填充(padding)用于在输入数据的边缘添加额外的值(通常是0),以控制输出的形状,避免卷积操作缩小特征映射的总大小。通过填充,可以在不影响核大小和步幅的情况下,保持输出矩阵的尺寸与输入矩阵一致。

图2-6 应用于高度和宽度等于的输入矩阵的kernel_size=2的卷积2。但是,由于填充(用深灰色正方形表示),输入矩阵的高度和宽度可以被放大。这通常与大小为3的内核一起使用,这样输出矩阵将等于输入矩阵的大小。

DILATION

膨胀(dilation)控制卷积核在应用时的扩展程度。增加膨胀值意味着在核元素之间引入空格,从而扩大卷积的感受野,而不增加参数数量。膨胀卷积在堆叠多个卷积层时非常有用,可以指数级地增大感受野。

图2-7 应用于超参数dilation=2的输入矩阵的kernel_size=2的卷积。从默认值开始膨胀的增加意味着核矩阵的元素在与输入矩阵相乘时进一步分散开来。进一步增大扩张会加剧这种扩散。

2.2 代码实现

SurnameDataset.__getitem__方法在此得到了修改,代码结构基本一致,注释略

from argparse import Namespace
from collections import Counter
import json
import os
import string

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook
class Vocabulary(object):

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token 
                              for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token
        
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token) 
        
        
    def to_serializable(self):
        return {'token_to_idx': self._token_to_idx, 
                'add_unk': self._add_unk, 
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        return cls(**contents)

    def add_token(self, token):
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def add_many(self, tokens):
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)
class SurnameVectorizer(object):
    """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
    def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab
        self._max_surname_length = max_surname_length

    def vectorize(self, surname):
        one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
        one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
                               
        for position_index, character in enumerate(surname):
            character_index = self.surname_vocab.lookup_token(character)
            one_hot_matrix[character_index][position_index] = 1
        
        return one_hot_matrix

    @classmethod
    def from_dataframe(cls, surname_df):
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)
        max_surname_length = 0

        for index, row in surname_df.iterrows():
            max_surname_length = max(max_surname_length, len(row.surname))
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab, max_surname_length)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab =  Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab, 
                   max_surname_length=contents['max_surname_length'])

    def to_serializable(self):
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable(), 
                'max_surname_length': self._max_surname_length}
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        self.surname_df = surname_df
        self._vectorizer = vectorizer
        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')
        
        # Class weights
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)


    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        return self._vectorizer

    def set_split(self, split="train"):
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        row = self._target_df.iloc[index]

        surname_matrix = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_matrix,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):

        return len(self) // batch_size

    
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):

    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
class SurnameClassifier(nn.Module):
    def __init__(self, initial_num_channels, num_classes, num_channels):
        super(SurnameClassifier, self).__init__()
        
        self.convnet = nn.Sequential(
            nn.Conv1d(in_channels=initial_num_channels, 
                      out_channels=num_channels, kernel_size=3),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels, 
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels, 
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels, 
                      kernel_size=3),
            nn.ELU()
        )
        self.fc = nn.Linear(num_channels, num_classes)

    def forward(self, x_surname, apply_softmax=False):
        features = self.convnet(x_surname).squeeze(dim=2)
        prediction_vector = self.fc(features)
        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}
def update_train_state(args, model, train_state):
    # Save one model at least
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # Save model if performance improved
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # If loss worsened
        if loss_t >= train_state['early_stopping_best_val']:
            # Update step
            train_state['early_stopping_step'] += 1
        # Loss decreased
        else:
            # Save the best model
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # Reset early stopping step
            train_state['early_stopping_step'] = 0

        # Stop early ?
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria
    return train_state
def compute_accuracy(y_pred, y_target):
    y_pred_indices = y_pred.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100
args = Namespace(
    # Data and Path information
    surname_csv="data/surnames/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/cnn",
    # Model hyper parameters
    hidden_dim=100,
    num_channels=256,
    # Training hyper parameters
    seed=1337,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=100,
    early_stopping_criteria=5,
    dropout_p=0.1,
    # Runtime options
    cuda=False,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
    catch_keyboard_interrupt=True
)


if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    
# Check CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))

def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)
        
def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)
        
# Set seed for reproducibility
set_seed_everywhere(args.seed, args.cuda)

# handle dirs
handle_dirs(args.save_dir)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
if args.reload_from_files:
    # training from a checkpoint
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # create dataset and vectorizer
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)
    
vectorizer = dataset.get_vectorizer()

classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab), 
                               num_classes=len(vectorizer.nationality_vocab),
                               num_channels=args.num_channels)

classifer = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                           mode='min', factor=0.5,
                                           patience=1)

train_state = make_train_state(args)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
epoch_bar = tqdm_notebook(desc='training routine', 
                          total=args.num_epochs,
                          position=0)

dataset.set_split('train')
train_bar = tqdm_notebook(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size), 
                          position=1, 
                          leave=True)
dataset.set_split('val')
val_bar = tqdm_notebook(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size), 
                        position=1, 
                        leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            optimizer.zero_grad()
            y_pred = classifier(batch_dict['x_surname'])

            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            loss.backward()
            optimizer.step()
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            train_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)
        dataset.set_split('val')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):
            y_pred =  classifier(batch_dict['x_surname'])
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            val_bar.update()
        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)
        scheduler.step(train_state['val_loss'][-1])
        if train_state['stop_early']:
            break
        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
classifier.load_state_dict(torch.load(train_state['model_filename']))
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)
loss_func = nn.CrossEntropyLoss(dataset.class_weights)

dataset.set_split('test')
batch_generator = generate_batches(dataset, 
                                   batch_size=args.batch_size, 
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    y_pred =  classifier(batch_dict['x_surname'])
    loss = loss_func(y_pred, batch_dict['y_nationality'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)
    acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc
print("Test loss: {};".format(train_state['test_loss']))
print("Test Accuracy: {}".format(train_state['test_acc']))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

测试

def predict_nationality(surname, classifier, vectorizer):
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
    result = classifier(vectorized_surname, apply_softmax=True)
    probability_values, indices = result.max(dim=1)
    index = indices.item()
    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()
    return {'nationality': predicted_nationality, 'probability': probability_value}
new_surname = input("Enter a surname to classify: ")
classifier = classifier.cpu()
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

三. 写在最后

在本文中,我们探讨了从感知机到多层感知机(MLP),再到卷积神经网络(CNN)在姓氏分类任务中的应用和实现。感知机作为最简单的神经网络模型,虽然在一些简单的任务中表现良好,但在处理复杂数据模式时存在局限性。MLP通过增加隐藏层和非线性激活函数,显著提升了模型的表达能力,使得其能够解决更多复杂的分类问题。然而,对于具有空间结构的数据,如图像和视频,MLP仍然不能充分利用数据中的空间信息。

为了解决这一问题,我们引入了卷积神经网络(CNN)。CNN通过使用卷积层来捕捉数据中的局部特征,极大地提升了模型在图像和视频处理中的表现。在本文的示例中,我们展示了如何使用CNN来处理姓氏分类任务,通过卷积操作捕捉姓氏中局部字符序列的特征,从而提高分类精度。

通过对MLP和CNN的比较,我们可以看到不同模型在处理不同类型数据时的优势和劣势。MLP适用于较为简单和结构化的数据,而CNN在处理具有空间结构的数据时表现出色。这为我们在实际应用中选择合适的模型提供了指导。

总的来说,理解和掌握这些基础的神经网络模型是深入研究深度学习和神经网络的重要基础。希望通过本文的介绍,读者能够对感知机、MLP和CNN有更深入的理解,并能够在实际项目中灵活应用这些模型解决实际问题。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/719451
推荐阅读
相关标签
  

闽ICP备14008679号