import json


# Dataset download: https://www.cluebenchmarks.com/introduce.html
# "Fine-grained Named Entity Recognition" -> Download
# Convert the CLUENER span annotations into BIO tag sequences
def dimension_label(path, save_path, labels_path=None):
    label_dict = ['O']
    with open(save_path, "a", encoding="utf-8") as w:
        with open(path, "r", encoding="utf-8") as r:
            for line in r:
                line = json.loads(line)
                text = line['text']
                label = line['label']
                text_label = ['O'] * len(text)
                for label_key in label:  # iterate over entity types
                    B_label = "B-" + label_key
                    I_label = "I-" + label_key
                    if B_label not in label_dict:
                        label_dict.append(B_label)
                    if I_label not in label_dict:
                        label_dict.append(I_label)
                    label_item = label[label_key]
                    for entity in label_item:  # iterate over entity mentions
                        # a mention can occur more than once, so tag every span
                        for position in label_item[entity]:
                            start, end = position[0], position[1]
                            text_label[start] = B_label
                            for i in range(start + 1, end + 1):
                                text_label[i] = I_label
                line = {
                    "text": text,
                    "label": text_label
                }
                w.write(json.dumps(line, ensure_ascii=False) + "\n")
                w.flush()
    if labels_path:  # save the label map for later training and inference
        label_map = {label: i for i, label in enumerate(label_dict)}
        with open(labels_path, "w", encoding="utf-8") as w:
            w.write(json.dumps(label_map, ensure_ascii=False) + "\n")
            w.flush()


if __name__ == '__main__':
    path = "./cluener_public/dev.json"
    save_path = "./data/dev.json"
    dimension_label(path, save_path)

    path = "./cluener_public/train.json"
    save_path = "./data/train.json"
    labels_path = "./data/labels.json"
    dimension_label(path, save_path, labels_path)
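To make the conversion concrete, the sketch below applies the same tagging rule to a single, made-up CLUENER-style record; the sentence and spans are invented for illustration, but the field layout matches the real files (one JSON object per line, with inclusive [start, end] character offsets). The resulting labels.json simply maps every tag seen in the training data ('O' plus the B-/I- variants of each entity type) to an integer id.

import json

# Hypothetical CLUENER-style record; the sentence and spans are made up for illustration.
# Character spans are inclusive [start, end] offsets into "text".
sample = {
    "text": "浙商银行原行长在杭州出席会议",
    "label": {
        "company": {"浙商银行": [[0, 3]]},
        "address": {"杭州": [[8, 9]]},
    },
}

# Apply the same tagging rule as dimension_label to this single record.
text, spans = sample["text"], sample["label"]
bio = ["O"] * len(text)
for entity_type, mentions in spans.items():
    for positions in mentions.values():
        for start, end in positions:
            bio[start] = "B-" + entity_type
            for i in range(start + 1, end + 1):
                bio[i] = "I-" + entity_type

print(json.dumps({"text": text, "label": bio}, ensure_ascii=False))
# {"text": "浙商银行原行长在杭州出席会议",
#  "label": ["B-company", "I-company", "I-company", "I-company", "O", "O", "O", "O",
#            "B-address", "I-address", "O", "O", "O", "O"]}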
# Build the Dataset from the processed data
from torch.utils.data import Dataset, DataLoader
import torch
import json


class NERDataset(Dataset):
    def __init__(self, tokenizer, file_path, labels_map, max_length=300):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.labels_map = labels_map
        self.text_data = []
        self.label_data = []
        with open(file_path, "r", encoding="utf-8") as r:
            for line in r:
                line = json.loads(line)
                self.text_data.append(line['text'])
                self.label_data.append(line['label'])

    def __len__(self):
        return len(self.text_data)

    def __getitem__(self, idx):
        text = self.text_data[idx]
        labels = self.label_data[idx]
        # tokenize the sentence (character level for Chinese text)
        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        input_ids = inputs['input_ids'].squeeze()
        attention_mask = inputs['attention_mask'].squeeze()
        # convert string labels to ids; prepend 'O' (id 0) for the [CLS] token
        # so the labels stay aligned with the tokenized input
        label_ids = [0] + [self.labels_map[l] for l in labels]
        if len(label_ids) > self.max_length:
            label_ids = label_ids[:self.max_length]
        else:
            # pad labels to max_length ([SEP] and padding positions get 'O')
            label_ids.extend([0] * (self.max_length - len(label_ids)))
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': torch.LongTensor(label_ids)
        }
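Before wiring the Dataset into training, it can help to sanity-check it on the converted dev file and look at one batch's tensor shapes. This is only a sketch: it assumes the class above is saved as ner_datasets.py, that ./data/labels.json and ./data/dev.json were produced by the conversion script, and that a chinese-roberta-wwm-ext tokenizer is available (here pulled as hfl/chinese-roberta-wwm-ext from the Hugging Face Hub).

import json
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

from ner_datasets import NERDataset  # assumes the class above is saved as ner_datasets.py

# assumed paths produced by the conversion script
with open("./data/labels.json", "r", encoding="utf-8") as r:
    labels_map = json.load(r)

tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")
dataset = NERDataset(tokenizer, "./data/dev.json", labels_map, max_length=300)
loader = DataLoader(dataset, batch_size=2, shuffle=False)

batch = next(iter(loader))
print(batch["input_ids"].shape)       # torch.Size([2, 300])
print(batch["attention_mask"].shape)  # torch.Size([2, 300])
print(batch["labels"].shape)          # torch.Size([2, 300])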
# Model training
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForTokenClassification
# from ner_datasets import NERDataset  # uncomment if NERDataset lives in a separate ner_datasets.py
from tqdm import tqdm
import json
import time, sys
import numpy as np
from sklearn.metrics import f1_score


def train(epoch, model, device, loader, optimizer, gradient_accumulation_steps):
    model.train()
    time1 = time.time()
    for index, data in enumerate(tqdm(loader, file=sys.stdout, desc="Train Epoch: " + str(epoch))):
        input_ids = data['input_ids'].to(device)
        attention_mask = data['attention_mask'].to(device)
        labels = data['labels'].to(device)
        outputs = model(
            input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        loss = outputs.loss
        # backward pass: accumulate gradients
        loss.backward()
        # update only every gradient_accumulation_steps batches
        if (index % gradient_accumulation_steps == 0 and index != 0) or index == len(loader) - 1:
            # apply the accumulated gradients
            optimizer.step()
            # clear gradients for the next accumulation window
            optimizer.zero_grad()
        # print the loss every 100 steps
        if index % 100 == 0 or index == len(loader) - 1:
            time2 = time.time()
            tqdm.write(
                f"{index}, epoch: {epoch} - loss: {loss.item():.6f} ; each step's time spent: {(time2 - time1) / float(index + 0.0001):.4f}")


def validate(model, device, loader):
    model.eval()
    acc = 0
    f1 = 0
    with torch.no_grad():
        for _, data in enumerate(tqdm(loader, file=sys.stdout, desc="Validation Data")):
            input_ids = data['input_ids'].to(device)
            attention_mask = data['attention_mask'].to(device)
            labels = data['labels']
            outputs = model(input_ids, attention_mask=attention_mask)
            _, predicted_labels = torch.max(outputs.logits, dim=2)
            predicted_labels = predicted_labels.detach().cpu().numpy().tolist()
            true_labels = labels.detach().cpu().numpy().tolist()
            predicted_labels_flat = [label for sublist in predicted_labels for label in sublist]
            true_labels_flat = [label for sublist in true_labels for label in sublist]
            accuracy = (np.array(predicted_labels_flat) == np.array(true_labels_flat)).mean()
            acc = acc + accuracy
            f1score = f1_score(true_labels_flat, predicted_labels_flat, average='macro')
            f1 = f1 + f1score
    return acc / len(loader), f1 / len(loader)


def main():
    labels_path = "./data/labels.json"
    # local path to a chinese-roberta-wwm-ext checkpoint (e.g. hfl/chinese-roberta-wwm-ext from the Hugging Face Hub)
    model_name = 'D:\\AIGC\\model\\chinese-roberta-wwm-ext'
    train_json_path = "./data/train.json"
    val_json_path = "./data/dev.json"
    max_length = 300
    epochs = 5
    batch_size = 1
    lr = 1e-4
    gradient_accumulation_steps = 16
    model_output_dir = "output"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # load the label map
    with open(labels_path, "r", encoding="utf-8") as r:
        labels_map = json.loads(r.read())

    # load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(labels_map))
    model.to(device)

    # load data
    print("Start Load Train Data...")
    train_dataset = NERDataset(tokenizer, train_json_path, labels_map, max_length)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    print("Start Load Validation Data...")
    val_dataset = NERDataset(tokenizer, val_json_path, labels_map, max_length)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # define the optimizer (the loss is computed inside the model)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    print("Start Training...")
    best_acc = 0.0
    for epoch in range(epochs):
        train(epoch, model, device, train_loader, optimizer, gradient_accumulation_steps)
        print("Start Validation...")
        acc, f1 = validate(model, device, val_loader)
        print(f"Validation : acc: {acc} , f1: {f1}")
        if best_acc < acc:
            # keep the checkpoint with the highest accuracy
            print("Save Model To ", model_output_dir)
            model.save_pretrained(model_output_dir)
            tokenizer.save_pretrained(model_output_dir)
            best_acc = acc


if __name__ == '__main__':
    main()
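Once training has finished, the best-scoring checkpoint and tokenizer sit in output/. As a quick sanity check that the saved files reload correctly, the sketch below re-scores that checkpoint on the dev set by reusing the validate function and NERDataset defined above; the module names train_ner and ner_datasets are assumptions about how the two snippets were saved.

import json
import torch
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModelForTokenClassification

from ner_datasets import NERDataset   # the Dataset class from the earlier snippet
from train_ner import validate        # hypothetical module name for the training script above

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

with open("./data/labels.json", "r", encoding="utf-8") as r:
    labels_map = json.load(r)

# reload the checkpoint written by model.save_pretrained / tokenizer.save_pretrained
tokenizer = AutoTokenizer.from_pretrained("output")
model = AutoModelForTokenClassification.from_pretrained("output", num_labels=len(labels_map)).to(device)

val_dataset = NERDataset(tokenizer, "./data/dev.json", labels_map, max_length=300)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

acc, f1 = validate(model, device, val_loader)
print(f"Reloaded checkpoint: acc={acc:.4f}, f1={f1:.4f}")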
# Model testing
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch
import json


# decode the predicted tag sequence into entities
def post_processing(outputs, text, labels_map):
    _, predicted_labels = torch.max(outputs.logits, dim=2)
    predicted_labels = predicted_labels.detach().cpu().numpy()
    predicted_tags = [labels_map[label_id] for label_id in predicted_labels[0]]

    result = {}
    entity = ""
    entity_type = ""
    for index, word_token in enumerate(text):
        # position 0 of the model output is the [CLS] token, so shift by one
        tag = predicted_tags[index + 1]
        if tag.startswith("B-"):
            # a new entity starts; flush the previous one first
            if entity:
                result.setdefault(entity_type, []).append(entity)
            entity_type = tag.split("-")[1]
            entity = word_token
        elif tag.startswith("I-"):
            if entity:
                entity += word_token
        else:
            if entity:
                result.setdefault(entity_type, []).append(entity)
            entity = ""
    # flush the last entity if the text ends inside one
    if entity:
        result.setdefault(entity_type, []).append(entity)
    return result


def main():
    labels_path = "./data/labels.json"
    model_name = './output'
    max_length = 300
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # load the label map and invert it to id -> label
    labels_map = {}
    with open(labels_path, "r", encoding="utf-8") as r:
        labels = json.loads(r.read())
        for label in labels:
            label_id = labels[label]
            labels_map[label_id] = label

    # load tokenizer and the fine-tuned model
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(labels_map))
    model.to(device)
    model.eval()

    while True:
        text = input("Input (q to quit): ")
        if not text:
            continue
        if text == 'q':
            break
        encoded_input = tokenizer(text, padding="max_length", truncation=True, max_length=max_length)
        input_ids = torch.tensor([encoded_input['input_ids']]).to(device)
        attention_mask = torch.tensor([encoded_input['attention_mask']]).to(device)
        with torch.no_grad():
            outputs = model(input_ids, attention_mask=attention_mask)
        result = post_processing(outputs, text, labels_map)
        print(result)


if __name__ == '__main__':
    main()
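The interactive loop above is handy for poking at the model, but the same pieces can be reused non-interactively. The helper below is a small sketch that wraps the model call and post_processing for a list of sentences; extract_entities is a made-up name, and model, tokenizer, labels_map and device are assumed to be loaded exactly as in main() above.

import torch

# Hypothetical batch-style helper reusing post_processing; model, tokenizer,
# labels_map and device are assumed to be loaded exactly as in main() above.
def extract_entities(sentences, model, tokenizer, labels_map, device, max_length=300):
    model.eval()
    results = []
    for text in sentences:
        encoded = tokenizer(text, padding="max_length", truncation=True, max_length=max_length)
        input_ids = torch.tensor([encoded['input_ids']]).to(device)
        attention_mask = torch.tensor([encoded['attention_mask']]).to(device)
        with torch.no_grad():
            outputs = model(input_ids, attention_mask=attention_mask)
        results.append(post_processing(outputs, text, labels_map))
    return results

# Example call; the exact entities returned depend on the trained checkpoint:
# extract_entities(["浙商银行原行长在杭州出席会议"], model, tokenizer, labels_map, device)
# -> something like [{'company': ['浙商银行'], 'address': ['杭州']}]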