This is my first time documenting my own learning process on CSDN; let's keep it going.
This post is my study notes for the named-entity-recognition course series by the Bilibili creator "手写AI". It builds five .py files that can be run directly.
The approach itself is simple: BERT + BiLSTM, with a Linear layer on top for per-token classification.
The training data uses the common character-per-line format: each line holds one character and its BIOES-style tag separated by a space, and a blank line separates sentences. For example:

高 B-NAME
勇 E-NAME
: O
男 O
, O
中 B-CONT
国 I-CONT
国 I-CONT
籍 E-CONT
, O
无 O
境 O
外 O
居 O
留 O
权 O
, O

The read_data function below parses this format:
def read_data(filename):
    with open(filename, 'r', encoding='utf8') as f:
        all_data = f.read().split('\n')

    all_text = []   # holds every sentence
    all_label = []  # holds the label sequence of every sentence
    text = []       # characters of the current sentence
    labels = []     # labels of the current sentence
    for data in all_data:
        if data == '':
            # a blank line marks the end of a sentence
            all_text.append(text)
            all_label.append(labels)
            text = []
            labels = []
        else:
            t, l = data.split(' ')
            text.append(t)
            labels.append(l)
    # in case the file does not end with a blank line
    if text:
        all_text.append(text)
        all_label.append(labels)
    return all_text, all_label
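As a quick sanity check, assuming the sample above is saved as data/train.txt, read_data returns parallel lists of characters and tags (a minimal sketch):

all_text, all_label = read_data('data/train.txt')
print(all_text[0][:4])   # e.g. ['高', '勇', ':', '男']
print(all_label[0][:4])  # e.g. ['B-NAME', 'E-NAME', 'O', 'O']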
def build_label_2_index(all_label):
    label_2_index = {'PAD': 0, 'UNK': 1}
    for labels in all_label:
        for label in labels:
            if label not in label_2_index:
                label_2_index[label] = len(label_2_index)
    return label_2_index, list(label_2_index)
Because the model works with a fixed maximum sequence length, labels must be padded with [PAD] when a sentence is shorter than that length, and any label that was never seen during training is mapped to [UNK].
Return values:
label_2_index: a dict mapping each label to an index, starting from {'PAD': 0, 'UNK': 1};
list(label_2_index): the dict's keys as a list, e.g. ['PAD', 'UNK', ...], which serves as the index-to-label lookup.
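Run on the sample sentence alone, the mapping would come out like this (the exact order depends on first occurrence in your data):

label2index, index2label = build_label_2_index(all_label)
print(label2index)
# {'PAD': 0, 'UNK': 1, 'B-NAME': 2, 'E-NAME': 3, 'O': 4,
#  'B-CONT': 5, 'I-CONT': 6, 'E-CONT': 7}
print(index2label[4])  # 'O'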
In PyTorch, the Dataset and DataLoader classes do the heavy lifting of data preparation: once the data is wrapped in a Dataset, a DataLoader can read it batch by batch. The skeleton is always the same and revolves around three methods:
def __init__(self):            # set up the dataset
    pass

def __getitem__(self, item):   # return one sample
    pass

def __len__(self):             # return the number of samples
    pass
Applied to our NER data, the Dataset looks like this:

class Data(Dataset):
    def __init__(self, all_text, all_label, tokenizer, label2index, max_len):
        self.all_text = all_text
        self.all_label = all_label
        self.tokenizer = tokenizer
        self.label2index = label2index
        self.max_len = max_len

    def __getitem__(self, item):
        text = self.all_text[item]
        labels = self.all_label[item][:self.max_len]

        # encode the text so that BERT can accept it
        text_index = self.tokenizer.encode(text,
                                           add_special_tokens=True,
                                           max_length=self.max_len + 2,
                                           padding='max_length',
                                           truncation=True,
                                           return_tensors='pt',
                                           )
        # the labels must be encoded as well
        # labels_index = [self.label2index.get(label, 1) for label in labels]
        # the line above only converts labels to indices; we also have to align
        # them with the encoded text: one PAD (index 0) for [CLS], one for [SEP],
        # then PAD up to max_len
        labels_index = [0] + [self.label2index.get(label, 1) for label in labels] + [0] + [0] * (
                self.max_len - len(text))

        # note text_index.squeeze(): squeeze() drops every dimension of size 1;
        # encode(..., return_tensors='pt') returns shape (1, seq_len), and without
        # the squeeze each batch would come out as (batch_size, 1, seq_len) and
        # feeding it to the model would fail
        # the extra return value len(text) is the true (unpadded) length; it is
        # needed during evaluation, as explained later
        return text_index.squeeze(), torch.tensor(labels_index), len(text)

    def __len__(self):
        return len(self.all_text)
1. def __init__(self, all_text, all_label, tokenizer, label2index, max_len)
The constructor receives everything the dataset needs:
all_text and all_label: all the texts and labels read in during data processing;
tokenizer: text cannot be fed into BERT directly, it must first be converted to token ids (transformers ships this, just call it);
label2index: analogous to the tokenizer, the labels also have to be converted to indices; this is the mapping built during data processing;
max_len: the maximum sequence length you want.
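A minimal sketch of what one encoded sample looks like, assuming max_len = 8 and reusing the first four characters of the sample sentence (the exact ids depend on your vocabulary):

text = ['高', '勇', ':', '男']
labels = ['B-NAME', 'E-NAME', 'O', 'O']
max_len = 8

ids = tokenizer.encode(text,
                       add_special_tokens=True,
                       max_length=max_len + 2,
                       padding='max_length',
                       truncation=True)
# ids covers [CLS] 高 勇 : 男 [SEP] plus four [PAD] positions

labels_index = [0] + [label2index.get(l, 1) for l in labels] + [0] + [0] * (max_len - len(text))
print(len(ids), len(labels_index))  # 10 10, text and labels stay aligned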
The model is BERT, a BiLSTM, and a Linear classification head:

class MyModel(nn.Module):
    def __init__(self, class_num):
        super(MyModel, self).__init__()
        self.class_num = class_num

        self.bert = BertModel.from_pretrained(BERT_PATH)

        # hidden size 768 // 2 per direction, so the BiLSTM output is again 768
        self.lstm = nn.LSTM(768,
                            768 // 2,
                            bidirectional=True,
                            batch_first=True)

        self.linear = nn.Linear(768, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_text, batch_label=None):
        output = self.bert(batch_text)
        bert_out0, bert_out1 = output[0], output[1]  # sequence output, pooled output (unused)
        output1, _ = self.lstm(bert_out0)
        pre = self.linear(output1)

        if batch_label is not None:
            loss = self.loss_fn(pre.reshape(-1, pre.shape[-1]), batch_label.reshape(-1))
            return loss
        else:
            return torch.argmax(pre, dim=-1)
Feed a batch into the model and take the output.
When label data is provided, the forward pass returns the loss, which is then backpropagated; the optimizer updates the parameters and the gradients are zeroed.
When no label data is provided, the model is predicting, and its output is a label index for every token. That is why the constructor must receive the total number of classes, len(label2index).
Note: these outputs are still indices; they have to be converted back to the real labels later for evaluation.
Hence the constructor def __init__(self, class_num), with class_num == len(label2index).
if batch_label is not None:
    loss = self.loss_fn(pre.reshape(-1, pre.shape[-1]), batch_label.reshape(-1))
    return loss
Why reshape pre and batch_label?
First, look at their original shapes:
pre.shape == torch.Size([batch_size, seq_len, class_num])
batch_label.shape == torch.Size([batch_size, seq_len])
(here seq_len is max_len + 2, because [CLS] and [SEP] were added).
Second, loss_fn = nn.CrossEntropyLoss() in its basic form expects logits of shape (N, C) and targets of shape (N,), so the tensors have to be flattened:
pre.reshape(-1, pre.shape[-1]) -> (batch_size * seq_len, class_num)
batch_label.reshape(-1) -> (batch_size * seq_len,)
Without labels, the forward pass simply returns torch.argmax(pre, dim=-1).
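A tiny shape check with dummy tensors makes the flattening concrete (the sizes below are arbitrary):

import torch
import torch.nn as nn

batch_size, seq_len, class_num = 2, 5, 8
pre = torch.randn(batch_size, seq_len, class_num)                 # fake logits
batch_label = torch.randint(0, class_num, (batch_size, seq_len))  # fake labels

loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(pre.reshape(-1, pre.shape[-1]),  # (10, 8)
               batch_label.reshape(-1))         # (10,)
print(loss.item())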
The training and validation loop ties everything together:

def train():
    os.makedirs(MODEL_DIR, exist_ok=True)  # make sure the save directory exists

    # read the training file
    train_filename = os.path.join('data', 'train.txt')
    # returns the texts and labels of the training data
    train_text, train_label = read_data(train_filename)

    # validation set
    dev_filename = os.path.join('data', 'dev.txt')
    dev_text, dev_label = read_data(dev_filename)

    # build label2index and index2label
    label2index, index2label = build_label_2_index(train_label)

    # data iterators
    train_data = Data(train_text, train_label, tokenizer, label2index, MAX_LEN)
    train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)

    dev_data = Data(dev_text, dev_label, tokenizer, label2index, MAX_LEN)
    dev_loader = DataLoader(dev_data, batch_size=BATCH_SIZE, shuffle=False)

    # model
    model = MyModel(len(label2index)).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # training
    for epoch in range(EPOCHS):
        model.train()
        for batch_idx, data in enumerate(train_loader):
            batch_text, batch_label, batch_len = data
            # move the data to the GPU
            loss = model(batch_text.to(DEVICE), batch_label.to(DEVICE))
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if batch_idx % 10 == 0:
                print(f'Epoch: {epoch}, BATCH: {batch_idx}, Training Loss: {loss.item()}')
        torch.save(model, MODEL_DIR + f'model_{epoch}.pth')

        model.eval()

        # collect the predicted and the true label sequences
        all_pre = []
        all_tag = []

        for batch_text, batch_label, batch_len in dev_loader:

            # no batch_label is passed in, so the model returns predictions
            pre = model(batch_text.to(DEVICE))

            # move pre back from the GPU and convert to lists
            pre = pre.cpu().numpy().tolist()
            batch_label = batch_label.cpu().numpy().tolist()

            # one thing to note about seqeval's f1_score: it expects the
            # original label sequences, not the padded ones, so the padding
            # has to be stripped. That is why __getitem__ also returns the
            # true text length: slice off one position on the left for [CLS]
            # and cut at the true length on the right
            for p, t, l in zip(pre, batch_label, batch_len):
                p = p[1: l + 1]
                t = t[1: l + 1]

                all_pre.append([index2label[j] for j in p])
                all_tag.append([index2label[j] for j in t])

        # seqeval takes the gold labels first, then the predictions
        f1_score_ = f1_score(all_tag, all_pre)
        p_score = precision_score(all_tag, all_pre)
        r_score = recall_score(all_tag, all_pre)
        print(f'precision={p_score}, recall={r_score}, f1={f1_score_}')
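As a quick illustration of the metric call, here is seqeval on a toy pair of already unpadded sequences (a minimal sketch; note that the gold labels come first):

from seqeval.metrics import f1_score

y_true = [['B-NAME', 'E-NAME', 'O', 'O']]
y_pred = [['B-NAME', 'E-NAME', 'O', 'O']]
print(f1_score(y_true, y_pred))  # 1.0, the NAME span matches exactly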
I didn't run a long training here; the model is simply saved during training, and then a single example is read and predicted.
def predict():
    train_filename = os.path.join('data', 'train.txt')
    train_text, train_label = read_data(train_filename)

    test_filename = os.path.join('data', 'test.txt')
    test_text, _ = read_data(test_filename)
    text = test_text[1]

    inputs = tokenizer.encode(text,
                              return_tensors='pt')
    inputs = inputs.to(DEVICE)
    model = torch.load(MODEL_DIR + 'model_1.pth', map_location=DEVICE)
    model.eval()
    # the output still has a batch dimension of 1, so flatten it
    # (indexing with y_pre[0] would work just as well)
    y_pre = model(inputs).reshape(-1).tolist()

    _, id2label = build_label_2_index(train_label)

    # drop the [CLS] and [SEP] positions, then map indices back to labels
    label = [id2label[l] for l in y_pre[1:-1]]
    print(text)
    print(label)


if __name__ == '__main__':
    predict()
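The printed result is still a flat tag sequence. To turn it into actual entities, a small BIOES decoder can pair the characters with their predicted tags. This helper is my own addition, not part of the course code, and for brevity it ignores single-character S- tags:

def decode_entities(chars, tags):
    """Collect (entity_text, entity_type) spans from BIOES-style tags."""
    entities, buf, ent_type = [], [], None
    for ch, tag in zip(chars, tags):
        if tag.startswith('B-'):
            buf, ent_type = [ch], tag[2:]
        elif tag.startswith('I-') and buf:
            buf.append(ch)
        elif tag.startswith('E-') and buf:
            buf.append(ch)
            entities.append((''.join(buf), ent_type))
            buf, ent_type = [], None
        else:
            buf, ent_type = [], None
    return entities

# e.g. decode_entities(list('高勇:男,中国国籍'),
#                      ['B-NAME', 'E-NAME', 'O', 'O', 'O',
#                       'B-CONT', 'I-CONT', 'I-CONT', 'E-CONT'])
# -> [('高勇', 'NAME'), ('中国国籍', 'CONT')]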
The complete code is split into five files: config.py, utils.py, model.py, train.py, and predict.py.
# config.py
import torch
from transformers import BertModel, BertTokenizer
from torch.utils.data import DataLoader, Dataset

EPOCHS = 2
BATCH_SIZE = 64
LEARNING_RATE = 2e-5
MAX_LEN = 50
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'  # use the GPU when available

BERT_PATH = r'BERT_MODEL\roberta'  # path to your own BERT model

tokenizer = BertTokenizer.from_pretrained(BERT_PATH)
MODEL_DIR = 'model/'  # directory for saved models; create it next to the code
# utils.py
import torch
from torch.utils.data import DataLoader, Dataset


def read_data(filename):
    with open(filename, 'r', encoding='utf8') as f:
        all_data = f.read().split('\n')

    all_text = []
    all_label = []
    text = []
    labels = []
    for data in all_data:
        if data == '':
            all_text.append(text)
            all_label.append(labels)
            text = []
            labels = []
        else:
            t, l = data.split(' ')
            text.append(t)
            labels.append(l)
    # in case the file does not end with a blank line
    if text:
        all_text.append(text)
        all_label.append(labels)
    return all_text, all_label


def build_label_2_index(all_label):
    label_2_index = {'PAD': 0, 'UNK': 1}
    for labels in all_label:
        for label in labels:
            if label not in label_2_index:
                label_2_index[label] = len(label_2_index)
    return label_2_index, list(label_2_index)


class Data(Dataset):
    def __init__(self, all_text, all_label, tokenizer, label2index, max_len):
        self.all_text = all_text
        self.all_label = all_label
        self.tokenizer = tokenizer
        self.label2index = label2index
        self.max_len = max_len

    def __getitem__(self, item):
        text = self.all_text[item]
        labels = self.all_label[item][:self.max_len]

        # encode the text so that BERT can accept it
        text_index = self.tokenizer.encode(text,
                                           add_special_tokens=True,
                                           max_length=self.max_len + 2,
                                           padding='max_length',
                                           truncation=True,
                                           return_tensors='pt',
                                           )
        # encode the labels and align them with the text:
        # one PAD (index 0) for [CLS], one for [SEP], then PAD up to max_len
        labels_index = [0] + [self.label2index.get(label, 1) for label in labels] + [0] + [0] * (
                self.max_len - len(text))

        return text_index.squeeze(), torch.tensor(labels_index), len(text)

    def __len__(self):
        return len(self.all_text)
# model.py
import torch.nn as nn
from config import *


class MyModel(nn.Module):
    def __init__(self, class_num):
        super(MyModel, self).__init__()
        self.class_num = class_num

        self.bert = BertModel.from_pretrained(BERT_PATH)

        # hidden size 768 // 2 per direction, so the BiLSTM output is again 768
        self.lstm = nn.LSTM(768,
                            768 // 2,
                            bidirectional=True,
                            batch_first=True)

        self.linear = nn.Linear(768, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_text, batch_label=None):
        output = self.bert(batch_text)
        bert_out0, bert_out1 = output[0], output[1]  # sequence output, pooled output (unused)
        output1, _ = self.lstm(bert_out0)
        pre = self.linear(output1)

        if batch_label is not None:
            loss = self.loss_fn(pre.reshape(-1, pre.shape[-1]), batch_label.reshape(-1))
            return loss
        else:
            return torch.argmax(pre, dim=-1)

# train.py
from utils import *
from model import *
from config import *
from seqeval.metrics import f1_score, precision_score, recall_score
import os


def train():
    os.makedirs(MODEL_DIR, exist_ok=True)  # make sure the save directory exists

    # read the training file
    train_filename = os.path.join('data', 'train.txt')
    # returns the texts and labels of the training data
    train_text, train_label = read_data(train_filename)

    # validation set
    dev_filename = os.path.join('data', 'dev.txt')
    dev_text, dev_label = read_data(dev_filename)

    # build label2index and index2label
    label2index, index2label = build_label_2_index(train_label)

    # data iterators
    train_data = Data(train_text, train_label, tokenizer, label2index, MAX_LEN)
    train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)

    dev_data = Data(dev_text, dev_label, tokenizer, label2index, MAX_LEN)
    dev_loader = DataLoader(dev_data, batch_size=BATCH_SIZE, shuffle=False)

    # model
    model = MyModel(len(label2index)).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # training
    for epoch in range(EPOCHS):
        model.train()
        for batch_idx, data in enumerate(train_loader):
            batch_text, batch_label, batch_len = data
            # move the data to the GPU
            loss = model(batch_text.to(DEVICE), batch_label.to(DEVICE))
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if batch_idx % 10 == 0:
                print(f'Epoch: {epoch}, BATCH: {batch_idx}, Training Loss: {loss.item()}')
        torch.save(model, MODEL_DIR + f'model_{epoch}.pth')

        model.eval()

        # collect the predicted and the true label sequences
        all_pre = []
        all_tag = []

        for batch_text, batch_label, batch_len in dev_loader:

            # no batch_label is passed in, so the model returns predictions
            pre = model(batch_text.to(DEVICE))

            # move pre back from the GPU and convert to lists
            pre = pre.cpu().numpy().tolist()
            batch_label = batch_label.cpu().numpy().tolist()

            # seqeval expects the original label sequences, not the padded
            # ones, so strip the padding: drop one position on the left for
            # [CLS] and cut at the true text length on the right
            for p, t, l in zip(pre, batch_label, batch_len):
                p = p[1: l + 1]
                t = t[1: l + 1]

                all_pre.append([index2label[j] for j in p])
                all_tag.append([index2label[j] for j in t])

        # seqeval takes the gold labels first, then the predictions
        f1_score_ = f1_score(all_tag, all_pre)
        p_score = precision_score(all_tag, all_pre)
        r_score = recall_score(all_tag, all_pre)
        print(f'precision={p_score}, recall={r_score}, f1={f1_score_}')


if __name__ == '__main__':
    train()
# predict.py
from utils import *
from model import *
from config import *
import os


def predict():
    train_filename = os.path.join('data', 'train.txt')
    train_text, train_label = read_data(train_filename)

    test_filename = os.path.join('data', 'test.txt')
    test_text, _ = read_data(test_filename)
    text = test_text[1]

    inputs = tokenizer.encode(text,
                              return_tensors='pt')
    inputs = inputs.to(DEVICE)
    model = torch.load(MODEL_DIR + 'model_1.pth', map_location=DEVICE)
    model.eval()
    # the output still has a batch dimension of 1, so flatten it
    y_pre = model(inputs).reshape(-1).tolist()

    _, id2label = build_label_2_index(train_label)

    # drop the [CLS] and [SEP] positions, then map indices back to labels
    label = [id2label[l] for l in y_pre[1:-1]]
    print(text)
    print(label)


if __name__ == '__main__':
    predict()