BERT实战 - 文本分类(Pre-training + Fine-tuning)_文本分类 测试集如何设置

1 简介

Huggingface.co通过其Transformers API使使用基于transformers的模型变得方便。但是,很多时候仅进行微调并不能达到预期的效果。对未标记的数据进行预训练,然后再进行微调,可以帮助模型实现期望的结果。Huggingface API也提供了预训练功能。在这篇博文中,我将解释如何对基于transformers的模型进行预训练,然后再进行微调。为此,将以BERT作为参考模型。

2 数据格式化

要进行预训练,数据必须采用特定的格式。数据应该保存在一个文本文件(.txt格式)中,每行一个句子这个文本文件的目的是首先使用Word Piece tokenizer对数据进行标记化,然后在数据上进行预训练。

import pandas as pd

root_data_dir = "./query-wellformedness/"

# 分别读取训练集 验证集 测试集
train_data = pd.read_csv(root_data_dir + 'train.tsv', sep='\t', header=None)
val_data = pd.read_csv(root_data_dir + 'dev.tsv', sep='\t', header=None)
test_data = pd.read_csv(root_data_dir + 'test.tsv', sep='\t', header=None)

# 查询的问题                     查询的良好状态分数
# 希腊仍然采用哪种形式的政府?	    1.0
# 仅在北美的猫头鹰数量?	        0.0
# 约翰尼德普是凯尔特人球迷吗?	    0.8
# 罗尔德达尔十几岁时住在哪里?	    0.6
# 为训练集数据DataFrame添加列名称,
# 第一列为query表示查询的问题,第二列为label表示查询的良好状态分数。
train_data.columns = ['query', 'label']
val_data.columns = ['query', 'label']
test_data.columns = ['query', 'label']

# 登记大于0.8 标记为1  否则标记为0
train_data['label'] = [1 if label >= 0.8 else 0 for label in train_data['label']]
val_data['label'] = [1 if label >= 0.8 else 0 for label in val_data['label']]
test_data['label'] = [1 if label >= 0.8 else 0 for label in test_data['label']]

# 将训练集、验证集和测试集的query列的内容合并到pretraining_data列表中。
pretraining_data = train_data['query'].tolist() + val_data['query'].tolist() + test_data['query'].tolist()

# 文件写入
with open('./working/pretraining_data.txt', 'w') as f:
    for sent in pretraining_data:
        f.write("%s\n" % sent)

3 预训练模型

3.1 在文本上训练标记器(tokenizer)

在将数据转换为所需格式后,下一步是对输入数据进行标记器(tokenizer)的训练。该步骤有助于创建数据的词汇表。以下代码片段展示了如何使用Word Piece Tokenizer对文本进行标记化。若要了解更多关于Word Piece Tokenizer的信息,请参阅以下链接中的第4.1节:

# 在文本上训练标记器
# 分词 获得词元
import tokenizers

# 创建一个BertWordPieceTokenizer对象,用于训练BERT模型的WordPiece标记器。
bwpt = tokenizers.BertWordPieceTokenizer()

filepath = "./working/pretraining_data.txt"
# 在指定的文件上训练标记器
    files=[filepath],  # 指定要训练的文本文件,即预训练数据。
    vocab_size=50000,  # 设置词汇表的大小为50000,即标记器将学习并保存50000个词元。
    min_frequency=3,  # 设置最小词频为3,表示只保留在数据中至少出现3次的词元。
    limit_alphabet=1000  # 限制字符表的大小为1000,即只考虑数据中最常见的1000个字符。
# 04_finetune/02_BERT_Pre-training+Fine-tuning/models/vocab.txt

3.2 训练BERT进行MLM任务(Masked Language Modeling任务)

接下来的步骤是为Masked Language Modeling(MLM)任务对BERT进行预训练。为此,我们将使用与训练tokenizer相同的数据集。对于MLM任务,随机选择15%的标记进行屏蔽,然后训练模型来预测这些标记。在Huggingface API中已经提供了这个功能,具体代码如下所示:

import tokenizers
from transformers import Trainer, TrainingArguments
from transformers import BertTokenizer, LineByLineTextDataset, BertModel, BertConfig, BertForMaskedLM, DataCollatorForLanguageModeling

# Convert data into required format
dataset= LineByLineTextDataset(
    tokenizer = tokenizer,
    file_path = '/kaggle/working/pretraining_data.txt',
    block_size = 128

print('No. of lines: ', len(dataset))

# Defining configuration of BERT for pretraining
config = BertConfig(
model = BertForMaskedLM(config)
print('No of parameters: ', model.num_parameters())

data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=True, mlm_probability=0.15

# Defining training configuration\
training_args = TrainingArguments(

trainer = Trainer(

# Perfrom pre-training and save the model
4 微调模型(Finetuning Model)

4.1 数据准备

在_微调阶段,数据的格式必须与我们在预训练部分使用的格式不同_。BERT接受三个输入,即input_ids、attention_mask和token_type_ids。我不会详细解释它们的含义,你可以从BERT论文中查阅相关信息。在这里,我将解释如何通过Huggingface API计算它们。我将在这里使用BERT模型进行分类任务。根据你的需要,你可以在代码中进行相应的更改。

import torch
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer

MAX_LEN = 512
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', truncation=True, do_lower_case=True)

# Defining dataset class
class QueryData(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.text = dataframe['query']
        self.targets = dataframe['label']
        self.max_len = max_len

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        text = str(self.text[index])
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(self.targets[index], dtype=torch.float)

training_set = QueryData(train_data, tokenizer, MAX_LEN)
val_set = QueryData(val_data, tokenizer, MAX_LEN)

# Defining training and testing paramerters
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0

test_params = {'batch_size': VALID_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0

# Creating dataloader for training and testing purposes
training_loader = DataLoader(training_set, **train_params)
val_loader = DataLoader(val_set, **test_params)
4.2 模型定义

现在让我们开始进行微调目的的模型构建部分。我将在BERT的顶部添加两个线性层,用于分类目的,并使用dropout = 0.1和ReLU作为激活函数。您也可以尝试不同的配置。我已经定义了一个PyTorch类来构建模型,代码如下所示:

import torch
from transformers import BertModel

class BertClass(torch.nn.Module):
    def __init__(self):
        super(BertClass, self).__init__()
        self.l1 = BertModel.from_pretrained("./models/")
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = torch.nn.Linear(768, 2)
        self.relu = torch.nn.ReLU()

    def forward(self, input_ids, attention_mask, token_type_ids):
        output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        hidden_state = output_1[0]
        pooler = hidden_state[:, 0]
        pooler = self.pre_classifier(pooler)
        pooler = self.relu(pooler)
        pooler = self.dropout(pooler)
        output = self.classifier(pooler)
        return output
4.3 训练和验证功能


import torch
from tqdm import tqdm

from fine_tuning import BertClass
from bert_config import device, LEARNING_RATE, model_dir
from fine_tuning_data_formatting import training_loader, val_loader

# 实例化BertClass模型,并将模型移动到指定的计算设备(GPU或CPU)上
model = BertClass()
# 定义交叉熵损失函数
loss_function = torch.nn.CrossEntropyLoss()
# 定义Adam优化器
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# 计算准确率的函数
def calcuate_accuracy(preds, targets):
    # 预测和目标样本对比  相等(即正确的)的记为1  并求总个数
    n_correct = (preds == targets).sum().item()
    return n_correct

# 模型训练函数
def train(epoch, training_loader):
    # 初始化累计的训练损失
    tr_loss = 0
    # 初始化累计的正确预测数量
    n_correct = 0
    # 初始化累计的训练步数
    nb_tr_steps = 0
    # 初始化累计的训练样本数量
    nb_tr_examples = 0
    # 将BERT模型设置为训练模式,启用Dropout和BatchNormalization等训练时特有的操作
    for _, data in tqdm(enumerate(training_loader, 0)):
        # 将输入数据的ids张量移动到指定的计算设备(GPU或CPU)上
        ids = data['ids'].to(device, dtype=torch.long)
        # 将输入数据的mask张量移动到指定的计算设备上
        mask = data['mask'].to(device, dtype=torch.long)
        # 将输入数据的token_type_ids张量移动到指定的计算设备上
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        # 将标签数据的targets张量移动到指定的计算设备上
        targets = data['targets'].to(device, dtype=torch.long)
        # 通过BERT模型进行前向传播,获取预测输出
        outputs = model(ids, mask, token_type_ids)
        # 计算loss
        loss = loss_function(outputs, targets)
        # 累计loss
        tr_loss += loss.item()
        # 获取预测输出中每个样本预测得分最高的类别索引
        big_val, big_idx = torch.max(outputs.data, dim=1)
        # 计算当前训练步中预测正确的样本数量,并累计到n_correct中
        n_correct += calcuate_accuracy(big_idx, targets)
        # 增加训练步数
        nb_tr_steps += 1
        # 增加训练样本数量
        nb_tr_examples += targets.size(0)

        if _ % 500 == 0:
            # 计算当前步数中的平均训练损失
            loss_step = tr_loss / nb_tr_steps
            # 计算当前步数中的训练准确率
            accu_step = (n_correct * 100) / nb_tr_examples
            print(f"Training Loss per 500 steps: {loss_step}")
            print(f"Training Accuracy per 500 steps: {accu_step}")
        # 清空之前的梯度信息
        # 计算当前步数中的梯度
        # 执行梯度更新,更新模型的参数
    # 打印当前epoch中的总准确率
    print(f'The Total Accuracy for Epoch {epoch}: {(n_correct * 100) / nb_tr_examples}')
    # 计算当前epoch的平均训练损失
    epoch_loss = tr_loss / nb_tr_steps
    # 计算当前epoch的训练准确率
    epoch_accu = (n_correct * 100) / nb_tr_examples
    print(f"Training Loss Epoch: {epoch_loss}")
    print(f"Training Accuracy Epoch: {epoch_accu}")

for epoch in range(EPOCHS):
    train(epoch, training_loader)

# 模型验证函数
def valid(model, testing_loader):
    # 将BERT模型设置为评估模式,禁用Dropout和BatchNormalization等训练时特有的操作
    # 初始化累计的正确预测数量
    n_correct = 0
    # 初始化累计的错误预测数量
    n_wrong = 0
    # 初始化样本总数
    total = 0
    # 初始化累计的验证损失
    tr_loss = 0
    # 初始化累计的验证步数
    nb_tr_steps = 0
    # 初始化累计的验证样本数量
    nb_tr_examples = 0
    # 使用torch.no_grad()上下文管理器,禁用梯度计算,节省内存并加快计算速度
    with torch.no_grad():
        for _, data in tqdm(enumerate(testing_loader, 0)):
            # 将输入数据的ids张量移动到指定的计算设备(GPU或CPU)上
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.long)
            # 通过BERT模型进行前向传播,获取预测输出
            outputs = model(ids, mask, token_type_ids)
            # 计算预测输出和真实标签之间loss
            loss = loss_function(outputs, targets)
            # 累计loss
            tr_loss += loss.item()
            # 获取预测输出中每个样本预测得分最高的类别索引
            big_val, big_idx = torch.max(outputs.data, dim=1)
            # 计算当前验证步中预测正确的样本数量,并累计到n_correct中
            n_correct += calcuate_accuracy(big_idx, targets)

            # 增加验证步数
            nb_tr_steps += 1
            # 增加验证样本数量
            nb_tr_examples += targets.size(0)

            if _ % 5000 == 0:
                # 计算当前步数中的平均验证损失
                loss_step = tr_loss / nb_tr_steps
                # 计算当前步数中的验证准确率
                accu_step = (n_correct * 100) / nb_tr_examples
                print(f"Validation Loss per 100 steps: {loss_step}")
                print(f"Validation Accuracy per 100 steps: {accu_step}")
    # 计算当前epoch的平均验证损失
    epoch_loss = tr_loss / nb_tr_steps
    # 计算当前epoch的验证准确率
    epoch_accu = (n_correct * 100) / nb_tr_examples

    print(f"Validation Loss Epoch: {epoch_loss}")
    print(f"Validation Accuracy Epoch: {epoch_accu}")
    # 返回验证准确率
    return epoch_accu

# 在验证数据集上评估模型的准确率
acc = valid(model, val_loader)
print("Accuracy on validation data = %0.2f%%" % acc)
# 保存模型
# .pth
torch.save(model, model_dir)

4.4 测试集测试

import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
from tqdm import tqdm
from data_read import test_data
from bert_config import MAX_LEN, device, model_dir
from fine_tuning_data_formatting import tokenizer

# 定义一个自定义数据集类QueryData,继承自Dataset类
class QueryData(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        # 初始化数据集的BertTokenizer
        self.tokenizer = tokenizer
        self.data = dataframe
        # 从DataFrame中获取查询文本列
        self.text = dataframe['query']
        #         self.targets = self.data.citation_influence_label
        # 初始化数据集的最大文本长度
        self.max_len = max_len

    # 定义数据集的长度,即数据集中样本的数量
    def __len__(self):
        return len(self.text)

    # 定义数据集的获取方法,根据索引获取具体样本
    def __getitem__(self, index):
        text = str(self.text[index])
        # 对查询文本进行分词,去除多余的空格
        text = " ".join(text.split())

        # 使用self.tokenizer.encode_plus()方法对文本进行编码,并生成模型的输入张量
        # 包含以下信息
        # input_ids:编码后的文本,表示为数字序列。
        # attention_mask:注意力掩码,用于指示哪些令牌在模型的注意力机制中被考虑,哪些令牌被掩盖。
        # token_type_ids:用于区分输入中不同句子或段落的令牌类型。在这个任务中,token_type_ids全为0,因为只有一个文本序列。
        inputs = self.tokenizer.encode_plus(
            text,  # 要编码的文本
            None,  # 在此情况下,表示不存在关联的另一个文本。在这个任务中,文本之间没有关联,因此使用None
            add_special_tokens=True,  # 表示在编码时添加特殊标记,例如[CLS]和[SEP]。
            max_length=self.max_len,  # 表示编码后的序列的最大长度。
            pad_to_max_length=True,  # 如果编码后的序列长度小于max_length,则在序列末尾填充0,使其长度与max_length相同。
            return_token_type_ids=True  # 用于区分输入中的不同句子或段落。在这个任务中,由于输入没有关联的另一个文本,token_type_ids全为0。
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]
        # 返回字典
        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long)
            #             'targets': torch.tensor(self.targets[index], dtype=torch.float)

# 测试函数,用于在测试集上进行模型预测
def test(model, testing_loader):
    res = []
    # 将BERT模型设置为评估模式
    n_correct = 0
    n_wrong = 0
    total = 0
    tr_loss = 0
    nb_tr_steps = 0
    nb_tr_examples = 0
    # 使用torch.no_grad()上下文管理器,禁用梯度计算
    with torch.no_grad():
        for _, data in tqdm(enumerate(testing_loader, 0)):
            # 将输入数据的ids张量移动到指定的计算设备
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            #             targets = data['targets'].to(device, dtype = torch.long)
            # 通过BERT模型进行前向传播,得到预测输出。
            outputs = model(ids, mask, token_type_ids)
            # 获取预测输出中每个样本预测得分最高的类别索引。
            big_val, big_idx = torch.max(outputs, dim=1)
            # 将预测结果添加到列表res中。

    return res

# 打印测试集的前两行数据
# 创建测试数据集
data_to_test = QueryData(test_data[['query']], tokenizer, MAX_LEN)

# 测试参数设置
test_params = {'batch_size': 4,  # 批次大小
               'shuffle': False,  # 是否打乱顺序
               'num_workers': 0  # 运行的线程数量
# 用于测试的数据加载器。
testing_loader_f = DataLoader(data_to_test, **test_params)
# 加载训练好的模型
model = torch.load(model_dir, map_location="cpu")
# 对测试集进行预测,得到预测结果
res = test(model, testing_loader_f)
# 对test_data数据集中的label列进行了处理,根据label的值是否大于等于0.8,将label转换为二元标签。
# 如果label的值大于等于0.8,则将label设置为1;否则,将label设置为0。
test_data['label'] = [1 if label >= 0.8 else 0 for label in test_data['label']]
# 根据预测结果和真实标签计算预测准确性
# 通过将模型预测的结果res与测试集中的真实标签test_data['label']逐个对应比较,并生成一个新的列表correct。
correct = [1 if pred == lab else 0 for pred, lab in zip(res, test_data['label'].tolist())]
# 统计列表correct中值为1的元素的个数(即预测正确的样本数),除以测试集的总样本数,即可得到模型在测试集上的准确率
print('accuracy on test set is - ', sum(correct) / len(test_data['label'].tolist()) * 100, '%')

5 参考资料


