These are notes from my own learning process; discussion of any issues is welcome.
When should a pooling layer be used? Pooling collapses the whole sequence into a single vector, which suits sentence-level classification; sequence labeling needs one prediction per character, so the pooling layer stays disabled in the models below.
NER (sequence labeling): B/M/E mark the left / middle / right part of an entity span (for addresses, organizations, person names, etc.); O marks characters that belong to no entity.

CRF transition matrix: if a character is tagged B_location, the next one is very likely M_location or E_location, and has almost nothing to do with the B/M/E tags of the other entity types.

Emission matrix: shape sen_len * tag_size; effectively a matrix that gives, for each character, a score (probability) for every tag.
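A minimal sketch of these two matrices with the pytorch-crf package (the same CRF class the code below uses); the tensors here are random stand-ins, and 9 tags matches output_size in the config:

```python
import torch
from torchcrf import CRF

num_tags, seq_len, batch = 9, 5, 2
crf = CRF(num_tags, batch_first=True)

# transition matrix: crf.transitions[i][j] is the learned score for moving from
# tag i to tag j, e.g. B-LOCATION -> I-LOCATION should end up with a high score
print(crf.transitions.shape)   # torch.Size([9, 9])

# emission scores: one score per character per tag, shape (batch, sen_len, tag_size)
emissions = torch.randn(batch, seq_len, num_tags)
tags = torch.randint(0, num_tags, (batch, seq_len))
mask = torch.ones(batch, seq_len, dtype=torch.bool)

# forward() returns a log-likelihood, so the training loss is its negative
loss = -crf(emissions, tags, mask=mask, reduction="mean")
print(loss)
print(crf.decode(emissions))   # the best-scoring tag sequence for each sentence
```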
With BERT:
PERSON entities - precision: 0.655738, recall: 0.462428, F1: 0.542368
LOCATION entities - precision: 0.655462, recall: 0.406250, F1: 0.501603
TIME entities - precision: 0.846847, recall: 0.576687, F1: 0.686127
ORGANIZATION entities - precision: 0.448276, recall: 0.305882, F1: 0.363631
Macro-F1: 0.523432
Micro-F1: 0.543495

With LSTM:
PERSON entities - precision: 0.432000, recall: 0.312139, F1: 0.362411
LOCATION entities - precision: 0.512987, recall: 0.411458, F1: 0.456642
TIME entities - precision: 0.721804, recall: 0.588957, F1: 0.648644
ORGANIZATION entities - precision: 0.450000, recall: 0.423529, F1: 0.436359
Macro-F1: 0.476014
Micro-F1: 0.479633
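For reference, Macro-F1 above is the plain average of the four per-class F1 values, while Micro-F1 pools the raw counts before computing precision and recall; a small sketch of the difference, with invented counts:

```python
# invented counts per entity type: (correctly recognized, recognized, gold)
stats = {"PERSON": (80, 120, 160), "LOCATION": (90, 150, 200),
         "TIME": (70, 90, 110), "ORGANIZATION": (30, 70, 90)}

def f1(correct, pred, gold):
    p, r = correct / pred, correct / gold   # precision, recall
    return 2 * p * r / (p + r)

macro_f1 = sum(f1(*v) for v in stats.values()) / len(stats)  # average of per-class F1
c, p, g = (sum(v[i] for v in stats.values()) for i in range(3))
micro_f1 = f1(c, p, g)                                       # F1 over the pooled counts
print(macro_f1, micro_f1)
```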
Implement NER code that assigns each character of a sentence one of the B/M/E/O tags.
config.py
""" 配置参数信息 """ Config = { "model_path": "./output/", "model_name": "model.pt", "schema_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\schema.json", "train_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\train.txt", "valid_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\test.txt", "vocab_path": r"E:\Anlp\week9 序列标注问题\ner\chars.txt", "model_type": "bert", # 数据标注中计算loss "use_crf": True, # 文本向量大小 "char_dim": 20, # 文本长度 "max_len": 50, # 词向量大小 "hidden_size": 64, # 训练 轮数 "epoch_size": 15, # 批量大小 "batch_size": 25, # 训练集大小 "simple_size": 300, # 学习率 "lr": 0.0001, # dropout "dropout": 0.5, # 优化器 "optimizer": "adam", # 卷积核 "kernel_size": 3, # 最大池 or 平均池 "pooling_style": "max", # 模型层数 "num_layers": 3, "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese", # 输出层大小 "output_size": 9, # 随机数种子 "seed": 987 }
loader.py (data loading)
""" 数据加载 """ import os import numpy as np import json import re import os import torch import torch.utils.data as Data from torch.utils.data import Dataset, DataLoader from transformers import BertTokenizer # 获取字表集 def load_vocab(path): vocab = {} with open(path, 'r', encoding='utf-8') as f: for index, line in enumerate(f): word = line.strip() # 0留给padding位置,所以从1开始 vocab[word] = index + 1 vocab['unk'] = len(vocab) + 1 return vocab class DataGenerator: def __init__(self, data_path, config): self.data_path = data_path self.config = config self.schema = self.load_schema(config["schema_path"]) if self.config["model_type"] == "bert": self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"]) self.vocab = load_vocab(config["vocab_path"]) self.config["vocab_size"] = len(self.vocab) # 中文的语句list self.sentence_list = [] self.data = self.load_data() def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] def load_schema(self, path): with open(path, encoding="utf8") as f: return json.load(f) def load_data(self): dataset_x = [] dataset_y = [] with open(self.data_path, 'r', encoding='utf-8') as f: # 每句话 segments = f.read().split("\n\n") # 每句话字符 如: 你 0 for segment in segments: sentences = [] labels = [] for line in segment.split("\n"): if line.strip() == "": continue char, label = line.split() sentences.append(char) labels.append(self.schema[label]) self.sentence_list.append(' '.join(sentences)) input_id = self.sentence_to_index(sentences) # labels 也需要padding相同长度 labels = self.padding(labels,-1) # 标签和文本组成一个样本 dataset_x.append(input_id) dataset_y.append(labels) data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y)) return data # 文本预处理 # 转化为向量 def sentence_to_index(self, text): input_ids = [] vocab = self.vocab if self.config["model_type"] == "bert": # 中文的文本转化为tokenizer的id input_ids = self.tokenizer.encode(text, padding="max_length", truncation=True, max_length=self.config["max_len"]) else: for char in text: input_ids.append(vocab.get(char, vocab['unk'])) # 填充or裁剪 input_ids = self.padding(input_ids) return input_ids # 数据预处理 裁剪or填充 def padding(self, input_ids, padding_dot=0): length = self.config["max_len"] padded_input_ids = input_ids if len(input_ids) >= length: return input_ids[:length] else: padded_input_ids += [padding_dot] * (length - len(input_ids)) return padded_input_ids # 用torch自带的DataLoader类封装数据 def load_data_batch(data_path, config, shuffle=True): dg = DataGenerator(data_path, config) # DataLoader 类封装数据 dg除了data 还包含其他信息(后面需要使用) dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle) return dl if __name__ == '__main__': from config import Config dg = DataGenerator(Config["train_data_path"], Config) print(len(dg)) print(dg[0])
evaluate.py (model evaluation)
""" 模型效果测试 """ import re from collections import defaultdict import numpy as np import torch from loader import load_data_batch class Evaluator: def __init__(self, config, model, logger): self.config = config self.model = model self.logger = logger # 选择验证集合 self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False) # self.stats_dict = {"correct": 0, "wrong": 0} # 用于存储测试结果 def eval(self, epoch): self.logger.info("开始测试第%d轮模型效果:" % epoch) # 测试模式 self.model.eval() self.logger.info("开始测试第%d轮模型效果:" % epoch) self.stats_dict = {"LOCATION": defaultdict(int), "TIME": defaultdict(int), "PERSON": defaultdict(int), "ORGANIZATION": defaultdict(int)} for index, batch_data in enumerate(self.dataset): # 取batch_size 句话 sentences = self.dataset.dataset.sentence_list[ index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]] if torch.cuda.is_available(): batch_data = [d.cuda() for d in batch_data] input_id, labels = batch_data with torch.no_grad(): pred_results = self.model(input_id) # 不输入labels,使用模型当前参数进行预测 self.write_stats(labels, pred_results, sentences) self.show_stats() return def write_stats(self, labels, pred_results, sentences): assert len(labels) == len(pred_results) == len(sentences) if not self.config['use_crf']: pred_results = torch.argmax(pred_results, dim=-1) for true_label, pred_label, sentence in zip(labels, pred_results, sentences): if not self.config["use_crf"]: pred_label = pred_label.cpu().detach().tolist() true_label = true_label.cpu().detach().tolist() true_entities = self.decode(sentence, true_label) pred_entities = self.decode(sentence, pred_label) # 正确率 = 识别出的正确实体数 / 识别出的实体数 # 召回率 = 识别出的正确实体数 / 样本的实体数 for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]: self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]]) self.stats_dict[key]["样本实体数"] += len(true_entities[key]) self.stats_dict[key]["识别出实体数"] += len(pred_entities[key]) return def show_stats(self): F1_scores = [] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]: # 正确率 = 识别出的正确实体数 / 识别出的实体数 # 召回率 = 识别出的正确实体数 / 样本的实体数 precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"]) recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"]) F1 = (2 * precision * recall) / (precision + recall + 1e-5) F1_scores.append(F1) self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1)) self.logger.info("Macro-F1: %f" % np.mean(F1_scores)) correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]]) total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]]) true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]]) micro_precision = correct_pred / (total_pred + 1e-5) micro_recall = correct_pred / (true_enti + 1e-5) micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5) self.logger.info("Micro-F1 %f" % micro_f1) self.logger.info("--------------------") return # 相当于截取对应的句子 def decode(self, sentence, labels): labels = "".join([str(x) for x in labels[:len(sentence)]]) results = defaultdict(list) for location in re.finditer("(04+)", labels): s, e = location.span() results["LOCATION"].append(sentence[s:e]) for location in re.finditer("(15+)", labels): s, e = location.span() results["ORGANIZATION"].append(sentence[s:e]) for location in re.finditer("(26+)", labels): s, e = location.span() 
results["PERSON"].append(sentence[s:e]) for location in re.finditer("(37+)", labels): s, e = location.span() results["TIME"].append(sentence[s:e]) return results
model.py
```python
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel
from torchcrf import CRF

"""Network structure"""


class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        output_size = config["output_size"]
        self.model_type = config["model_type"]
        num_layers = config["num_layers"]
        self.use_crf = config["use_crf"]
        self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
        if self.model_type == 'rnn':
            self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size,
                                  num_layers=num_layers, batch_first=True)
        elif self.model_type == 'lstm':
            # bidirectional LSTM: the output size is hidden_size * 2
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers,
                                   bidirectional=True, batch_first=True)
            hidden_size = hidden_size * 2
        elif self.model_type == 'bert':
            self.encoder = BertModel.from_pretrained(config["bert_model_path"])
            # use the hidden_size of the pretrained model
            hidden_size = self.encoder.config.hidden_size
        elif self.model_type == 'cnn':
            self.encoder = CNN(config)
        elif self.model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif self.model_type == "bert_lstm":
            self.encoder = BertLSTM(config)
            # use the hidden_size of the pretrained model
            # (BertLSTM itself has no .config, so read it from its .bert)
            hidden_size = self.encoder.bert.config.hidden_size
        self.classify = nn.Linear(hidden_size, output_size)
        self.pooling_style = config["pooling_style"]
        self.crf_layer = CRF(output_size, batch_first=True)
        self.loss = nn.functional.cross_entropy  # cross-entropy loss

    def forward(self, x, y=None):
        if self.model_type == 'bert':
            # x: [batch_size, seq_len]
            # BERT returns (sequence_output, pooler_output)
            # sequence_output: batch_size, max_len, hidden_size
            # pooler_output:   batch_size, hidden_size
            x = self.encoder(x)[0]
        else:
            x = self.emb(x)
            x = self.encoder(x)
        # RNN/LSTM return a tuple (output, hidden)
        if isinstance(x, tuple):
            x = x[0]
        # no pooling here: sequence labeling needs one prediction per character
        y_pred = self.classify(x)
        if y is not None:
            if self.use_crf:
                # positions padded with -1 are masked out
                mask = y.gt(-1)
                # the CRF returns a log-likelihood, so negate it for the loss
                return - self.crf_layer(y_pred, y, mask, reduction="mean")
            else:
                # flatten to (batch_size * max_len, class_num) and (batch_size * max_len)
                return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            if self.use_crf:
                return self.crf_layer.decode(y_pred)
            else:
                return y_pred


# optimizer selection
def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["lr"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


# plain CNN encoder
class CNN(nn.Module):
    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):
        # x: (batch_size, max_len, embedding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


# gated CNN: compared with the plain CNN there is an extra sigmoid-gated branch,
# multiplied element-wise with the CNN output
class GatedCNN(nn.Module):
    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    def forward(self, x):
        a = self.cnn(x)
        b = torch.sigmoid(self.gate(x))
        return torch.mul(a, b)


# BERT followed by an LSTM
class BertLSTM(nn.Module):
    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size,
                           batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]
        x, _ = self.rnn(x)
        return x
```
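A quick smoke test of TorchModel, in the spirit of the scratch test commented out in the source; the tiny inputs below are invented, and model_type is switched to lstm so no pretrained weights are needed:

```python
import torch
from config import Config
from model import TorchModel

Config["model_type"] = "lstm"
Config["vocab_size"] = 20
Config["max_len"] = 5
Config["output_size"] = 9
Config["use_crf"] = True

model = TorchModel(Config)
x = torch.LongTensor([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])  # (batch=2, seq_len=5)
y = torch.LongTensor([[0, 4, 8, 8, 8], [2, 6, 6, 8, 8]])   # tag ids; -1 would mark padding
print(model(x, y))  # scalar CRF loss
print(model(x))     # decoded tags: a list of two length-5 id lists
```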
Punctuation-restoration task (adding punctuation back to raw text):
config.py
""" 配置参数信息 """ Config = { "model_path": "./output/", "model_name": "model.pt", "schema_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\schema.json", "train_data_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\train_corpus", "valid_data_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\valid_corpus", "vocab_path": r"D:\NLP\video\第七周\data\vocab.txt", "model_type": "lstm", # 数据标注中计算loss "use_crf": False, # 文本向量大小 "char_dim": 20, # 文本长度 "max_len": 50, # 词向量大小 "hidden_size": 64, # 训练 轮数 "epoch_size": 15, # 批量大小 "batch_size": 25, # 训练集大小 "simple_size": 300, # 学习率 "lr": 0.001, # dropout "dropout": 0.5, # 优化器 "optimizer": "adam", # 卷积核 "kernel_size": 3, # 最大池 or 平均池 "pooling_style": "max", # 模型层数 "num_layers": 2, "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese", # 输出层大小 "output_size": 4, # 随机数种子 "seed": 987 }
evaluate.py
""" 模型效果测试 """ import re from collections import defaultdict import numpy as np import torch from loader import load_data_batch class Evaluator: def __init__(self, config, model, logger): self.config = config self.model = model self.logger = logger # 选择验证集合 self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False) self.stats_dict = {"correct": 0, "wrong": 0} # 用于存储测试结果 self.schema = self.dataset.dataset.schema self.index_to_label = dict((y, x) for x, y in self.schema.items()) def eval(self, epoch): self.logger.info("开始测试第%d轮模型效果:" % epoch) self.stats_dict = dict(zip(self.schema.keys(), [defaultdict(int) for i in range(len(self.schema))])) self.model.eval() for index, batch_data in enumerate(self.dataset): # 句子 batch_size 化 sentence = self.dataset.dataset.sentence_list[ index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]] input_id, labels = batch_data with torch.no_grad(): pred_results = self.model(input_id) self.write_stats(labels, pred_results, sentence) self.show_stats() return # 计算准确率的 def write_stats(self, ture_labels, pred_results, sentences): assert len(ture_labels) == len(pred_results) == len(sentences), print(len(ture_labels), len(pred_results), len(sentences)) if not self.config["use_crf"]: # 获取最大下标值 pred_results = torch.argmax(pred_results, dim=-1) for true_label, pred_result, sentence in zip(ture_labels, pred_results, sentences): if not self.config["use_crf"]: # 和sentence 等长 pred_result = pred_result.cpu().detach().tolist()[:len(sentence)] # 和sentence 等长 true_label = true_label.cpu().detach().tolist()[:len(sentence)] for pred, index in zip(pred_result, true_label): if index == -1: continue key = self.index_to_label[index] self.stats_dict[key]["correct"] += 1 if pred == index else 0 self.stats_dict[key]["total"] += 1 return # 显示准确率 def show_stats(self): total = [] for key in self.schema: acc = self.stats_dict[key]["correct"] / (1e-5 + self.stats_dict[key]["total"]) self.logger.info("符号%s预测准确率:%f" % (key, acc)) total.append(acc) self.logger.info("平均acc:%f" % np.mean(total)) self.logger.info("--------------------") return
loader.py
""" 数据加载 """ import os import numpy as np import json import re import os import torch import torch.utils.data as Data from torch.utils.data import Dataset, DataLoader from transformers import BertTokenizer # 获取字表集 def load_vocab(path): vocab = {} with open(path, 'r', encoding='utf-8') as f: for index, line in enumerate(f): word = line.strip() # 0留给padding位置,所以从1开始 vocab[word] = index + 1 vocab['unk'] = len(vocab) + 1 return vocab class DataGenerator: def __init__(self, data_path, config): self.data_path = data_path self.config = config self.schema = self.load_schema(config["schema_path"]) self.max_len = config["max_len"] if self.config["model_type"] == "bert": self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"]) self.vocab = load_vocab(config["vocab_path"]) self.config["vocab_size"] = len(self.vocab) # 中文的语句list self.sentence_list = [] self.load_data() def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] def load_schema(self, path): with open(path, encoding="utf8") as f: return json.load(f) def load_data(self): self.data = [] with open(self.data_path, 'r', encoding='utf-8') as f: for line in f: if len(line) > self.max_len: for i in range(len(line) // self.max_len): input_id, label = self.process_sentence(line[i * self.max_len:(i + 1) * self.max_len]) self.data.append([torch.LongTensor(input_id), torch.LongTensor(label)]) else: input_id, label = self.process_sentence(line) self.data.append([torch.LongTensor(input_id), torch.LongTensor(label)]) return # 处理文本 输出为不带target标点的文本 + label(目标是当前char的下一个char是否为target dot) def process_sentence(self, sentence): sentence_without_target = [] labels = [] # sentence[:-1] 因为取了 next_char for index, char in enumerate(sentence[:-1]): if char in self.schema: continue # 不是target 标点 sentence_without_target.append(char) next_char = sentence[index + 1] if next_char in self.schema: labels.append(self.schema[next_char]) else: labels.append(0) # 向量化 input_id = self.sentence_to_index(sentence_without_target) labels = self.padding(labels, -1) # 保存一下原始的句子 self.sentence_list.append(' '.join(sentence_without_target)) return input_id, labels # 文本预处理 # 转化为向量 def sentence_to_index(self, text): input_ids = [] vocab = self.vocab for char in text: input_ids.append(vocab.get(char, vocab['unk'])) # 填充or裁剪 input_ids = self.padding(input_ids) return input_ids # 数据预处理 裁剪or填充 def padding(self, input_ids, padding_dot=0): length = self.config["max_len"] padded_input_ids = input_ids if len(input_ids) >= length: return input_ids[:length] else: padded_input_ids += [padding_dot] * (length - len(input_ids)) return padded_input_ids # 用torch自带的DataLoader类封装数据 def load_data_batch(data_path, config, shuffle=True): dg = DataGenerator(data_path, config) # DataLoader 类封装数据 dg除了data 还包含其他信息(后面需要使用) dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle) return dl if __name__ == '__main__': from config import Config dg = DataGenerator(Config["train_data_path"], Config) print(len(dg)) print(dg[0])
main.py
```python
import os
import random
import logging

import numpy as np
import torch

from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator

# log levels: [DEBUG, INFO, WARNING, ERROR, CRITICAL]
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""Training entry point"""

# fix the random seeds so a run can be reproduced
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


def main(config):
    # directory for saving models
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    # load the data
    dataset = load_data_batch(config["train_data_path"], config)
    # build the model
    model = TorchModel(config)
    # move to gpu if available
    if torch.cuda.is_available():
        logger.info("GPU available, moving model to GPU")
        model.cuda()
    # choose the optimizer
    optim = choose_optimizer(config, model)
    # evaluation helper
    evaluator = Evaluator(config, model, logger)
    for epoch in range(config["epoch_size"]):
        epoch += 1
        logger.info("epoch %d begin" % epoch)
        epoch_loss = []
        # training mode
        model.train()
        for index, batch_data in enumerate(dataset):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            # clear the gradients
            optim.zero_grad()
            x, y = batch_data  # adjust here for multiple inputs / outputs
            # forward pass: compute the loss
            loss = model(x, y)
            # backward pass: compute the gradients
            loss.backward()
            # optimizer step: update the parameters
            optim.step()
            # record the loss
            epoch_loss.append(loss.item())
        logger.info("epoch average loss: %f" % np.mean(epoch_loss))
        # evaluate after each epoch
        acc = evaluator.eval(epoch)
        # the model can be saved using model_type, model_path and epoch:
        # model_path = os.path.join(config["model_path"],
        #                           "epoch_%d_%s.pth" % (epoch, config["model_type"]))
        # torch.save(model.state_dict(), model_path)  # save the model weights
    return


if __name__ == "__main__":
    main(Config)

    # grid search over models / hyperparameters, e.g.
    # (the intermediate logging can be turned off to reduce the output,
    # and the results can be redirected to a file for comparison):
    # for model in ["gated_cnn"]:
    #     Config["model_type"] = model
    #     for lr in [1e-3, 1e-4]:
    #         Config["lr"] = lr
    #         for hidden_size in [128]:
    #             Config["hidden_size"] = hidden_size
    #             for batch_size in [64, 128]:
    #                 Config["batch_size"] = batch_size
    #                 for pooling_style in ["avg"]:
    #                     Config["pooling_style"] = pooling_style
    #                     print("final accuracy:", main(Config), "config:", Config)
```
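Since predict.py below loads a checkpoint from disk, the commented-out save lines need to be enabled for inference to work; a minimal version (the file name must agree with the path predict.py is given):

```python
# inside the epoch loop, after evaluation:
model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
torch.save(model.state_dict(), model_path)  # weights later loaded by predict.py
```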
model.py
model.py for this task is identical to the NER model.py above except for the loss. Since use_crf is False here and the padded label positions are filled with -1, the loss is switched to torch.nn's class-based API so those positions are ignored:

```python
# use the class-based API so padded positions (label -1) are skipped
self.loss = torch.nn.CrossEntropyLoss(ignore_index=-1)
```

As before, y_pred has shape (batch_size, max_len, class_num) and y has shape (batch_size, max_len), so both are flattened to (batch_size * max_len, class_num) and (batch_size * max_len) before the loss is computed.
predict.py
```python
# -*- coding: utf-8 -*-
import json
import torch

from config import Config
from model import TorchModel

"""Model inference"""


class SentenceLabel:
    def __init__(self, config, model_path):
        self.config = config
        self.schema = self.load_schema(config["schema_path"])
        self.index_to_sign = dict((y, x) for x, y in self.schema.items())
        self.vocab = self.load_vocab(config["vocab_path"])
        self.model = TorchModel(config)
        self.model.load_state_dict(torch.load(model_path))
        self.model.eval()
        print("Model loaded!")

    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            schema = json.load(f)
        self.config["class_num"] = len(schema)
        return schema

    # load the character (or word) vocabulary
    def load_vocab(self, vocab_path):
        token_dict = {}
        with open(vocab_path, encoding="utf8") as f:
            for index, line in enumerate(f):
                token = line.strip()
                token_dict[token] = index + 1  # index 0 is reserved for padding
        self.config["vocab_size"] = len(token_dict)
        return token_dict

    def predict(self, sentence):
        input_id = []
        for char in sentence:
            # "<UNK>" must be a token in the vocab file
            input_id.append(self.vocab.get(char, self.vocab["<UNK>"]))
        with torch.no_grad():
            res = self.model(torch.LongTensor([input_id]))[0]
            res = torch.argmax(res, dim=-1)
        # re-insert the predicted punctuation after each character
        labeled_sentence = ""
        for char, label_index in zip(sentence, res):
            labeled_sentence += char + self.index_to_sign[int(label_index)]
        return labeled_sentence


if __name__ == "__main__":
    sl = SentenceLabel(Config, "model_output/epoch_10.pth")

    sentence = "客厅的颜色比较稳重但不沉重相反很好的表现了欧式的感觉给人高雅的味道"
    print(sl.predict(sentence))

    sentence = "双子座的健康运势也呈上升的趋势但下半月有所回落"
    print(sl.predict(sentence))
```