The multilayer perceptron (MLP, Multilayer Perceptron) is a basic form of artificial neural network (ANN). It is a fully connected feedforward model: the neurons are arranged in layers, and every neuron is connected to all of the neurons in the previous layer.
Input Layer:
Receives the input data, usually a one-dimensional vector with one input node per feature.
Hidden Layers:
One or more layers of neurons; each neuron is connected to every neuron in the previous layer and computes a weighted sum of its inputs.
Every hidden neuron applies a nonlinear activation function (such as ReLU, sigmoid, or tanh) to introduce nonlinearity.
Output Layer:
Produces the final prediction; the number of output neurons depends on the task, for example the number of classes in a classification problem.
Weights and Biases:
Each connection has a weight and each neuron has a bias term; these parameters are optimized during training by the backpropagation algorithm.
Activation Function:
Activation functions introduce nonlinearity; common choices include ReLU, sigmoid, and tanh.
Loss Function:
The loss function measures the gap between the model's predictions and the true labels; common losses include mean squared error (MSE) and cross-entropy.
Backpropagation:
Backpropagation is the algorithm that optimizes the weights and biases by gradient descent: it uses the gradient of the loss to update the model parameters so that predictions on the training data become more accurate.
Training repeats three steps:
Forward Propagation:
The input passes through each layer's weighted sums and activation functions, layer by layer, until the output layer produces a prediction.
Calculate Loss:
A loss value is computed from the prediction and the true label.
Backpropagation and Parameter Update:
Backpropagation computes the gradients of the loss with respect to the weights and biases, and an optimizer (such as gradient descent) uses them to update the parameters.
Activation functions (ReLU, sigmoid, tanh, and so on) introduce nonlinearity, which lets an MLP capture and represent complex nonlinear relationships in the input data. With multiple hidden layers, an MLP extracts and combines features layer by layer, each layer building a higher-level representation on top of the previous one. Together, the multilayer structure and the nonlinear activations give the MLP strong representational power: it can model complex decision boundaries and data patterns, including highly nonlinear, high-dimensional decision boundaries. A minimal sketch of such a network follows.
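To make the pieces above concrete, here is a minimal sketch of a two-layer MLP in PyTorch together with one training step (forward pass, loss, backpropagation, parameter update). The dimensions, data, and the name TinyMLP are made up purely for illustration and are unrelated to the surname example that follows.
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
-
- class TinyMLP(nn.Module):
-     def __init__(self, input_dim, hidden_dim, output_dim):
-         super(TinyMLP, self).__init__()
-         self.fc1 = nn.Linear(input_dim, hidden_dim)   # input -> hidden (weights and biases)
-         self.fc2 = nn.Linear(hidden_dim, output_dim)  # hidden -> output
-
-     def forward(self, x):
-         hidden = F.relu(self.fc1(x))   # weighted sum + nonlinear activation
-         return self.fc2(hidden)        # class scores (logits)
-
- # made-up data: batch of 8 examples with 10 features, 4 classes
- model = TinyMLP(input_dim=10, hidden_dim=32, output_dim=4)
- optimizer = optim.SGD(model.parameters(), lr=0.1)
- loss_func = nn.CrossEntropyLoss()
- x = torch.randn(8, 10)
- y = torch.randint(0, 4, (8,))
-
- logits = model(x)             # forward propagation
- loss = loss_func(logits, y)   # calculate loss
- optimizer.zero_grad()
- loss.backward()               # backpropagation: compute gradients
- optimizer.step()              # parameter update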
The surname dataset collects 10,000 surnames from 18 different nationalities, gathered by the authors from various name sources on the internet. The dataset has several properties that make it interesting. The first is that it is quite imbalanced: the top three classes account for more than 60% of the data (27% English, 21% Russian, 14% Arabic), and the remaining 15 nationalities have progressively lower frequencies, which is itself a property of the languages involved. The second is that there is a valid and intuitive relationship between nationality and surname orthography (spelling): some spelling variants are very strongly tied to the country of origin (for example "O'Neill", "Antonopoulos", "Nagasawa", or "Zhu").
To create the final dataset, we started from a less-processed version than the one included in the supplementary material and performed several modification operations. The first was to reduce the imbalance: the original dataset was more than 70% Russian, possibly because of a sampling bias or a genuine overabundance of Russian surnames. We therefore subsampled this over-represented class by keeping a random subset of the surnames labeled Russian. Next, we grouped the dataset by nationality and split it into three parts, 70% for the training set, 15% for the validation set, and the final 15% for the test set, so that the class-label distributions are comparable across the splits.
- import collections
- import numpy as np
- import pandas as pd
- import re
-
- from argparse import Namespace
-
- args = Namespace(
- raw_dataset_csv="data/surnames/surnames.csv",
- train_proportion=0.7,
- val_proportion=0.15,
- test_proportion=0.15,
- output_munged_csv="data/surnames/surnames_with_splits.csv",
- seed=1337
- )
-
- # Read raw data
- surnames = pd.read_csv(args.raw_dataset_csv, header=0)
-
- surnames.head()
-
- # Unique classes
- set(surnames.nationality)
-
- # Splitting train by nationality
- # Create dict
- by_nationality = collections.defaultdict(list)
- for _, row in surnames.iterrows():
- by_nationality[row.nationality].append(row.to_dict())
-
- # Create split data
- final_list = []
- np.random.seed(args.seed)
- # iterate over the data grouped by nationality
- for _, item_list in sorted(by_nationality.items()):
- np.random.shuffle(item_list)  # shuffle the items in place
- n = len(item_list)  # total number of data points
- n_train = int(args.train_proportion*n)  # training set size
- n_val = int(args.val_proportion*n)  # validation set size
- n_test = int(args.test_proportion*n)  # test set size
-
- # Give each data point a split attribute
- for item in item_list[:n_train]:
- item['split'] = 'train'  # mark as training data
- for item in item_list[n_train:n_train+n_val]:
- item['split'] = 'val'  # mark as validation data
- for item in item_list[n_train+n_val:]:
- item['split'] = 'test'  # mark as test data
-
- # Add to final list
- final_list.extend(item_list)
-
- # Write the split data to a CSV file
- final_surnames = pd.DataFrame(final_list)
-
- final_surnames.split.value_counts()
- final_surnames.head()
- # Write munged data to CSV
- final_surnames.to_csv(args.output_munged_csv, index=False)

The first five rows of the split data, and the number of examples in the train, validation, and test splits.
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
-
- import numpy as np
- import pandas as pd
-
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
- class Vocabulary(object):
- """Class to process text and extract vocabulary for mapping"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- Args:
- token_to_idx (dict): a pre-existing map of tokens to indices
- add_unk (bool): a flag that indicates whether to add the UNK token
- unk_token (str): the UNK token to add into the Vocabulary
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- """ returns a dictionary that can be serialized """
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """ instantiates the Vocabulary from a serialized dictionary """
- return cls(**contents)
-
- def add_token(self, token):
- """Update mapping dicts based on the token.
- Args:
- token (str): the item to add into the Vocabulary
- Returns:
- index (int): the integer corresponding to the token
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """Add a list of tokens into the Vocabulary
-
- Args:
- tokens (list): a list of string tokens
- Returns:
- indices (list): a list of indices corresponding to the tokens
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """Retrieve the index associated with the token
- or the UNK index if token isn't present.
-
- Args:
- token (str): the token to look up
- Returns:
- index (int): the index corresponding to the token
- Notes:
- `unk_index` needs to be >=0 (having been added into the Vocabulary)
- for the UNK functionality
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """Return the token associated with the index
-
- Args:
- index (int): the index to look up
- Returns:
- token (str): the token corresponding to the index
- Raises:
- KeyError: if the index is not in the Vocabulary
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)

- class SurnameVectorizer(object):
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
- def __init__(self, surname_vocab, nationality_vocab):
- """
- Args:
- surname_vocab (Vocabulary): maps characters to integers
- nationality_vocab (Vocabulary): maps nationalities to integers
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
-
- def vectorize(self, surname):
- """
- Args:
- surname (str): the surname
- Returns:
- one_hot (np.ndarray): a collapsed one-hot encoding
- """
- vocab = self.surname_vocab
- one_hot = np.zeros(len(vocab), dtype=np.float32)
- for token in surname:
- one_hot[vocab.lookup_token(token)] = 1
-
- return one_hot
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """Instantiate the vectorizer from the dataset dataframe
-
- Args:
- surname_df (pandas.DataFrame): the surnames dataset
- Returns:
- an instance of the SurnameVectorizer
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
-
- for index, row in surname_df.iterrows():
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable()}

- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): the dataset
- vectorizer (SurnameVectorizer): vectorizer instantiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
-
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case that the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameVectorizer
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's:
- features (x_surname)
- label (y_nationality)
- """
- row = self._target_df.iloc[index]
-
- surname_vector = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_vector,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
- ensure each tensor is on the right device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict

- class SurnameClassifier(nn.Module):
- """ A 2-layer Multilayer Perceptron for classifying surnames """
- def __init__(self, input_dim, hidden_dim, output_dim):
- """
- Args:
- input_dim (int): the size of the input vectors
- hidden_dim (int): the output size of the first Linear layer
- output_dim (int): the output size of the second Linear layer
- """
- super(SurnameClassifier, self).__init__()
- self.fc1 = nn.Linear(input_dim, hidden_dim)
- self.fc2 = nn.Linear(hidden_dim, output_dim)
-
- def forward(self, x_in, apply_softmax=False):
- """The forward pass of the classifier
-
- Args:
- x_in (torch.Tensor): an input data tensor.
- x_in.shape should be (batch, input_dim)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, output_dim)
- """
- intermediate_vector = F.relu(self.fc1(x_in))
- prediction_vector = self.fc2(intermediate_vector)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector

- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
-
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state
-
- def compute_accuracy(y_pred, y_target):
- _, y_pred_indices = y_pred.max(dim=1)
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100

- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
- args = Namespace(
- # Data and path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/surname_mlp",
- # Model hyper parameters
- hidden_dim=300,
- # Training hyper parameters
- seed=1337,
- num_epochs=100,
- early_stopping_criteria=5,
- learning_rate=0.001,
- batch_size=64,
- # Runtime options
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- )
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # Check CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
-
- print("Using CUDA: {}".format(args.cuda))
-
-
- # Set seed for reproducibility
- set_seed_everywhere(args.seed, args.cuda)
-
- # handle dirs
- handle_dirs(args.save_dir)

- if args.reload_from_files:
- # training from a checkpoint
- print("Reloading!")
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # create dataset and vectorizer
- print("Creating fresh!")
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- vectorizer = dataset.get_vectorizer()
- classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
- hidden_dim=args.hidden_dim,
- output_dim=len(vectorizer.nationality_vocab))
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
-
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- train_state = make_train_state(args)
-
- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.to("cpu").item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")

Loss and accuracy computation
- # compute the loss & accuracy on the test set using the best available model
-
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))

Model training results: training loss and accuracy
- def predict_nationality(surname, classifier, vectorizer):
- """Predict the nationality from a new surname
-
- Args:
- surname (str): the surname to classify
- classifier (SurnameClassifier): an instance of the classifier
- vectorizer (SurnameVectorizer): the corresponding vectorizer
- Returns:
- a dictionary with the most likely nationality and its probability
- """
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).view(1, -1)
- result = classifier(vectorized_surname, apply_softmax=True)
-
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

Example
In NLP and related fields, it is often useful to look at more than just the single best prediction. A common practice is to take the k-best predictions and rerank them with another model. PyTorch provides a convenient function, torch.topk, for retrieving the top k predictions.
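As a quick, self-contained illustration (the probability values here are made up and unrelated to the trained classifier), torch.topk returns the k largest entries along a dimension together with their indices:
- import torch
-
- probs = torch.tensor([[0.05, 0.40, 0.10, 0.30, 0.15]])   # fake softmax output, shape (1, 5)
- top_values, top_indices = torch.topk(probs, k=3, dim=1)
- print(top_values)    # tensor([[0.4000, 0.3000, 0.1500]])
- print(top_indices)   # tensor([[1, 3, 4]])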
- vectorizer.nationality_vocab.lookup_index(8)
- def predict_topk_nationality(name, classifier, vectorizer, k=5):
- vectorized_name = vectorizer.vectorize(name)
- vectorized_name = torch.tensor(vectorized_name).view(1, -1)
- prediction_vector = classifier(vectorized_name, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # returned size is 1,k
- probability_values = probability_values.detach().numpy()[0]
- indices = indices.detach().numpy()[0]
-
- results = []
- for prob_value, index in zip(probability_values, indices):
- nationality = vectorizer.nationality_vocab.lookup_index(index)
- results.append({'nationality': nationality,
- 'probability': prob_value})
-
- return results
-
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.to("cpu")
-
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

A convolutional neural network (CNN) is a deep learning model widely used in image and video recognition, natural language processing, and other areas. Its main characteristic is that it extracts features with convolutional layers and pooling layers, which reduces the number of parameters and the computational complexity.
Convolutional Layer:
The convolution operation slides a number of kernels (filters) over the image or input data to produce feature maps (a small shape sketch follows this list).
Each kernel slides across the input and extracts information from local regions; because the same kernel shares its parameters across positions, the number of parameters is greatly reduced.
Activation Function:
The most commonly used activation is ReLU (Rectified Linear Unit), which sets negative inputs to zero and keeps positive values unchanged.
The activation introduces nonlinearity, allowing the network to learn more complex features.
Pooling Layer:
Downsampling (such as max pooling or average pooling) shrinks the feature maps, reducing computation and the risk of overfitting.
Max pooling takes the maximum value in each local region; average pooling takes the mean.
Fully Connected Layer:
The feature maps produced by the convolutional and pooling layers are flattened and fed into fully connected layers for classification or regression.
Fully connected layers resemble a traditional neural network: each neuron is connected to all of the neurons in the previous layer.
Loss Function and Optimizer:
The loss function measures the gap between predictions and ground truth; common choices are cross-entropy loss and mean squared error.
The optimizer updates the model parameters; common choices include stochastic gradient descent (SGD) and Adam.
Regularization:
To prevent overfitting, common regularization techniques include Dropout and L2 regularization.
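As a rough illustration of how these pieces fit together, the following toy sketch (sizes chosen arbitrarily, unrelated to the surname model below) traces tensor shapes through a convolution, ReLU, max pooling, flattening, and a fully connected layer:
- import torch
- import torch.nn as nn
-
- x = torch.randn(1, 3, 32, 32)        # one RGB image: (batch, channels, height, width)
- conv = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)
- pool = nn.MaxPool2d(kernel_size=2)
- fc = nn.Linear(16 * 16 * 16, 10)     # 10 output classes
-
- feature_maps = torch.relu(conv(x))               # -> (1, 16, 32, 32)
- pooled = pool(feature_maps)                      # -> (1, 16, 16, 16)
- flattened = pooled.view(pooled.size(0), -1)      # -> (1, 4096)
- scores = fc(flattened)                           # -> (1, 10)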
Channel
Informally, a channel is the feature dimension at each point in the input. In an image, every pixel has three channels, one for each RGB component. A similar idea applies to text data when using convolutions: conceptually, if the "pixels" of a text document are words, the number of channels is the size of the vocabulary; if we work at the finer granularity of characters, the number of channels is the size of the character set (which in this example is exactly the vocabulary). In PyTorch's convolution implementation, the number of input channels is the in_channels parameter. A convolution can also produce multiple output channels (out_channels); you can think of the convolution operator as "mapping" the input feature dimension to an output feature dimension.
Kernel Size
The width of the kernel matrix is called the kernel size (kernel_size in PyTorch).
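For example, here is a small sketch (assuming a character vocabulary of 30 and a padded surname length of 10, numbers chosen only for illustration) showing how in_channels matches the character-vocabulary size of the one-hot matrix, and how kernel_size is the width of the window slid over character positions:
- import torch
- import torch.nn as nn
-
- vocab_size, max_surname_length = 30, 10
- x = torch.zeros(1, vocab_size, max_surname_length)   # one-hot matrix: (batch, channels, positions)
- conv = nn.Conv1d(in_channels=vocab_size, out_channels=64, kernel_size=3)
- out = conv(x)
- print(out.shape)   # torch.Size([1, 64, 8]): 64 output channels over 10 - 3 + 1 positions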
Local connectivity: the convolution operation automatically extracts local features, which makes CNNs well suited to image data.
Parameter sharing: by sharing kernel parameters across positions, a CNN greatly reduces the number of model parameters and improves computational efficiency (see the small comparison after this list).
Translation invariance: because the same kernel is applied at every position, a shifted input produces correspondingly shifted feature maps rather than a completely different response, and pooling makes the output largely insensitive to small translations.
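To give a rough sense of the savings from parameter sharing (toy sizes, for illustration only), compare the parameter count of a 3x3 convolution with that of a fully connected layer mapping between feature maps of the same size:
- import torch.nn as nn
-
- conv = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)
- fc = nn.Linear(3 * 32 * 32, 16 * 32 * 32)   # a dense layer between same-sized feature maps
-
- print(sum(p.numel() for p in conv.parameters()))   # 448
- print(sum(p.numel() for p in fc.parameters()))     # 50348032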
Image classification: e.g. handwritten digit recognition (MNIST), object recognition (ImageNet).
Object detection: e.g. face detection, pedestrian detection.
Semantic segmentation: e.g. medical image segmentation.
Video analysis: e.g. action recognition.
Natural language processing: e.g. text classification, sentiment analysis.
A multilayer perceptron (MLP) is well suited to structured, low-dimensional data, passing information layer by layer through fully connected layers, but it copes poorly with high-dimensional inputs. A convolutional neural network (CNN) is designed for high-dimensional data such as images and video: it extracts local features through convolution and pooling layers, uses fewer parameters, and delivers stronger performance. Simply put, MLPs suit flat, tabular data, while CNNs excel at data with spatial structure such as images and video. In this chapter's surname example the same contrast shows up in the input representation, as the sketch below illustrates.
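Here is a small sketch of that contrast for the surname task (using a toy lowercase-letter vocabulary rather than the actual vectorizers defined below): the MLP consumes a collapsed one-hot vector that discards character order, while the CNN consumes a one-hot matrix with one column per character position:
- import numpy as np
-
- char_to_idx = {c: i for i, c in enumerate("abcdefghijklmnopqrstuvwxyz")}   # toy vocabulary
- surname, max_len = "zhu", 8
-
- # MLP input: collapsed one-hot vector, shape (vocab_size,); character order is lost
- mlp_vector = np.zeros(len(char_to_idx), dtype=np.float32)
- for ch in surname:
-     mlp_vector[char_to_idx[ch]] = 1
-
- # CNN input: one-hot matrix, shape (vocab_size, max_len); one column per position
- cnn_matrix = np.zeros((len(char_to_idx), max_len), dtype=np.float32)
- for pos, ch in enumerate(surname):
-     cnn_matrix[char_to_idx[ch], pos] = 1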
- from argparse import Namespace
- from collections import Counter
- import json
- import os
- import string
-
- import numpy as np
- import pandas as pd
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- from tqdm import tqdm_notebook
- class Vocabulary(object):
- """Class to process text and extract vocabulary for mapping"""
-
- def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
- """
- Args:
- token_to_idx (dict): a pre-existing map of tokens to indices
- add_unk (bool): a flag that indicates whether to add the UNK token
- unk_token (str): the UNK token to add into the Vocabulary
- """
-
- if token_to_idx is None:
- token_to_idx = {}
- self._token_to_idx = token_to_idx
-
- self._idx_to_token = {idx: token
- for token, idx in self._token_to_idx.items()}
-
- self._add_unk = add_unk
- self._unk_token = unk_token
-
- self.unk_index = -1
- if add_unk:
- self.unk_index = self.add_token(unk_token)
-
-
- def to_serializable(self):
- """ returns a dictionary that can be serialized """
- return {'token_to_idx': self._token_to_idx,
- 'add_unk': self._add_unk,
- 'unk_token': self._unk_token}
-
- @classmethod
- def from_serializable(cls, contents):
- """ instantiates the Vocabulary from a serialized dictionary """
- return cls(**contents)
-
- def add_token(self, token):
- """Update mapping dicts based on the token.
- Args:
- token (str): the item to add into the Vocabulary
- Returns:
- index (int): the integer corresponding to the token
- """
- try:
- index = self._token_to_idx[token]
- except KeyError:
- index = len(self._token_to_idx)
- self._token_to_idx[token] = index
- self._idx_to_token[index] = token
- return index
-
- def add_many(self, tokens):
- """Add a list of tokens into the Vocabulary
-
- Args:
- tokens (list): a list of string tokens
- Returns:
- indices (list): a list of indices corresponding to the tokens
- """
- return [self.add_token(token) for token in tokens]
-
- def lookup_token(self, token):
- """Retrieve the index associated with the token
- or the UNK index if token isn't present.
-
- Args:
- token (str): the token to look up
- Returns:
- index (int): the index corresponding to the token
- Notes:
- `unk_index` needs to be >=0 (having been added into the Vocabulary)
- for the UNK functionality
- """
- if self.unk_index >= 0:
- return self._token_to_idx.get(token, self.unk_index)
- else:
- return self._token_to_idx[token]
-
- def lookup_index(self, index):
- """Return the token associated with the index
-
- Args:
- index (int): the index to look up
- Returns:
- token (str): the token corresponding to the index
- Raises:
- KeyError: if the index is not in the Vocabulary
- """
- if index not in self._idx_to_token:
- raise KeyError("the index (%d) is not in the Vocabulary" % index)
- return self._idx_to_token[index]
-
- def __str__(self):
- return "<Vocabulary(size=%d)>" % len(self)
-
- def __len__(self):
- return len(self._token_to_idx)
- class SurnameVectorizer(object):
- """ The Vectorizer which coordinates the Vocabularies and puts them to use"""
- def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
- """
- Args:
- surname_vocab (Vocabulary): maps characters to integers
- nationality_vocab (Vocabulary): maps nationalities to integers
- max_surname_length (int): the length of the longest surname
- """
- self.surname_vocab = surname_vocab
- self.nationality_vocab = nationality_vocab
- self._max_surname_length = max_surname_length
-
- def vectorize(self, surname):
- """
- Args:
- surname (str): the surname
- Returns:
- one_hot_matrix (np.ndarray): a matrix of one-hot vectors
- """
-
- one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
- one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
-
- for position_index, character in enumerate(surname):
- character_index = self.surname_vocab.lookup_token(character)
- one_hot_matrix[character_index][position_index] = 1
-
- return one_hot_matrix
-
- @classmethod
- def from_dataframe(cls, surname_df):
- """Instantiate the vectorizer from the dataset dataframe
-
- Args:
- surname_df (pandas.DataFrame): the surnames dataset
- Returns:
- an instance of the SurnameVectorizer
- """
- surname_vocab = Vocabulary(unk_token="@")
- nationality_vocab = Vocabulary(add_unk=False)
- max_surname_length = 0
-
- for index, row in surname_df.iterrows():
- max_surname_length = max(max_surname_length, len(row.surname))
- for letter in row.surname:
- surname_vocab.add_token(letter)
- nationality_vocab.add_token(row.nationality)
-
- return cls(surname_vocab, nationality_vocab, max_surname_length)
-
- @classmethod
- def from_serializable(cls, contents):
- surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
- nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
- return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
- max_surname_length=contents['max_surname_length'])
-
- def to_serializable(self):
- return {'surname_vocab': self.surname_vocab.to_serializable(),
- 'nationality_vocab': self.nationality_vocab.to_serializable(),
- 'max_surname_length': self._max_surname_length}
- class SurnameDataset(Dataset):
- def __init__(self, surname_df, vectorizer):
- """
- Args:
- surname_df (pandas.DataFrame): the dataset
- vectorizer (SurnameVectorizer): vectorizer instantiated from dataset
- """
- self.surname_df = surname_df
- self._vectorizer = vectorizer
- self.train_df = self.surname_df[self.surname_df.split=='train']
- self.train_size = len(self.train_df)
-
- self.val_df = self.surname_df[self.surname_df.split=='val']
- self.validation_size = len(self.val_df)
-
- self.test_df = self.surname_df[self.surname_df.split=='test']
- self.test_size = len(self.test_df)
-
- self._lookup_dict = {'train': (self.train_df, self.train_size),
- 'val': (self.val_df, self.validation_size),
- 'test': (self.test_df, self.test_size)}
-
- self.set_split('train')
-
- # Class weights
- class_counts = surname_df.nationality.value_counts().to_dict()
- def sort_key(item):
- return self._vectorizer.nationality_vocab.lookup_token(item[0])
- sorted_counts = sorted(class_counts.items(), key=sort_key)
- frequencies = [count for _, count in sorted_counts]
- self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
-
-
- @classmethod
- def load_dataset_and_make_vectorizer(cls, surname_csv):
- """Load dataset and make a new vectorizer from scratch
-
- Args:
- surname_csv (str): location of the dataset
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- train_surname_df = surname_df[surname_df.split=='train']
- return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
-
- @classmethod
- def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
- """Load dataset and the corresponding vectorizer.
- Used in the case that the vectorizer has been cached for re-use
-
- Args:
- surname_csv (str): location of the dataset
- vectorizer_filepath (str): location of the saved vectorizer
- Returns:
- an instance of SurnameDataset
- """
- surname_df = pd.read_csv(surname_csv)
- vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
- return cls(surname_df, vectorizer)
-
- @staticmethod
- def load_vectorizer_only(vectorizer_filepath):
- """a static method for loading the vectorizer from file
-
- Args:
- vectorizer_filepath (str): the location of the serialized vectorizer
- Returns:
- an instance of SurnameVectorizer
- """
- with open(vectorizer_filepath) as fp:
- return SurnameVectorizer.from_serializable(json.load(fp))
-
- def save_vectorizer(self, vectorizer_filepath):
- """saves the vectorizer to disk using json
-
- Args:
- vectorizer_filepath (str): the location to save the vectorizer
- """
- with open(vectorizer_filepath, "w") as fp:
- json.dump(self._vectorizer.to_serializable(), fp)
-
- def get_vectorizer(self):
- """ returns the vectorizer """
- return self._vectorizer
-
- def set_split(self, split="train"):
- """ selects the splits in the dataset using a column in the dataframe """
- self._target_split = split
- self._target_df, self._target_size = self._lookup_dict[split]
-
- def __len__(self):
- return self._target_size
-
- def __getitem__(self, index):
- """the primary entry point method for PyTorch datasets
-
- Args:
- index (int): the index to the data point
- Returns:
- a dictionary holding the data point's features (x_data) and label (y_target)
- """
- row = self._target_df.iloc[index]
-
- surname_matrix = \
- self._vectorizer.vectorize(row.surname)
-
- nationality_index = \
- self._vectorizer.nationality_vocab.lookup_token(row.nationality)
-
- return {'x_surname': surname_matrix,
- 'y_nationality': nationality_index}
-
- def get_num_batches(self, batch_size):
- """Given a batch size, return the number of batches in the dataset
-
- Args:
- batch_size (int)
- Returns:
- number of batches in the dataset
- """
- return len(self) // batch_size
-
-
- def generate_batches(dataset, batch_size, shuffle=True,
- drop_last=True, device="cpu"):
- """
- A generator function which wraps the PyTorch DataLoader. It will
- ensure each tensor is on the right device location.
- """
- dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
- shuffle=shuffle, drop_last=drop_last)
-
- for data_dict in dataloader:
- out_data_dict = {}
- for name, tensor in data_dict.items():
- out_data_dict[name] = data_dict[name].to(device)
- yield out_data_dict

- class SurnameClassifier(nn.Module):
- def __init__(self, initial_num_channels, num_classes, num_channels):
- """
- Args:
- initial_num_channels (int): size of the incoming feature vector
- num_classes (int): size of the output prediction vector
- num_channels (int): constant channel size to use throughout network
- """
- super(SurnameClassifier, self).__init__()
-
- self.convnet = nn.Sequential(
- nn.Conv1d(in_channels=initial_num_channels,
- out_channels=num_channels, kernel_size=3),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3, stride=2),
- nn.ELU(),
- nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
- kernel_size=3),
- nn.ELU()
- )
- self.fc = nn.Linear(num_channels, num_classes)
-
- def forward(self, x_surname, apply_softmax=False):
- """The forward pass of the classifier
-
- Args:
- x_surname (torch.Tensor): an input data tensor.
- x_surname.shape should be (batch, initial_num_channels, max_surname_length)
- apply_softmax (bool): a flag for the softmax activation
- should be false if used with the Cross Entropy losses
- Returns:
- the resulting tensor. tensor.shape should be (batch, num_classes)
- """
- features = self.convnet(x_surname).squeeze(dim=2)
-
- prediction_vector = self.fc(features)
-
- if apply_softmax:
- prediction_vector = F.softmax(prediction_vector, dim=1)
-
- return prediction_vector

- def make_train_state(args):
- return {'stop_early': False,
- 'early_stopping_step': 0,
- 'early_stopping_best_val': 1e8,
- 'learning_rate': args.learning_rate,
- 'epoch_index': 0,
- 'train_loss': [],
- 'train_acc': [],
- 'val_loss': [],
- 'val_acc': [],
- 'test_loss': -1,
- 'test_acc': -1,
- 'model_filename': args.model_state_file}
- def update_train_state(args, model, train_state):
- """Handle the training state updates.
- Components:
- - Early Stopping: Prevent overfitting.
- - Model Checkpoint: Model is saved if the model is better
- :param args: main arguments
- :param model: model to train
- :param train_state: a dictionary representing the training state values
- :returns:
- a new train_state
- """
-
- # Save one model at least
- if train_state['epoch_index'] == 0:
- torch.save(model.state_dict(), train_state['model_filename'])
- train_state['stop_early'] = False
-
- # Save model if performance improved
- elif train_state['epoch_index'] >= 1:
- loss_tm1, loss_t = train_state['val_loss'][-2:]
-
- # If loss worsened
- if loss_t >= train_state['early_stopping_best_val']:
- # Update step
- train_state['early_stopping_step'] += 1
- # Loss decreased
- else:
- # Save the best model
- if loss_t < train_state['early_stopping_best_val']:
- torch.save(model.state_dict(), train_state['model_filename'])
-
- # Reset early stopping step
- train_state['early_stopping_step'] = 0
-
- # Stop early ?
- train_state['stop_early'] = \
- train_state['early_stopping_step'] >= args.early_stopping_criteria
-
- return train_state

- def compute_accuracy(y_pred, y_target):
- y_pred_indices = y_pred.max(dim=1)[1]
- n_correct = torch.eq(y_pred_indices, y_target).sum().item()
- return n_correct / len(y_pred_indices) * 100
-
- args = Namespace(
- # Data and Path information
- surname_csv="data/surnames/surnames_with_splits.csv",
- vectorizer_file="vectorizer.json",
- model_state_file="model.pth",
- save_dir="model_storage/ch4/cnn",
- # Model hyper parameters
- hidden_dim=100,
- num_channels=256,
- # Training hyper parameters
- seed=1337,
- learning_rate=0.001,
- batch_size=128,
- num_epochs=100,
- early_stopping_criteria=5,
- dropout_p=0.1,
- # Runtime options
- cuda=False,
- reload_from_files=False,
- expand_filepaths_to_save_dir=True,
- catch_keyboard_interrupt=True
- )
-
-
- if args.expand_filepaths_to_save_dir:
- args.vectorizer_file = os.path.join(args.save_dir,
- args.vectorizer_file)
-
- args.model_state_file = os.path.join(args.save_dir,
- args.model_state_file)
-
- print("Expanded filepaths: ")
- print("\t{}".format(args.vectorizer_file))
- print("\t{}".format(args.model_state_file))
-
- # Check CUDA
- if not torch.cuda.is_available():
- args.cuda = False
-
- args.device = torch.device("cuda" if args.cuda else "cpu")
- print("Using CUDA: {}".format(args.cuda))
-
- def set_seed_everywhere(seed, cuda):
- np.random.seed(seed)
- torch.manual_seed(seed)
- if cuda:
- torch.cuda.manual_seed_all(seed)
-
- def handle_dirs(dirpath):
- if not os.path.exists(dirpath):
- os.makedirs(dirpath)
-
- # Set seed for reproducibility
- set_seed_everywhere(args.seed, args.cuda)
-
- # handle dirs
- handle_dirs(args.save_dir)

- if args.reload_from_files:
- # training from a checkpoint
- dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
- args.vectorizer_file)
- else:
- # create dataset and vectorizer
- dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
- dataset.save_vectorizer(args.vectorizer_file)
-
- vectorizer = dataset.get_vectorizer()
-
- classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab),
- num_classes=len(vectorizer.nationality_vocab),
- num_channels=args.num_channels)
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
-
- loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
- optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
- scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
- mode='min', factor=0.5,
- patience=1)
-
- train_state = make_train_state(args)

- epoch_bar = tqdm_notebook(desc='training routine',
- total=args.num_epochs,
- position=0)
-
- dataset.set_split('train')
- train_bar = tqdm_notebook(desc='split=train',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
- dataset.set_split('val')
- val_bar = tqdm_notebook(desc='split=val',
- total=dataset.get_num_batches(args.batch_size),
- position=1,
- leave=True)
-
- try:
- for epoch_index in range(args.num_epochs):
- train_state['epoch_index'] = epoch_index
-
- # Iterate over training dataset
-
- # setup: batch generator, set loss and acc to 0, set train mode on
-
- dataset.set_split('train')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.0
- running_acc = 0.0
- classifier.train()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # the training routine is these 5 steps:
-
- # --------------------------------------
- # step 1. zero the gradients
- optimizer.zero_grad()
-
- # step 2. compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # step 4. use loss to produce gradients
- loss.backward()
-
- # step 5. use optimizer to take gradient step
- optimizer.step()
- # -----------------------------------------
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- # update bar
- train_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- train_bar.update()
-
- train_state['train_loss'].append(running_loss)
- train_state['train_acc'].append(running_acc)
-
- # Iterate over val dataset
-
- # setup: batch generator, set loss and acc to 0; set eval mode on
- dataset.set_split('val')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
-
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # step 3. compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
- val_bar.set_postfix(loss=running_loss, acc=running_acc,
- epoch=epoch_index)
- val_bar.update()
-
- train_state['val_loss'].append(running_loss)
- train_state['val_acc'].append(running_acc)
-
- train_state = update_train_state(args=args, model=classifier,
- train_state=train_state)
-
- scheduler.step(train_state['val_loss'][-1])
-
- if train_state['stop_early']:
- break
-
- train_bar.n = 0
- val_bar.n = 0
- epoch_bar.update()
- except KeyboardInterrupt:
- print("Exiting loop")

-
- classifier.load_state_dict(torch.load(train_state['model_filename']))
-
- classifier = classifier.to(args.device)
- dataset.class_weights = dataset.class_weights.to(args.device)
- loss_func = nn.CrossEntropyLoss(dataset.class_weights)
-
- dataset.set_split('test')
- batch_generator = generate_batches(dataset,
- batch_size=args.batch_size,
- device=args.device)
- running_loss = 0.
- running_acc = 0.
- classifier.eval()
-
- for batch_index, batch_dict in enumerate(batch_generator):
- # compute the output
- y_pred = classifier(batch_dict['x_surname'])
-
- # compute the loss
- loss = loss_func(y_pred, batch_dict['y_nationality'])
- loss_t = loss.item()
- running_loss += (loss_t - running_loss) / (batch_index + 1)
-
- # compute the accuracy
- acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
- running_acc += (acc_t - running_acc) / (batch_index + 1)
-
- train_state['test_loss'] = running_loss
- train_state['test_acc'] = running_acc
- print("Test loss: {};".format(train_state['test_loss']))
- print("Test Accuracy: {}".format(train_state['test_acc']))

- def predict_nationality(surname, classifier, vectorizer):
- """Predict the nationality from a new surname
-
- Args:
- surname (str): the surname to classify
- classifier (SurnameClassifier): an instance of the classifier
- vectorizer (SurnameVectorizer): the corresponding vectorizer
- Returns:
- a dictionary with the most likely nationality and its probability
- """
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
- result = classifier(vectorized_surname, apply_softmax=True)
-
- probability_values, indices = result.max(dim=1)
- index = indices.item()
-
- predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
- probability_value = probability_values.item()
-
- return {'nationality': predicted_nationality, 'probability': probability_value}
-
- new_surname = input("Enter a surname to classify: ")
- classifier = classifier.cpu()
- prediction = predict_nationality(new_surname, classifier, vectorizer)
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))

- def predict_topk_nationality(surname, classifier, vectorizer, k=5):
- """Predict the top K nationalities from a new surname
-
- Args:
- surname (str): the surname to classify
- classifier (SurnameClassifier): an instance of the classifier
- vectorizer (SurnameVectorizer): the corresponding vectorizer
- k (int): the number of top nationalities to return
- Returns:
- list of dictionaries, each dictionary is a nationality and a probability
- """
-
- vectorized_surname = vectorizer.vectorize(surname)
- vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
- prediction_vector = classifier(vectorized_surname, apply_softmax=True)
- probability_values, indices = torch.topk(prediction_vector, k=k)
-
- # returned size is 1,k
- probability_values = probability_values[0].detach().numpy()
- indices = indices[0].detach().numpy()
-
- results = []
- for kth_index in range(k):
- nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
- probability_value = probability_values[kth_index]
- results.append({'nationality': nationality,
- 'probability': probability_value})
- return results
-
- new_surname = input("Enter a surname to classify: ")
-
- k = int(input("How many of the top predictions to see? "))
- if k > len(vectorizer.nationality_vocab):
- print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
- k = len(vectorizer.nationality_vocab)
-
- predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
-
- print("Top {} predictions:".format(k))
- print("===================")
- for prediction in predictions:
- print("{} -> {} (p={:0.2f})".format(new_surname,
- prediction['nationality'],
- prediction['probability']))
