The multilayer perceptron (MLP) is the most basic form of feedforward neural network. It consists of an input layer, one or more hidden layers, and an output layer; each layer is made up of multiple neurons, and every neuron is connected to every neuron in the next layer.
In an MLP, each neuron applies an activation function to introduce non-linearity. Common activation functions include Sigmoid, Tanh, and ReLU. Through the composition of multiple layers of neurons and the non-linearity of their activations, an MLP can learn complex non-linear relationships, which makes it suitable for a wide range of machine learning tasks.
Multilayer perceptrons are widely used in machine learning and deep learning, and have achieved good results in tasks such as image recognition, natural language processing, and recommender systems. Their flexibility and capacity make them one of the basic building blocks of artificial neural networks.
Its basic network architecture is shown below:
Figure 1. MLP architecture
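As code, a minimal sketch of such a two-layer network in PyTorch looks like the following; the class name and layer sizes here are illustrative placeholders, not the values used in the experiments later in this post.

import torch
import torch.nn as nn
import torch.nn.functional as F

class TinyMLP(nn.Module):
    """A minimal two-layer perceptron: input -> hidden (ReLU) -> output."""
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        return self.fc2(F.relu(self.fc1(x)))

# Forward pass on a random batch of 4 examples with 10 features each.
y = TinyMLP(input_dim=10, hidden_dim=16, output_dim=3)(torch.randn(4, 10))
print(y.shape)  # torch.Size([4, 3])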
In the figure below, the true class of each data point is given by its shape, star or circle; misclassified points are filled in black, while correctly classified points are left unfilled. The dashed lines are each model's decision boundaries.
Figure 2. Perceptron vs. multilayer perceptron on the XOR problem
On the XOR problem, the left panel shows the result of a single perceptron. It can only produce one linear decision boundary, so the classification is poor and nearly half of the points are filled in black. The right panel shows the result of a multilayer perceptron, which cleanly separates the circles from the stars. The MLP appears to have two decision boundaries, and that is its advantage here, but in fact there is only one: the intermediate representation warps the space so that a single hyperplane shows up in both of those locations.
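For intuition, here is a minimal, self-contained sketch of this experiment. The tiny network, seed, and hyperparameters are illustrative choices, not the ones behind the figure, and such a small model may occasionally need a different seed to converge.

import torch
import torch.nn as nn

torch.manual_seed(0)

# The four XOR points and their labels.
X = torch.tensor([[0., 0.], [0., 1.], [1., 0.], [1., 1.]])
y = torch.tensor([0., 1., 1., 0.]).unsqueeze(1)

# An MLP with a non-linear hidden layer can represent XOR;
# a single linear layer (a perceptron) cannot.
mlp = nn.Sequential(nn.Linear(2, 8), nn.Tanh(), nn.Linear(8, 1))
optimizer = torch.optim.Adam(mlp.parameters(), lr=0.05)
loss_fn = nn.BCEWithLogitsLoss()

for step in range(2000):
    optimizer.zero_grad()
    loss = loss_fn(mlp(X), y)
    loss.backward()
    optimizer.step()

print(torch.sigmoid(mlp(X)).round().squeeze())  # expected: tensor([0., 1., 1., 0.])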
from argparse import Namespace
from collections import Counter
import json
import os
import string

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook


class Vocabulary(object):

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx
        # Build the reverse mapping from index to token
        self._idx_to_token = {idx: token
                              for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        self.unk_index = -1
        if add_unk:
            # Add the unknown token and remember its index as unk_index
            self.unk_index = self.add_token(unk_token)

    def to_serializable(self):
        return {'token_to_idx': self._token_to_idx,
                'add_unk': self._add_unk,
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        return cls(**contents)

    def add_token(self, token):
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def add_many(self, tokens):
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

The Vocabulary class is initialized with three parameters: token_to_idx (dict), an existing mapping from tokens to indices (default None); add_unk (bool), whether to add an unknown token (default True); and unk_token (str), the string used for the unknown token (default "<UNK>").
This Vocabulary class builds a vocabulary, a data structure that maps each token to a unique index. It can be initialized empty or from an existing token-to-index dictionary. add_token adds a token to the vocabulary and returns its index, and add_many adds a list of tokens and returns the list of their indices. lookup_token looks up the index of a token, and lookup_index looks up the token for a given index. to_serializable converts the vocabulary into a serializable dictionary so it can be saved to a file or transmitted over a network, and from_serializable rebuilds a Vocabulary object from such a dictionary.
class SurnameVectorizer(object):
    def __init__(self, surname_vocab, nationality_vocab):
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        vocab = self.surname_vocab
        one_hot = np.zeros(len(vocab), dtype=np.float32)
        for token in surname:
            one_hot[vocab.lookup_token(token)] = 1

        return one_hot

    @classmethod
    def from_dataframe(cls, surname_df):
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)

        for index, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)

    def to_serializable(self):
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable()}

The SurnameVectorizer class converts a surname into a vector representation. It uses two Vocabulary objects to manage the token-to-index mappings, one for surname characters and one for nationalities. The vectorize method turns a surname into a collapsed one-hot vector: it creates a zero vector whose length equals the size of the surname vocabulary and sets the position of each character in the surname to 1.
The class method from_dataframe builds a SurnameVectorizer from a surname DataFrame: it iterates over every surname and nationality, adds each character of the surname to the surname vocabulary and each nationality to the nationality vocabulary, and returns a SurnameVectorizer initialized with those vocabularies.
The class method from_serializable recreates a SurnameVectorizer from a serializable dictionary, and to_serializable converts the object into such a dictionary so that it can be saved to a file or transmitted over a network.
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')

        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        return self._vectorizer

    def set_split(self, split="train"):
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        row = self._target_df.iloc[index]

        surname_vector = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_vector,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):
        return len(self) // batch_size


def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

The SurnameDataset class loads and processes the surname dataset and provides a few helpers for data handling and batching.
Its constructor takes the surname DataFrame and a SurnameVectorizer that acts as the dataset's vectorizer. Based on the "split" column of the DataFrame, the data is divided into training, validation, and test sets, and the size of each split is recorded.
An internal dictionary _lookup_dict maps a split name to the corresponding DataFrame and size, and set_split switches which split the dataset currently serves. The helper get_num_batches computes the number of batches for a given batch size; the class method load_dataset_and_make_vectorizer loads the dataset from the surname CSV file and builds the matching vectorizer; the class method load_dataset_and_load_vectorizer loads both the dataset and a previously saved vectorizer from their file paths.
The module-level helper generate_batches is a generator that produces batches: it wraps PyTorch's DataLoader to handle batching and loading, and moves every tensor in each batch to the specified device.
class SurnameClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        intermediate_vector = F.relu(self.fc1(x_in))
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector
Next we define an MLP model, SurnameClassifier, for the surname classification task.
A SurnameClassifier is initialized with the input dimension, hidden-layer dimension, and output dimension. The constructor creates two fully connected layers: self.fc1 is the linear transformation from the input layer to the hidden layer, and self.fc2 is the linear transformation from the hidden layer to the output layer.
The forward method implements the forward pass: the input tensor x_in is passed through self.fc1 and a ReLU activation to produce an intermediate vector, which is then passed through self.fc2 to produce the prediction vector.
In this simple two-layer fully connected network, the input dimension determines the size of the input tensor, the hidden dimension controls the model's capacity, and the output dimension equals the number of classes.
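A quick sanity check of this model on a dummy batch; the dimensions below are placeholders rather than the real vocabulary sizes.

model = SurnameClassifier(input_dim=80, hidden_dim=100, output_dim=18)
dummy_batch = torch.rand(32, 80)                        # 32 collapsed one-hot surname vectors
probs = model(dummy_batch, apply_softmax=True)
print(probs.shape)                                      # torch.Size([32, 18])
print(probs.sum(dim=1)[:3])                             # each row sums to 1 after softmax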
def compute_accuracy(y_pred, y_target):
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100
Here we define a helper function that computes the model's classification accuracy (as a percentage).
def train_model(model, dataset, vectorizer, optimizer, loss_func, num_epochs, batch_size, device):
    model = model.to(device)
    for epoch in range(num_epochs):
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, batch_size=batch_size, device=device)
        running_loss = 0.0
        running_acc = 0.0
        model.train()
        for batch_index, batch_dict in enumerate(batch_generator):
            optimizer.zero_grad()
            y_pred = model(batch_dict['x_surname'])
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            loss.backward()
            optimizer.step()
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {running_loss:.4f}, Accuracy: {running_acc:.4f}")

Define the core training function, which runs the forward pass, loss computation, backward pass, and parameter update for each batch.
def validate_model(model, dataset, vectorizer, loss_func, batch_size, device):
    dataset.set_split('val')
    batch_generator = generate_batches(dataset, batch_size=batch_size, device=device)
    running_loss = 0.0
    running_acc = 0.0
    model.eval()
    with torch.no_grad():
        for batch_index, batch_dict in enumerate(batch_generator):
            y_pred = model(batch_dict['x_surname'])
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
    print(f"Validation - Loss: {running_loss:.4f}, Accuracy: {running_acc:.4f}")
The function's parameters are as follows (for reference):
model: the model to validate
dataset: the validation dataset
vectorizer: the dataset's vectorizer
loss_func: the loss function
batch_size: the batch size
device: the device to run on
A batch generator is created for the given batch size and device, and the running loss and accuracy are initialized to 0.0.
The model is switched to evaluation mode with model.eval(), and the torch.no_grad() context manager disables gradient computation to reduce memory and compute overhead. For each batch, a forward pass produces the predictions; the loss between predictions and true labels is folded into running_loss, and the accuracy into running_acc. Finally, the validation results (average loss and accuracy) are printed.
args = Namespace(
    surname_csv="data/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/surname_mlp",
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
    cuda=True,
    seed=1337,
    learning_rate=0.001,
    batch_size=64,
    num_epochs=100,
    early_stopping_criteria=5,
    hidden_dim=100,
)
Set the experiment's hyperparameters, including the learning rate, batch size, hidden-layer dimension, and number of training epochs.
if not torch.cuda.is_available():
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
np.random.seed(args.seed)
torch.manual_seed(args.seed)

dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
vectorizer = dataset.get_vectorizer()
model = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
                          hidden_dim=args.hidden_dim,
                          output_dim=len(vectorizer.nationality_vocab))

optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)
loss_func = nn.CrossEntropyLoss()

train_model(model, dataset, vectorizer, optimizer, loss_func,
            args.num_epochs, args.batch_size, args.device)
validate_model(model, dataset, vectorizer, loss_func, args.batch_size, args.device)
Load the dataset and vectorizer, initialize the model and optimizer, and run training followed by validation.
torch.save(model.state_dict(), args.model_state_file)
dataset.save_vectorizer(args.vectorizer_file)
After training finishes, save the model state and the vectorizer for later use.
# Reload the saved model state
model.load_state_dict(torch.load(args.model_state_file))
model = model.to(args.device)

# Evaluate on the test split
dataset.set_split('test')
batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
running_acc = 0.0
model.eval()
with torch.no_grad():
    for batch_index, batch_dict in enumerate(batch_generator):
        y_pred = model(batch_dict['x_surname'])
        acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
        running_acc += (acc_t - running_acc) / (batch_index + 1)
print(f"Test Accuracy: {running_acc:.4f}")
Reload the saved model state and evaluate it on the test set.
Figure 3. Resulting loss and accuracy
def predict_nationality(model, surname, vectorizer, max_length):
    model.eval()
    vectorized_surname = torch.tensor(vectorizer.vectorize(surname)).unsqueeze(0)
    result = model(vectorized_surname, apply_softmax=True)
    probability_values, indices = result.max(dim=1)
    predicted_nationality = vectorizer.nationality_vocab.lookup_index(indices.item())
    return {'nationality': predicted_nationality, 'probability': probability_values.item()}


# Example prediction (move the model back to the CPU so it matches the input tensor)
model = model.cpu()
new_surname = "Smith"
prediction = predict_nationality(model, new_surname, vectorizer, max_length=20)
print(f"Surname: {new_surname} -> Nationality: {prediction['nationality']} (Probability: {prediction['probability']:.4f})")
We use this function to run the trained model on a new surname and inspect its prediction.
Figure 4. Prediction test result
def get_top_k_predictions(model, surname, vectorizer, k=5):
    model.eval()
    vectorized_surname = torch.tensor(vectorizer.vectorize(surname)).unsqueeze(0)
    result = model(vectorized_surname, apply_softmax=True)
    probability_values, indices = torch.topk(result, k)
    predicted_nationalities = [vectorizer.nationality_vocab.lookup_index(idx)
                               for idx in indices[0].tolist()]
    probabilities = probability_values[0].tolist()
    return list(zip(predicted_nationalities, probabilities))


# Example: top-k predictions
top_k_predictions = get_top_k_predictions(model, "Smith", vectorizer, k=5)
for nationality, probability in top_k_predictions:
    print(f"Nationality: {nationality} (Probability: {probability:.4f})")
The input surname is vectorized into a tensor with an extra batch dimension, and the vectorized surname is passed through the model with softmax applied to the prediction, via model(vectorized_surname, apply_softmax=True).
torch.topk then returns the K highest probability values in the prediction along with their indices.
Each index is mapped back to its nationality label through the vectorizer's nationality vocabulary, and the predicted nationalities are paired with their probabilities and returned as a list of tuples.
In the example, we call get_top_k_predictions with the surname "Smith" to get the model's top five predictions, then loop over the results and print each predicted nationality and its probability. This function can be used to obtain the top-K predictions for any surname, together with their nationality labels and probabilities.
Although the MLP performs well on many tasks, it can be limited when working with large, high-dimensional datasets. In such cases, other network architectures, such as convolutional neural networks (CNNs), may be better suited to the data and the task.
Figure 5. A two-dimensional convolution over an image
The basic structure of a CNN consists of an input layer, convolutional layers, pooling layers, fully connected layers, and an output layer. Convolutional and pooling layers are usually stacked in alternation: a convolutional layer is followed by a pooling layer, then another convolutional layer, and so on. Each neuron in a convolutional layer's output feature map is connected only to a local region of its input; its value is the weighted sum of that local input plus a bias term. This operation is exactly a convolution, which is where the CNN gets its name [6].
Figure 6. Example layout: convolution - pooling - convolution - pooling - convolution - fully connected
1. A CNN learns features of the input data automatically, without manual feature engineering.
2. Parameter sharing in the convolution operation reduces the number of model parameters, which lowers the risk of overfitting.
3. The deep, layered structure lets a CNN progressively learn more abstract, higher-level features.
4. CNNs scale to large datasets and can be trained on large amounts of data (see the shape sketch after this list).
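A minimal sketch of how a stack of 1-D convolutions (like the classifier defined later in this post) shrinks a character sequence down to a single feature vector; the channel count and sequence length here are illustrative placeholders only.

import torch
import torch.nn as nn

# One-hot character matrix for a batch of 4 surnames:
# (batch, vocabulary size as channels, sequence length); 77 and 17 are placeholder sizes.
x = torch.rand(4, 77, 17)

convnet = nn.Sequential(
    nn.Conv1d(77, 256, kernel_size=3), nn.ELU(),              # length 17 -> 15
    nn.Conv1d(256, 256, kernel_size=3, stride=2), nn.ELU(),   # 15 -> 7
    nn.Conv1d(256, 256, kernel_size=3, stride=2), nn.ELU(),   # 7 -> 3
    nn.Conv1d(256, 256, kernel_size=3), nn.ELU(),             # 3 -> 1
)
print(convnet(x).shape)  # torch.Size([4, 256, 1]): one 256-dim feature vector per surname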
from argparse import Namespace
from collections import Counter
import json
import os
import string

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook
First, import the required libraries: argparse, collections, json, os, numpy, pandas, torch, and so on.
class Vocabulary(object):
    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):

        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token
                              for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def to_serializable(self):
        return {'token_to_idx': self._token_to_idx,
                'add_unk': self._add_unk,
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        return cls(**contents)

    def add_token(self, token):
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def add_many(self, tokens):
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

Define a Vocabulary class. It processes text data and maintains the vocabulary used for the token mapping, with methods for adding tokens and looking up their indices. __init__ can take a pre-existing token_to_idx mapping and optionally adds a UNK token. to_serializable converts the Vocabulary object into a serializable dictionary, and from_serializable instantiates a Vocabulary from such a dictionary. add_token updates the mapping with a token and returns its index; add_many adds a list of tokens and returns the list of corresponding indices. lookup_token returns the index for a token, falling back to the UNK index if the token is unknown, while lookup_index returns the token for an index and raises KeyError if the index does not exist. Finally, __str__ returns a string representation of the Vocabulary and __len__ returns its size.
class SurnameVectorizer(object):
    def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab
        self._max_surname_length = max_surname_length

    def vectorize(self, surname):
        one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
        one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)

        for position_index, character in enumerate(surname):
            character_index = self.surname_vocab.lookup_token(character)
            one_hot_matrix[character_index][position_index] = 1

        return one_hot_matrix

    @classmethod
    def from_dataframe(cls, surname_df):
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)
        max_surname_length = 0

        for index, row in surname_df.iterrows():
            max_surname_length = max(max_surname_length, len(row.surname))
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab, max_surname_length)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
                   max_surname_length=contents['max_surname_length'])

    def to_serializable(self):
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable(),
                'max_surname_length': self._max_surname_length}

Define a SurnameVectorizer class. This is the vectorizer that coordinates the Vocabularies and performs the vectorization: it maps surnames and nationalities to integers and produces a one-hot encoded matrix. It is constructed from a surname Vocabulary, a nationality Vocabulary, and the length of the longest surname; vectorize converts an input surname string into a one-hot matrix with one column per character position. from_dataframe instantiates a vectorizer from the surname DataFrame.
from_serializable instantiates a vectorizer from a serializable dictionary, and to_serializable converts the vectorizer into such a dictionary.
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        self.surname_df = surname_df
        self._vectorizer = vectorizer
        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')

        # Class weights
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        return self._vectorizer

    def set_split(self, split="train"):
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        row = self._target_df.iloc[index]

        surname_matrix = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_matrix,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):
        return len(self) // batch_size


def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

Define a SurnameDataset class, a custom dataset class that loads and prepares the data. The constructor takes the surname DataFrame and a vectorizer, splits the data into training, validation, and test sets according to the DataFrame's split column, and records the size of each split. A _lookup_dict dictionary maps a split name to the corresponding DataFrame and size, and set_split selects which split is currently in use.
load_dataset_and_make_vectorizer and load_dataset_and_load_vectorizer build a SurnameDataset from the surname CSV, either creating a new vectorizer or loading one from a file path. load_vectorizer_only and save_vectorizer load and save the vectorizer on its own, get_vectorizer returns the vectorizer in use, and __len__ returns the size of the current split. __getitem__ returns the example at a given index: it vectorizes the surname into a matrix, looks up the nationality index, and returns a dictionary containing both; get_num_batches returns the number of batches for a given batch size. Finally, generate_batches uses the DataLoader class to generate batches, moving the data to the specified device and yielding each batch.
class SurnameClassifier(nn.Module):
    def __init__(self, initial_num_channels, num_classes, num_channels):
        super(SurnameClassifier, self).__init__()

        self.convnet = nn.Sequential(
            nn.Conv1d(in_channels=initial_num_channels,
                      out_channels=num_channels, kernel_size=3),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                      kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                      kernel_size=3),
            nn.ELU()
        )
        self.fc = nn.Linear(num_channels, num_classes)

    def forward(self, x_surname, apply_softmax=False):
        features = self.convnet(x_surname).squeeze(dim=2)

        prediction_vector = self.fc(features)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector

We define a SurnameClassifier class that implements the convolutional neural network for surname classification. It is initialized with the number of input channels, the number of classes, and the number of convolutional channels; the constructor builds a convnet of stacked Conv1d layers with ELU activations, followed by a fully connected layer that produces the final prediction. The forward method runs the forward pass: the input surname matrix is passed through the convnet to extract features, which are then fed to the fully connected layer to obtain the prediction vector. If apply_softmax is True, the prediction vector is normalized with softmax.
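A quick shape check of the module above on a dummy batch; the channel and class counts are placeholders matching the earlier sketch, not the real vocabulary sizes.

classifier_demo = SurnameClassifier(initial_num_channels=77, num_classes=18, num_channels=256)
x_surname = torch.rand(4, 77, 17)                         # batch of one-hot character matrices
print(classifier_demo(x_surname, apply_softmax=True).shape)  # torch.Size([4, 18])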
def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}
This function creates a dictionary that keeps track of the training state.
def update_train_state(args, model, train_state):

    # Save one model at least
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # Save model if performance improved
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # If loss worsened
        if loss_t >= train_state['early_stopping_best_val']:
            # Update step
            train_state['early_stopping_step'] += 1
        # Loss decreased
        else:
            # Save the best model
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # Reset early stopping step
            train_state['early_stopping_step'] = 0

        # Stop early ?
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

This function updates the training state. It handles early stopping, halting training when the validation loss has not improved for a number of consecutive epochs in order to prevent overfitting, and model checkpointing, saving the model whenever its performance improves.
def compute_accuracy(y_pred, y_target):
    y_pred_indices = y_pred.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100
Define compute_accuracy, which computes the model's accuracy from the predictions y_pred and the targets y_target.
args = Namespace(
    # Data and Path information
    surname_csv="data/surnames/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/cnn",
    # Model hyper parameters
    hidden_dim=100,
    num_channels=256,
    # Training hyper parameters
    seed=1337,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=100,
    early_stopping_criteria=5,
    dropout_p=0.1,
    # Runtime options
    cuda=False,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
    catch_keyboard_interrupt=True
)


if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)

    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))

# Check CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))

def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)

# Set seed for reproducibility
set_seed_everywhere(args.seed, args.cuda)

# handle dirs
handle_dirs(args.save_dir)

The Namespace holds the data and path information, model hyperparameters, training hyperparameters, and runtime options: hidden_dim is the hidden-layer dimension, num_channels the number of convolutional output channels, learning_rate the learning rate, batch_size the batch size, num_epochs the total number of training epochs, and dropout_p the dropout probability.
if args.reload_from_files:
    # training from a checkpoint
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # create dataset and vectorizer
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)

vectorizer = dataset.get_vectorizer()

classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab),
                               num_classes=len(vectorizer.nationality_vocab),
                               num_channels=args.num_channels)

classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                 mode='min', factor=0.5,
                                                 patience=1)

train_state = make_train_state(args)

epoch_bar = tqdm_notebook(desc='training routine',
                          total=args.num_epochs,
                          position=0)

dataset.set_split('train')
train_bar = tqdm_notebook(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size),
                          position=1,
                          leave=True)
dataset.set_split('val')
val_bar = tqdm_notebook(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size),
                        position=1,
                        leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # Iterate over training dataset

        # setup: batch generator, set loss and acc to 0, set train mode on
        dataset.set_split('train')
        batch_generator = generate_batches(dataset,
                                           batch_size=args.batch_size,
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # the training routine is these 5 steps:

            # --------------------------------------
            # step 1. zero the gradients
            optimizer.zero_grad()

            # step 2. compute the output
            y_pred = classifier(batch_dict['x_surname'])

            # step 3. compute the loss
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # step 4. use loss to produce gradients
            loss.backward()

            # step 5. use optimizer to take gradient step
            optimizer.step()
            # -----------------------------------------
            # compute the accuracy
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            # update bar
            train_bar.set_postfix(loss=running_loss, acc=running_acc,
                                  epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # Iterate over val dataset

        # setup: batch generator, set loss and acc to 0; set eval mode on
        dataset.set_split('val')
        batch_generator = generate_batches(dataset,
                                           batch_size=args.batch_size,
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):

            # compute the output
            y_pred = classifier(batch_dict['x_surname'])

            # compute the loss
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # compute the accuracy
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc,
                                epoch=epoch_index)
            val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)

        scheduler.step(train_state['val_loss'][-1])

        if train_state['stop_early']:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")

classifier.load_state_dict(torch.load(train_state['model_filename']))

classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)
loss_func = nn.CrossEntropyLoss(dataset.class_weights)

dataset.set_split('test')
batch_generator = generate_batches(dataset,
                                   batch_size=args.batch_size,
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # compute the output
    y_pred = classifier(batch_dict['x_surname'])

    # compute the loss
    loss = loss_func(y_pred, batch_dict['y_nationality'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # compute the accuracy
    acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

print("Test loss: {};".format(train_state['test_loss']))
print("Test Accuracy: {}".format(train_state['test_acc']))

First, depending on args.reload_from_files, the script either reloads the dataset and vectorizer from files or creates them from scratch. dataset.get_vectorizer() returns the vectorizer, whose vocabulary sizes are used to initialize the SurnameClassifier. The loss function loss_func is cross-entropy weighted by the dataset's class weights, the optimizer is Adam over the classifier's parameters, and the learning-rate scheduler is ReduceLROnPlateau, which monitors the validation loss and lowers the learning rate when it stops improving.
During training, tqdm_notebook creates three progress bars: one for the overall epochs, one for the training split, and one for the validation split. A for loop runs args.num_epochs epochs, each consisting of a training pass and a validation pass. update_train_state updates the training state, saving the model and deciding whether to stop early, and scheduler.step adjusts the learning rate based on the latest validation loss. If train_state['stop_early'] is True, training ends early.
Finally, the model is evaluated on the test split and the test loss and accuracy are printed. torch.load restores the best saved state dict into the classifier, the classifier and the class weights are moved to the device, and the loss function is redefined. The test split is selected and its batches are generated; with loss and accuracy initialized to zero, each batch goes through the same steps: compute the output, compute the loss, compute the accuracy, and fold the values into the running averages. The final test loss and accuracy are stored under the corresponding keys of train_state.
Figure 7. Model performance metrics
def predict_nationality(surname, classifier, vectorizer):
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
    result = classifier(vectorized_surname, apply_softmax=True)

    probability_values, indices = result.max(dim=1)
    index = indices.item()

    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality, 'probability': probability_value}
predict_nationality predicts the nationality of a given surname. The surname is vectorized, converted to a PyTorch tensor, and given an extra batch dimension; it is then passed through the classifier to get the prediction. max finds the highest probability and its index, the index is mapped back to a nationality label, and the probability is converted to a scalar. The predicted nationality and its probability are returned in a dictionary.
new_surname = input("Enter a surname to classify: ")
classifier = classifier.cpu()
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))

def predict_topk_nationality(surname, classifier, vectorizer, k=5):

    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
    prediction_vector = classifier(vectorized_surname, apply_softmax=True)
    probability_values, indices = torch.topk(prediction_vector, k=k)

    # returned size is 1,k
    probability_values = probability_values[0].detach().numpy()
    indices = indices[0].detach().numpy()

    results = []
    for kth_index in range(k):
        nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
        probability_value = probability_values[kth_index]
        results.append({'nationality': nationality,
                        'probability': probability_value})
    return results

new_surname = input("Enter a surname to classify: ")

k = int(input("How many of the top predictions to see? "))
if k > len(vectorizer.nationality_vocab):
    print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
    k = len(vectorizer.nationality_vocab)

predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)

print("Top {} predictions:".format(k))
print("===================")
for prediction in predictions:
    print("{} -> {} (p={:0.2f})".format(new_surname,
                                        prediction['nationality'],
                                        prediction['probability']))

In the interactive part, input prompts the user for a surname, which is stored in new_surname; the classifier is moved to the CPU with cpu(), and predict_nationality is called with the surname, classifier, and vectorizer to print the surname, the predicted nationality, and its probability.
predict_topk_nationality predicts the K most likely nationalities for a given surname. The surname is vectorized and passed through the classifier with softmax applied; torch.topk returns the top K probabilities and indices, and a loop converts each index to its nationality label and collects label and probability into a list of result dictionaries. The script then asks the user for a surname and for how many of the top predictions to show; if k exceeds the size of the nationality vocabulary, it is clamped to that size. predict_topk_nationality is called with the user's surname, the classifier, the vectorizer, and k, and the top K predictions are printed.
Figure 8. Prediction results
When surname classification is implemented with an MLP, the surname is fed as input and passed through a stack of fully connected layers and non-linear activations that learn a feature representation and the classification decision. The MLP is a classic feedforward network: its hidden units capture different features of the input, and the softmax over the output layer maps the input to one of the classes. The structure of the feature extractor and the classifier is designed by hand, and the parameters are optimized with backpropagation.
When surname classification is implemented with a CNN, we borrow the strengths CNNs have shown in image processing. The surname is represented as a character-level matrix, and convolutional layers (optionally with pooling) plus a fully connected layer learn the feature representation and the classification decision. A CNN automatically learns local features of the input and composes them hierarchically; for surnames, it can effectively capture the positional relationships and salient patterns between characters.
The main difference between the MLP and the CNN for surname classification lies in their architecture and how they work. The MLP is better suited to simpler, structured inputs, while the CNN is better suited to images or other data with spatial structure. The MLP relies on a hand-designed representation, whereas the CNN learns features automatically; in addition, parameter sharing and translation invariance give the CNN an advantage on image-like data.
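The practical difference also shows up in the two vectorizers above: the MLP version collapses a surname into a single bag-of-characters vector, while the CNN version keeps each character's position. A small sketch, assuming a toy character vocabulary made up for illustration:

import numpy as np

# Toy character vocabulary: indices 0..3 for '@', 'a', 'n', 'g'
vocab = {'@': 0, 'a': 1, 'n': 2, 'g': 3}

surname = "gang"

# MLP-style vectorization: one flat vector, position information is lost
mlp_vec = np.zeros(len(vocab), dtype=np.float32)
for ch in surname:
    mlp_vec[vocab[ch]] = 1
print(mlp_vec)          # [0. 1. 1. 1.]  -- the same vector as "nag", "gan", ...

# CNN-style vectorization: one column per character position
max_len = 6
cnn_mat = np.zeros((len(vocab), max_len), dtype=np.float32)
for pos, ch in enumerate(surname):
    cnn_mat[vocab[ch], pos] = 1
print(cnn_mat.shape)    # (4, 6): the order of the characters is preserved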