赞
踩
仅记录学习过程,有问题欢迎讨论
P(B1) = 结果为奇数
P(B2) = 结果为偶数
P(A) = 结果为5
P(A) = P(B1) × P(A|B1) + P(B2) × P(A|B2) = 1/2 × 1/3 + 1/2 × 0 = 1/6
svm尝试找一个最优的决策边界,来解决一个 二分类的问题
解决不了线性不可分问题,只能以更高维来区分数据(升维 类似于bert中的 feed forward)
使用 核函数解决高维度向量内积问题
深度学习
是RNN的变体,相比于transformer,没那么复杂,东西没那么多
一定程度规避传统RNN会导致信息遗忘和梯度消失的问题
(把前向和当前的信息做一定筛选【门】后保存)
是包含一定的语序信息的
通过一维卷积对文本进行编码,
编码后的文本通过pooling转化为向量,用于分类
取 【CLS】token对应的向量
整句话的向量求MAX/AVG pooling
需要再接一层来实现想要的需求,需要微调
(但是准确率还是比RNN高)
添加标注数据!!
构造训练样本
换模型
调整阈值,用召回率换准确率(两者通常此消彼长,很难同时提高)
减少样本类别
过采样:复制指定类别样本,可以重复
降采样:随机下采样,可以减少类别样本数量
1.分解为多个独立的二分类
(分为多个模型来判断数据是否属于该类)
2.转化为多分类问题(同时属于13/12/23类别)
bert实现多分类任务demo(优化得跑更多的数据,是真的慢。。)
""" 使用bert 实现一个多分类任务 判断 输入的句子属于哪个板块的数据 """ import json import os import numpy as np import torch import torch.nn as nn from transformers import BertModel, BertTokenizer from torch.utils.data import DataLoader, TensorDataset class PickModel(nn.Module): def __init__(self, input_dim, output_size, max_len=128): super(PickModel, self).__init__() self.bert = BertModel.from_pretrained(r"E:\Anlp\week6语言模型和预训练\bert-base-chinese", return_dict=False) self.linear = nn.Linear(input_dim, output_size) # 归一化算概率 self.activation = torch.sigmoid self.dropout = nn.Dropout(0.4) self.pool = nn.MaxPool1d(max_len) self.loss = nn.functional.cross_entropy def forward(self, x, y=None): sequence_output, pool_output = self.bert(x) x = self.linear(sequence_output) x = self.pool(x.transpose(1, 2)).squeeze() # input shape:(batch_size, sen_len, input_dim) y_pred = self.activation(x) if y is not None: return self.loss(y_pred, y) else: return y_pred # 构建 tag对应的向量set # tag_dict = {} def build_dataset(corpus_path, simples_size): print("============") x = [] y = [] # 用来记录每次的tag 方便后续构建tag对应的向量 # list = [] # 加载中文分词 直接转化为向量 并且添加前后标记 tokenizer = BertTokenizer.from_pretrained("E:\\Anlp\\week6语言模型和预训练\\bert-base-chinese") tag_dict = open("tag_dict.json", "r", encoding="utf8").read() # 读取tag_dict.json文件 变为字典 tag_dict = json.loads(tag_dict) with open(corpus_path, encoding="utf8") as f: i = 0 # 读取文件中随机某一行的数据 lines = f.readlines() # 随机打乱数据 np.random.shuffle(lines) for line in lines: if i < simples_size: print(line[:50]) i += 1 # loads操作字符串 load操作文件流 line = json.loads(line) # 返回的张量类型(pytorch),是否填充,是否截取,都为128的长度 content_input_ids = tokenizer.encode(str(line["content"]), max_length=128, pad_to_max_length=True, truncation=True) x.append(content_input_ids) # 记录有多少个tag 放入key # tag_dict[str(line["tag"])] = 1 # list.append(str(line["tag"])) y.append(tag_dict.get(str(line["tag"]))) # 对每个tag定义index 方便loss计算 # for index, tag in enumerate(tag_dict.keys()): # tag_dict[tag] = index # 把tag_dict的key,value保存到文件中 # if not 
os.path.exists("tag_dict.json"): # with open("tag_dict.json", "w", encoding="utf8") as f: # json.dump(tag_dict, f, ensure_ascii=False) return torch.LongTensor(x), torch.LongTensor(y) # test # build_dataset("E:\\Anlp\\week7文本分类问题\\data\\train_tag_news.json",100) def main(): char_dim = 768 epoch_num = 10 simples_size = 200 batch_num = 20 # build_dataset("E:\\Anlp\\week7文本分类问题\\data\\train_tag_news.json", simples_size) x, y = build_dataset("E:\\Anlp\\week7文本分类问题\\data\\train_tag_news.json", simples_size) dataset = TensorDataset(x, y) dataiter = DataLoader(dataset, batch_num, shuffle=True) model = PickModel(char_dim, 18) # 建立模型 optim = torch.optim.Adam(model.parameters(), lr=1e-3) # 建立优化器) for epoch in range(epoch_num): epoch_loss = [] model.train() for x, y in dataiter: loss = model(x, y) loss.backward() optim.step() optim.zero_grad() epoch_loss.append(loss.item()) # print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(epoch_loss))) print("==\n第%d轮,epoch—loss mean 为 %f" % (epoch + 1, np.mean(epoch_loss))) torch.save(model.state_dict(), "model0506.pth") return def predict(model_path): # 建立模型 model = PickModel(768, 18) model.load_state_dict(torch.load(model_path)) model.eval() # sentence = input() # 加载中文分词 直接转化为向量 并且添加前后标记 x, y_true = build_dataset("E:\\Anlp\\week7文本分类问题\\data\\valid_tag_news.json", 100) # tag_dict1 = open("tag_dict.json", "r", encoding="utf8").read() # 读取tag_dict.json文件 变为字典 # tag_dict1 = json.loads(tag_dict1) correct, wrong = 0, 0 with torch.no_grad(): result = model(x) # 返回最大概率的index y_pred = [torch.argmax(i) for i in list(result)] print(y_true) print(y_pred) # 返回最大概率的tag for y_p, y in zip(y_pred, y_true): # 与真实标签进行对比 if int(y_p) == int(y): correct += 1 # 正样本判断正确 else: wrong += 1 print("正确预测个数:%d / %d, 正确率:%f" % (correct, correct + wrong, correct / (correct + wrong))) return if __name__ == '__main__': # main() predict("model0506.pth")
tag_dict.json
{"文化": 0, "时尚": 1, "健康": 2, "教育": 3, "军事": 4, "股票": 5, "娱乐": 6, "游戏": 7, "科技": 8, "彩票": 9, "旅游": 10, "汽车": 11, "体育": 12, "家居": 13, "财经": 14, "国际": 15, "房产": 16, "社会": 17}
标准化流程实现分类任务
训练样本 1w+ 验证样本 1261 训练轮次15
RNN:正确率 80.8 1020/1261 4min40s
LSTM: 正确率 79.3 1001/1261 4min43s
BERT: 正确率 82.3 跑第3轮结束耗时 8min40s
config.py 配置文件
""" 配置参数信息 """ Config = { "model_path": "./output/", "model_name": "model.pt", "train_data_path": r"D:\NLP\video\第七周\data\train_simple.csv", "valid_data_path": r"D:\NLP\\video\第七周\data\valid_simple.csv", "vocab_path": r"D:\NLP\video\第七周\data\vocab.txt", "model_type": "bert", "use_bert": True, # 文本向量大小 "char_dim": 128, # 文本长度 "max_len": 50, # 词向量大小 "hidden_size": 256, # 训练 轮数 "epoch_size": 15, # 批量大小 "batch_size": 25, # 训练集大小 "simple_size": 300, # 学习率 "lr": 0.001, # dropout "dropout": 0.5, # 优化器 "optimizer": "adam", # 卷积核 "kernel_size": 3, # 最大池 or 平均池 "pooling_style": "max", # 模型层数 "num_layers": 2, "bert_model_path": r"D:\NLP\video\第六周\bert-base-chinese", # 输出层大小 "output_size": 2, # 随机数种子 "seed": 987 }
load.py 加载数据文件
""" 数据加载 """ import os import numpy as np import pandas as pd import json import re import os import torch import torch.utils.data as Data from torch.utils.data import Dataset, DataLoader from transformers import BertTokenizer # 获取字表集 def load_vocab(path): vocab = {} with open(path, 'r', encoding='utf-8') as f: for index, line in enumerate(f): word = line.strip() # 0留给padding位置,所以从1开始 vocab[word] = index + 1 vocab['unk'] = len(vocab) + 1 return vocab # 数据预处理 裁剪or填充 def padding(input_ids, length): if len(input_ids) >= length: return input_ids[:length] else: padded_input_ids = input_ids + [0] * (length - len(input_ids)) return padded_input_ids # 文本预处理 # 转化为向量 def sentence_to_index(text, length, vocab): input_ids = [] for char in text: input_ids.append(vocab.get(char, vocab['unk'])) # 填充or裁剪 input_ids = padding(input_ids, length) return input_ids class DataGenerator: def __init__(self, data_path, config): self.data_path = data_path self.config = config if self.config["model_type"] == "bert": self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"]) self.vocab = load_vocab(config["vocab_path"]) self.config["vocab_size"] = len(self.vocab) self.data = self.load_data() def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] def load_data(self): dataset_x = [] dataset_y = [] with open(self.data_path, 'r', encoding='utf-8') as f: for line in f: row = line.strip().split(',') # 第一列为标签,第二列为文本 label = int(row[0]) text = row[1] # 文本预处理 if self.config["model_type"] == "bert": input_ids = self.tokenizer.encode(text, max_length=self.config["max_len"], pad_to_max_length=True) else: # 转化为对应的字表id input_ids = sentence_to_index(text, self.config["max_len"], self.vocab) # 标签和文本组成一个样本 dataset_x.append(input_ids) dataset_y.append(label) data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y)) return data # 用torch自带的DataLoader类封装数据 def load_data_batch(data_path, config, shuffle=True): dg = DataGenerator(data_path, config) 
dl = DataLoader(dg.data, batch_size=config["batch_size"], shuffle=shuffle) return dl if __name__ == '__main__': from config import Config dg = DataGenerator(Config["train_data_path"], Config) print(len(dg)) print(dg[0])
main.py 主方法
"""Model training entry point."""
import torch
import os
import random
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator

# available levels: [DEBUG, INFO, WARNING, ERROR, CRITICAL]
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Fix every random seed so a run can be reproduced exactly
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


def main(config):
    """Train the configured model, evaluating and checkpointing after every epoch.

    Returns the final epoch's validation accuracy so callers (e.g. a
    hyper-parameter grid search) can compare configurations.
    """
    # directory the checkpoints are saved into
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    # load training data
    dataset = load_data_batch(config["train_data_path"], config)
    # build the model
    model = TorchModel(config)
    # move to GPU when one is available
    if torch.cuda.is_available():
        logger.info("gpu可以使用,迁移模型至gpu")
        model.cuda()
    # optimizer and evaluation helper
    optim = choose_optimizer(config, model)
    evaluator = Evaluator(config, model, logger)
    acc = None
    for epoch in range(config["epoch_size"]):
        epoch += 1
        logger.info("epoch %d begin" % epoch)
        epoch_loss = []
        model.train()
        for batch_data in dataset:
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            optim.zero_grad()
            x, y = batch_data  # adjust here for multi-input / multi-output models
            loss = model(x, y)
            loss.backward()
            optim.step()
            epoch_loss.append(loss.item())
        logger.info("epoch average loss: %f" % np.mean(epoch_loss))
        # validation after every epoch
        acc = evaluator.eval(epoch)
        # checkpoint named by epoch and model type
        model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
        torch.save(model.state_dict(), model_path)
    # BUGFIX: return the final accuracy (the original returned None even though
    # grid-search callers print main(Config) as an accuracy).
    return acc


if __name__ == "__main__":
    main(Config)
    # Grid search example: iterate model_type / lr / hidden_size / batch_size /
    # pooling_style over Config and compare the accuracy returned by main(Config).
evaluate.py 评估模型文件
""" 模型效果测试 """ import torch from loader import load_data_batch class Evaluator: def __init__(self, config, model, logger): self.config = config self.model = model self.logger = logger # 选择验证集合 self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False) self.stats_dict = {"correct": 0, "wrong": 0} # 用于存储测试结果 def eval(self, epoch): self.logger.info("开始测试第%d轮模型效果:" % epoch) # 测试模式 self.model.eval() self.stats_dict = {"correct": 0, "wrong": 0} # 清空上一轮结果 for index,batch_data in enumerate(self.dataset): if torch.cuda.is_available(): batch_data = [d.cuda() for d in batch_data] x,y = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况 with torch.no_grad(): pred = self.model(x) # dim=1 表示在第一维上进行比较,即取一行的argmax pred = [torch.argmax(i) for i in pred] for pred, y in zip(pred, y): # 预测正确 if pred == y: self.stats_dict["correct"] += 1 # 预测错误 else: self.stats_dict["wrong"] += 1 acc = self.show_stats() return acc def show_stats(self): correct = self.stats_dict["correct"] wrong = self.stats_dict["wrong"] self.logger.info("预测集合条目总量:%d" % (correct + wrong)) self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong)) self.logger.info("预测准确率:%f" % (correct / (correct + wrong))) self.logger.info("--------------------") return correct / (correct + wrong)
model.py
"""Network model definitions."""
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel


class TorchModel(nn.Module):
    """Configurable text classifier.

    embedding -> encoder (rnn / lstm / cnn / gated_cnn / bert / bert_lstm)
    -> pooling over the sequence -> linear classification head.
    """

    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        output_size = config["output_size"]
        model_type = config["model_type"]
        num_layers = config["num_layers"]
        self.use_bert = config["use_bert"]
        # NOTE(review): vocab_size already includes a +1; the extra +1 here is
        # kept as-is so existing checkpoints keep loading.
        self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
        if model_type == 'rnn':
            self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size,
                                  num_layers=num_layers, batch_first=True)
        elif model_type == 'lstm':
            # a bidirectional LSTM would output hidden_size * 2
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)
        elif self.use_bert:
            self.encoder = BertModel.from_pretrained(config["bert_model_path"])
            # the head must match the pretrained model's hidden size
            hidden_size = self.encoder.config.hidden_size
        elif model_type == 'cnn':
            self.encoder = CNN(config)
        elif model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif model_type == "bert_lstm":
            self.encoder = BertLSTM(config)
            # BUGFIX: BertLSTM itself has no .config attribute; read the hidden
            # size from its inner BERT model (the original raised AttributeError).
            hidden_size = self.encoder.bert.config.hidden_size
        self.classify = nn.Linear(hidden_size, output_size)
        self.pooling_style = config["pooling_style"]
        self.loss = nn.functional.cross_entropy  # cross-entropy loss on logits

    def forward(self, x, y=None):
        """Return the loss when labels `y` are given, otherwise class logits."""
        if self.use_bert:
            # x: (batch_size, seq_len).  BertModel returns
            # (sequence_output, pooler_output); keep the per-token output.
            out = self.encoder(x)
            # BUGFIX: only index [0] on tuple-like outputs — a BertLSTM encoder
            # returns a plain tensor, and tensor[0] would drop the batch dim.
            x = out if isinstance(out, torch.Tensor) else out[0]
        else:
            x = self.emb(x)
            x = self.encoder(x)
        if isinstance(x, tuple):  # RNN/LSTM return (output, hidden_state)
            x = x[0]
        # BUGFIX: build the pooling layer in a local.  The original overwrote
        # self.pooling_style with an nn.MaxPool1d on the first call, freezing
        # the kernel to the first batch's sequence length and breaking the
        # "max"/"avg" dispatch on every later call.
        if self.pooling_style == "max":
            pool = nn.MaxPool1d(x.shape[1])
        else:
            pool = nn.AvgPool1d(x.shape[1])
        # squeeze(-1) so a batch of size 1 keeps its batch dimension
        x = pool(x.transpose(1, 2)).squeeze(-1)
        y_pred = self.classify(x)
        if y is not None:
            return self.loss(y_pred, y.squeeze())
        return y_pred


def choose_optimizer(config, model):
    """Return the optimizer named in config ("adam" or "sgd")."""
    optimizer = config["optimizer"]
    learning_rate = config["lr"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


class CNN(nn.Module):
    """1-D convolution over the sequence, padded so the length is preserved."""

    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        # "same" padding for odd kernel sizes
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):
        # x: (batch_size, max_len, embedding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


class GatedCNN(nn.Module):
    """Gated convolution: cnn(x) * sigmoid(gate(x))."""

    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    def forward(self, x):
        a = self.cnn(x)
        b = torch.sigmoid(self.gate(x))
        return torch.mul(a, b)


class BertLSTM(nn.Module):
    """BERT encoder followed by an LSTM over the token representations."""

    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]     # per-token sequence output
        x, _ = self.rnn(x)
        return x
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。