文本分类任务是自然语言处理(NLP)中的一个常见问题,目的是根据预定义的类别来自动对输入的文本进行分类。这类任务广泛应用于垃圾邮件过滤、情感分析、主题标签生成等场景。常用的方法包括朴素贝叶斯分类、支持向量机(SVM)、神经网络等。
全概率公式:
P(A) = P(B1) * P(A|B1) + P(B2) * P(A|B2) + ... + P(Bn) * P(A|Bn),其中 B1...Bn 是对样本空间的一个划分。

举例:扔一个正常的骰子
B1 = 结果为奇数
B2 = 结果为偶数
A = 结果为5
P(A) = P(B1) * P(A|B1) + P(B2) * P(A|B2) = 1/2 * 1/3 + 1/2 * 0 = 1/6
贝叶斯公式:
P(A|B) = P(A) * P(B|A) / P(B)
求解:如果核酸检测呈阳性,感染新冠的概率是多少?
我们假定新冠在人群中的感染率为0.1%(千分之一)
核酸检测有一定误报率,我们假定如下:
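下面用一组假设的数值演示这个计算(其中检测灵敏度99%、假阳性率5%均为示例假设,并非原始数据):

# 用贝叶斯公式计算"检测阳性时确实感染新冠"的后验概率
# 注意:以下灵敏度与假阳性率均为示例假设
p_covid = 0.001              # P(感染) = 0.1%
p_pos_given_covid = 0.99     # P(阳性|感染),假设检测灵敏度为99%
p_pos_given_healthy = 0.05   # P(阳性|未感染),假设假阳性率为5%

# 全概率公式计算 P(阳性)
p_pos = p_covid * p_pos_given_covid + (1 - p_covid) * p_pos_given_healthy
# 贝叶斯公式计算 P(感染|阳性)
p_covid_given_pos = p_covid * p_pos_given_covid / p_pos
print("P(感染|阳性) = %.4f" % p_covid_given_pos)  # 约0.0194,即不到2%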
import math
import jieba
import re
import os
import json
from collections import defaultdict

jieba.initialize()

"""
贝叶斯分类实践

P(A|B) = (P(A) * P(B|A)) / P(B)
事件A:文本属于类别x1。文本属于类别x1的概率,记做P(x1)
事件B:文本为s (s=w1w2w3..wn)
P(x1|s) = 文本为s时,属于x1类的概率  #求解目标#

P(x1|s) = P(x1|w1, w2, w3...wn) = P(w1, w2..wn|x1) * P(x1) / P(w1, w2, w3...wn)

P(x1) 任意样本属于x1的概率,即 x1样本数/总样本数
P(w1, w2..wn|x1) = P(w1|x1) * P(w2|x1)...P(wn|x1)  词的独立性假设
P(w1|x1) x1类样本中,w1出现的频率

公共分母的计算,使用全概率公式:
P(w1, w2, w3...wn) = P(w1,w2..wn|x1)*P(x1) + P(w1,w2..wn|x2)*P(x2) + ... + P(w1,w2..wn|xn)*P(xn)
"""

class BayesApproach:
    def __init__(self, data_path):
        self.p_class = defaultdict(int)
        self.word_class_prob = defaultdict(dict)
        self.load(data_path)

    def load(self, path):
        self.class_name_to_word_freq = defaultdict(dict)
        self.all_words = set()  #汇总一个词表
        with open(path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                class_name = line["tag"]
                title = line["title"]
                words = jieba.lcut(title)
                self.all_words.update(words)  #原写法all_words.union(...)不会修改集合本身,改用update
                self.p_class[class_name] += 1  #记录每个类别样本数量
                word_freq = self.class_name_to_word_freq[class_name]
                #记录每个类别下的词频
                for word in words:
                    if word not in word_freq:
                        word_freq[word] = 1
                    else:
                        word_freq[word] += 1
        self.freq_to_prob()
        return

    #将记录的词频和样本频率都转化为概率
    def freq_to_prob(self):
        #样本概率计算
        total_sample_count = sum(self.p_class.values())
        self.p_class = dict([c, self.p_class[c] / total_sample_count] for c in self.p_class)
        #词概率计算
        self.word_class_prob = defaultdict(dict)
        for class_name, word_freq in self.class_name_to_word_freq.items():
            total_word_count = sum(count for count in word_freq.values())  #每个类别总词数
            for word in word_freq:
                #加1平滑,避免出现概率为0,计算P(wn|x1)
                prob = (word_freq[word] + 1) / (total_word_count + len(self.all_words))
                self.word_class_prob[class_name][word] = prob
            self.word_class_prob[class_name]["<unk>"] = 1 / (total_word_count + len(self.all_words))
        return

    #P(w1|x1) * P(w2|x1)...P(wn|x1)
    def get_words_class_prob(self, words, class_name):
        result = 1
        for word in words:
            unk_prob = self.word_class_prob[class_name]["<unk>"]
            result *= self.word_class_prob[class_name].get(word, unk_prob)
        return result

    #计算P(w1, w2..wn|x1) * P(x1)
    def get_class_prob(self, words, class_name):
        #P(x1)
        p_x = self.p_class[class_name]
        #P(w1, w2..wn|x1) = P(w1|x1) * P(w2|x1)...P(wn|x1)
        p_w_x = self.get_words_class_prob(words, class_name)
        return p_x * p_w_x

    #做文本分类
    def classify(self, sentence):
        words = jieba.lcut(sentence)  #切词
        results = []
        for class_name in self.p_class:
            prob = self.get_class_prob(words, class_name)  #计算class_name类概率
            results.append([class_name, prob])
        results = sorted(results, key=lambda x: x[1], reverse=True)  #排序
        #计算公共分母:P(w1, w2, w3...wn) = P(w1,w2..wn|x1)*P(x1) + ... + P(w1,w2..wn|xn)*P(xn)
        #不做这一步也可以,对顺序没影响,只不过得到的不是0-1之间的概率值
        pw = sum([x[1] for x in results])  #P(w1, w2, w3...wn)
        results = [[c, prob / pw] for c, prob in results]
        #打印结果
        for class_name, prob in results:
            print("属于类别[%s]的概率为%f" % (class_name, prob))
        return results


if __name__ == "__main__":
    path = "../data/train_tag_news.json"
    ba = BayesApproach(path)
    query = "目瞪口呆 世界上还有这些奇葩建筑"
    ba.classify(query)
优点:
1. 简单高效
2. 具有一定的可解释性
3. 如果样本覆盖得好,效果是不错的
4. 训练数据可以很好地分批处理

缺点:
1. 依赖"词与词相互独立"的假设,而该假设在真实文本中通常并不成立
2. 无法利用词序信息,对训练集中未出现过的词只能用<unk>概率近似
支持向量机

支持向量机(SVM)是一种用于分类和回归问题的监督学习算法。在分类任务中,SVM通过寻找一个超平面来区分不同类别的数据点;这个超平面的选取使其与最近的数据点(支持向量)之间的间隔最大化,从而获得更好的泛化能力。SVM在文本分类、图像识别和生物信息学等多个领域有广泛应用,也可以通过核技巧来处理非线性问题。
#!/usr/bin/env python3
#coding: utf-8

#使用基于词向量的分类器
#对比几种模型的效果

import json
import jieba
import numpy as np
from gensim.models import Word2Vec
from sklearn.metrics import classification_report
from sklearn.svm import SVC
from collections import defaultdict

LABELS = {'健康': 0, '军事': 1, '房产': 2, '社会': 3, '国际': 4, '旅游': 5, '彩票': 6, '时尚': 7, '文化': 8,
          '汽车': 9, '体育': 10, '家居': 11, '教育': 12, '娱乐': 13, '科技': 14, '股票': 15, '游戏': 16, '财经': 17}

#输入模型文件路径
#加载训练好的模型
def load_word2vec_model(path):
    model = Word2Vec.load(path)
    return model

#加载数据集
def load_sentence(path, model):
    sentences = []
    labels = []
    with open(path, encoding="utf8") as f:
        for line in f:
            line = json.loads(line)
            title, content = line["title"], line["content"]
            sentences.append(" ".join(jieba.lcut(title)))
            labels.append(line["tag"])
    train_x = sentences_to_vectors(sentences, model)
    train_y = label_to_label_index(labels)
    return train_x, train_y

#tag标签转化为类别标号
def label_to_label_index(labels):
    return [LABELS[y] for y in labels]

#文本向量化,使用了基于这些文本训练的词向量
def sentences_to_vectors(sentences, model):
    vectors = []
    for sentence in sentences:
        words = sentence.split()
        vector = np.zeros(model.vector_size)
        for word in words:
            try:
                vector += model.wv[word]
                # vector = np.max([vector, model.wv[word]], axis=0)
            except KeyError:
                vector += np.zeros(model.vector_size)
        vectors.append(vector / len(words))
    return np.array(vectors)


def main():
    model = load_word2vec_model("model.w2v")
    train_x, train_y = load_sentence("../data/train_tag_news.json", model)
    test_x, test_y = load_sentence("../data/valid_tag_news.json", model)
    classifier = SVC()
    classifier.fit(train_x, train_y)
    y_pred = classifier.predict(test_x)
    print(classification_report(test_y, y_pred))


if __name__ == "__main__":
    main()
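上面的脚本假设当前目录下已经有训练好的词向量文件model.w2v。如果还没有,可以参考下面的示意代码先训练并保存一个(基于gensim 4.x接口,语料路径与向量维度均为示例假设):

import json
import jieba
from gensim.models import Word2Vec

#读取训练集标题,切词后作为word2vec的训练语料(路径为示例假设)
corpus = []
with open("../data/train_tag_news.json", encoding="utf8") as f:
    for line in f:
        corpus.append(jieba.lcut(json.loads(line)["title"]))

#vector_size为词向量维度,可按需调整
w2v_model = Word2Vec(corpus, vector_size=128, window=5, min_count=1, workers=4)
w2v_model.save("model.w2v")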
优点:
1. 通过最大化间隔,泛化能力较好
2. 可以通过核技巧处理非线性可分的问题

缺点:
1. 在大规模训练数据上训练开销较大,速度较慢
在深度学习中,"pipeline"通常指把数据预处理、模型训练、模型评估和模型部署等一系列步骤组织成一个流程,以便更高效地完成特定任务。本文后面的配置(config)、数据加载(loader)、模型定义(model)、效果评估(evaluate)和训练主程序(main)几个文件,合起来就构成了这样一条文本分类pipeline。
FastText 是一个用于文本分类和词向量学习的开源库。与传统的深度学习模型相比,FastText 在保持相近分类效果的同时显著提高了训练速度。
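下面是一个最小的使用示意(假设已通过pip安装fasttext,并准备好train.txt,其中每行形如 "__label__旅游 分词 后 的 文本"):

import fasttext

#监督训练:lr、epoch、wordNgrams等超参数均为示例取值
model = fasttext.train_supervised(input="train.txt", lr=0.5, epoch=5, wordNgrams=2)
#预测时输入同样是分好词、空格分隔的文本
print(model.predict("目瞪口呆 世界上 还有 这些 奇葩 建筑"))
model.save_model("fasttext_news.bin")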
TextRNN 是一种利用循环神经网络(RNN)对文本进行编码、再接分类层完成分类的模型。它强大但计算开销较大,尤其适用于需要捕获长距离依赖或复杂结构的任务。
RNN(循环神经网络)用于文本分类时,按顺序逐个读入词(或字)并不断更新隐状态,因此能够利用文本中的顺序信息。总体来说,RNN 是一种适用于文本分类的强大模型,特别是当文本中的顺序信息很重要时;但也需要注意其训练速度较慢、长序列上梯度容易消失等潜在问题。
LSTM(长短时记忆网络)是对RNN的一种改进,通过遗忘门、输入门和输出门控制信息的保留与更新,从而缓解普通RNN的长距离依赖问题。总体而言,LSTM提供了一种灵活而强大的文本分类方式,尤其适合处理结构复杂、存在长距离依赖的文本。下文的代码会用numpy逐步复现LSTM和GRU的内部计算。
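作为示意,下面先给出一个极简的LSTM文本分类器草图(词表大小、隐藏层维度、类别数均为示例取值,只为说明"embedding -> LSTM -> 分类层"的基本结构;后文的TorchModel是更完整的实现):

import torch
import torch.nn as nn

#极简的LSTM文本分类器:embedding -> LSTM -> 取最后时刻隐状态 -> 线性分类层
class SimpleTextRNN(nn.Module):
    def __init__(self, vocab_size=5000, hidden_size=128, class_num=18):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.classify = nn.Linear(hidden_size, class_num)

    def forward(self, x):                  # x: (batch_size, seq_len)
        x = self.embedding(x)              # (batch_size, seq_len, hidden_size)
        output, (h_n, c_n) = self.lstm(x)  # h_n: (1, batch_size, hidden_size)
        return self.classify(h_n[-1])      # (batch_size, class_num)

if __name__ == "__main__":
    model = SimpleTextRNN()
    fake_input = torch.randint(0, 5000, (2, 20))  #随机构造两条长度为20的样本
    print(model(fake_input).shape)  # torch.Size([2, 18])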
import torch
import torch.nn as nn
import numpy as np

'''
用矩阵运算的方式复现一些基础的模型结构
清楚模型的计算细节,有助于加深对于模型的理解,以及模型转换等工作
'''

#构造一个输入
length = 6
input_dim = 12
hidden_size = 7
x = np.random.random((length, input_dim))
# print(x)

#使用pytorch的lstm层
torch_lstm = nn.LSTM(input_dim, hidden_size, batch_first=True)
for key, weight in torch_lstm.state_dict().items():
    print(key, weight.shape)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

#将pytorch的lstm网络权重拿出来,用numpy通过矩阵运算实现lstm的计算
def numpy_lstm(x, state_dict):
    weight_ih = state_dict["weight_ih_l0"].numpy()
    weight_hh = state_dict["weight_hh_l0"].numpy()
    bias_ih = state_dict["bias_ih_l0"].numpy()
    bias_hh = state_dict["bias_hh_l0"].numpy()
    #pytorch将四个门的权重拼接存储,我们将它拆开
    w_i_x, w_f_x, w_c_x, w_o_x = weight_ih[0:hidden_size, :], \
                                 weight_ih[hidden_size:hidden_size*2, :], \
                                 weight_ih[hidden_size*2:hidden_size*3, :], \
                                 weight_ih[hidden_size*3:hidden_size*4, :]
    w_i_h, w_f_h, w_c_h, w_o_h = weight_hh[0:hidden_size, :], \
                                 weight_hh[hidden_size:hidden_size*2, :], \
                                 weight_hh[hidden_size*2:hidden_size*3, :], \
                                 weight_hh[hidden_size*3:hidden_size*4, :]
    b_i_x, b_f_x, b_c_x, b_o_x = bias_ih[0:hidden_size], \
                                 bias_ih[hidden_size:hidden_size*2], \
                                 bias_ih[hidden_size*2:hidden_size*3], \
                                 bias_ih[hidden_size*3:hidden_size*4]
    b_i_h, b_f_h, b_c_h, b_o_h = bias_hh[0:hidden_size], \
                                 bias_hh[hidden_size:hidden_size*2], \
                                 bias_hh[hidden_size*2:hidden_size*3], \
                                 bias_hh[hidden_size*3:hidden_size*4]
    w_i = np.concatenate([w_i_h, w_i_x], axis=1)
    w_f = np.concatenate([w_f_h, w_f_x], axis=1)
    w_c = np.concatenate([w_c_h, w_c_x], axis=1)
    w_o = np.concatenate([w_o_h, w_o_x], axis=1)
    b_f = b_f_h + b_f_x
    b_i = b_i_h + b_i_x
    b_c = b_c_h + b_c_x
    b_o = b_o_h + b_o_x
    c_t = np.zeros((1, hidden_size))
    h_t = np.zeros((1, hidden_size))
    sequence_output = []
    for x_t in x:
        x_t = x_t[np.newaxis, :]
        hx = np.concatenate([h_t, x_t], axis=1)
        # f_t = sigmoid(np.dot(x_t, w_f_x.T) + b_f_x + np.dot(h_t, w_f_h.T) + b_f_h)
        f_t = sigmoid(np.dot(hx, w_f.T) + b_f)
        # i_t = sigmoid(np.dot(x_t, w_i_x.T) + b_i_x + np.dot(h_t, w_i_h.T) + b_i_h)
        i_t = sigmoid(np.dot(hx, w_i.T) + b_i)
        # g = np.tanh(np.dot(x_t, w_c_x.T) + b_c_x + np.dot(h_t, w_c_h.T) + b_c_h)
        g = np.tanh(np.dot(hx, w_c.T) + b_c)
        c_t = f_t * c_t + i_t * g
        # o_t = sigmoid(np.dot(x_t, w_o_x.T) + b_o_x + np.dot(h_t, w_o_h.T) + b_o_h)
        o_t = sigmoid(np.dot(hx, w_o.T) + b_o)
        h_t = o_t * np.tanh(c_t)
        sequence_output.append(h_t)
    return np.array(sequence_output), (h_t, c_t)


torch_sequence_output, (torch_h, torch_c) = torch_lstm(torch.Tensor([x]))
numpy_sequence_output, (numpy_h, numpy_c) = numpy_lstm(x, torch_lstm.state_dict())

print(torch_sequence_output)
print(numpy_sequence_output)
print("--------")
print(torch_h)
print(numpy_h)
print("--------")
print(torch_c)
print(numpy_c)

#############################################################

#使用pytorch的GRU层
torch_gru = nn.GRU(input_dim, hidden_size, batch_first=True)
# for key, weight in torch_gru.state_dict().items():
#     print(key, weight.shape)

#将pytorch的GRU网络权重拿出来,用numpy通过矩阵运算实现GRU的计算
def numpy_gru(x, state_dict):
    weight_ih = state_dict["weight_ih_l0"].numpy()
    weight_hh = state_dict["weight_hh_l0"].numpy()
    bias_ih = state_dict["bias_ih_l0"].numpy()
    bias_hh = state_dict["bias_hh_l0"].numpy()
    #pytorch将3个门的权重拼接存储,我们将它拆开
    w_r_x, w_z_x, w_x = weight_ih[0:hidden_size, :], \
                        weight_ih[hidden_size:hidden_size*2, :], \
                        weight_ih[hidden_size*2:hidden_size*3, :]
    w_r_h, w_z_h, w_h = weight_hh[0:hidden_size, :], \
                        weight_hh[hidden_size:hidden_size*2, :], \
                        weight_hh[hidden_size*2:hidden_size*3, :]
    b_r_x, b_z_x, b_x = bias_ih[0:hidden_size], \
                        bias_ih[hidden_size:hidden_size*2], \
                        bias_ih[hidden_size*2:hidden_size*3]
    b_r_h, b_z_h, b_h = bias_hh[0:hidden_size], \
                        bias_hh[hidden_size:hidden_size*2], \
                        bias_hh[hidden_size*2:hidden_size*3]
    w_z = np.concatenate([w_z_h, w_z_x], axis=1)
    w_r = np.concatenate([w_r_h, w_r_x], axis=1)
    b_z = b_z_h + b_z_x
    b_r = b_r_h + b_r_x
    h_t = np.zeros((1, hidden_size))
    sequence_output = []
    for x_t in x:
        x_t = x_t[np.newaxis, :]
        hx = np.concatenate([h_t, x_t], axis=1)
        z_t = sigmoid(np.dot(hx, w_z.T) + b_z)
        r_t = sigmoid(np.dot(hx, w_r.T) + b_r)
        h = np.tanh(r_t * (np.dot(h_t, w_h.T) + b_h) + np.dot(x_t, w_x.T) + b_x)
        h_t = (1 - z_t) * h + z_t * h_t
        sequence_output.append(h_t)
    return np.array(sequence_output), h_t


# torch_sequence_output, torch_h = torch_gru(torch.Tensor([x]))
# numpy_sequence_output, numpy_h = numpy_gru(x, torch_gru.state_dict())
#
# print(torch_sequence_output)
# print(numpy_sequence_output)
# print("--------")
# print(torch_h)
# print(numpy_h)
对CNN的一种改进
import torch
import torch.nn as nn
import numpy as np

#使用pytorch的1维卷积层
input_dim = 7
hidden_size = 8
kernel_size = 2
torch_cnn1d = nn.Conv1d(input_dim, hidden_size, kernel_size)
for key, weight in torch_cnn1d.state_dict().items():
    print(key, weight.shape)

x = torch.rand((7, 4))  #embedding_size * max_length

def numpy_cnn1d(x, state_dict):
    weight = state_dict["weight"].numpy()
    bias = state_dict["bias"].numpy()
    sequence_output = []
    for i in range(0, x.shape[1] - kernel_size + 1):
        window = x[:, i:i+kernel_size]
        kernel_outputs = []
        for kernel in weight:
            kernel_outputs.append(np.sum(kernel * window))
        sequence_output.append(np.array(kernel_outputs) + bias)
    return np.array(sequence_output).T

print(x.shape)
print(torch_cnn1d(x.unsqueeze(0)))
print(torch_cnn1d(x.unsqueeze(0)).shape)
print(numpy_cnn1d(x.numpy(), torch_cnn1d.state_dict()))
BERT(Bidirectional Encoder Representations from Transformers)使用在大规模语料上预训练的双向Transformer编码器来表示文本,做文本分类时只需在其编码结果上接一个分类层并进行微调。总的来说,BERT由于其强大的编码能力和高度的灵活性,已经成为文本分类和其他NLP任务中的一种主流方法。
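在进入后面完整的工程代码之前,先看一个最小的BERT编码示意(基于transformers 4.x接口,模型名bert-base-chinese为示例,实际路径见后文配置文件):

from transformers import BertTokenizer, BertModel
import torch

tokenizer = BertTokenizer.from_pretrained("bert-base-chinese")
model = BertModel.from_pretrained("bert-base-chinese")

inputs = tokenizer("目瞪口呆 世界上还有这些奇葩建筑", return_tensors="pt",
                   max_length=20, truncation=True, padding="max_length")
with torch.no_grad():
    outputs = model(**inputs)
print(outputs.last_hidden_state.shape)  #(1, 20, 768),逐token向量
print(outputs.pooler_output.shape)      #(1, 768),句向量,可接线性层做分类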
# -*- coding: utf-8 -*- """ 配置参数信息 """ Config = { "model_path": "output", "train_data_path": "../data/train_tag_news.json", "valid_data_path": "../data/valid_tag_news.json", "vocab_path":"chars.txt", "model_type":"lstm", "max_length": 20, "hidden_size": 128, "kernel_size": 3, "num_layers": 2, "epoch": 15, "batch_size": 64, "pooling_style":"max", "optimizer": "adam", "learning_rate": 1e-3, "pretrain_model_path":r"F:\Desktop\work_space\pretrain_models\bert-base-chinese", "seed": 987 }
# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from transformers import BertModel

"""
建立网络模型结构
"""

class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        class_num = config["class_num"]
        model_type = config["model_type"]
        num_layers = config["num_layers"]
        self.use_bert = False
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        if model_type == "fast_text":
            self.encoder = lambda x: x
        elif model_type == "lstm":
            self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)
        elif model_type == "gru":
            self.encoder = nn.GRU(hidden_size, hidden_size, num_layers=num_layers)
        elif model_type == "rnn":
            self.encoder = nn.RNN(hidden_size, hidden_size, num_layers=num_layers)
        elif model_type == "cnn":
            self.encoder = CNN(config)
        elif model_type == "gated_cnn":
            self.encoder = GatedCNN(config)
        elif model_type == "stack_gated_cnn":
            self.encoder = StackGatedCNN(config)
        elif model_type == "rcnn":
            self.encoder = RCNN(config)
        elif model_type == "bert":
            self.use_bert = True
            self.encoder = BertModel.from_pretrained(config["pretrain_model_path"])
            hidden_size = self.encoder.config.hidden_size
        elif model_type == "bert_lstm":
            self.use_bert = True
            self.encoder = BertLSTM(config)
            hidden_size = self.encoder.bert.config.hidden_size
        elif model_type == "bert_cnn":
            self.use_bert = True
            self.encoder = BertCNN(config)
            hidden_size = self.encoder.bert.config.hidden_size
        elif model_type == "bert_mid_layer":
            self.use_bert = True
            self.encoder = BertMidLayer(config)
            hidden_size = self.encoder.bert.config.hidden_size
        self.classify = nn.Linear(hidden_size, class_num)
        self.pooling_style = config["pooling_style"]
        self.loss = nn.functional.cross_entropy  #loss采用交叉熵损失

    #当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, target=None):
        if self.use_bert:
            #bert返回的结果是 (sequence_output, pooler_output)
            x = self.encoder(x)
        else:
            x = self.embedding(x)  # input shape:(batch_size, sen_len)
            x = self.encoder(x)    # input shape:(batch_size, sen_len, input_dim)
        if isinstance(x, tuple):  #RNN类的模型会同时返回隐单元向量,我们只取序列结果
            x = x[0]
        #可以采用pooling的方式得到句向量
        if self.pooling_style == "max":
            self.pooling_layer = nn.MaxPool1d(x.shape[1])
        else:
            self.pooling_layer = nn.AvgPool1d(x.shape[1])
        x = self.pooling_layer(x.transpose(1, 2)).squeeze()  #input shape:(batch_size, sen_len, input_dim)
        #也可以直接使用序列最后一个位置的向量
        # x = x[:, -1, :]
        predict = self.classify(x)  #input shape:(batch_size, input_dim)
        if target is not None:
            return self.loss(predict, target.squeeze())
        else:
            return predict


class CNN(nn.Module):
    def __init__(self, config):
        super(CNN, self).__init__()
        hidden_size = config["hidden_size"]
        kernel_size = config["kernel_size"]
        pad = int((kernel_size - 1) / 2)
        self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)

    def forward(self, x):  #x : (batch_size, max_len, embeding_size)
        return self.cnn(x.transpose(1, 2)).transpose(1, 2)


class GatedCNN(nn.Module):
    def __init__(self, config):
        super(GatedCNN, self).__init__()
        self.cnn = CNN(config)
        self.gate = CNN(config)

    def forward(self, x):
        a = self.cnn(x)
        b = self.gate(x)
        b = torch.sigmoid(b)
        return torch.mul(a, b)


class StackGatedCNN(nn.Module):
    def __init__(self, config):
        super(StackGatedCNN, self).__init__()
        self.num_layers = config["num_layers"]
        self.hidden_size = config["hidden_size"]
        #ModuleList类内可以放置多个模型,取用时类似于一个列表
        self.gcnn_layers = nn.ModuleList(
            GatedCNN(config) for i in range(self.num_layers)
        )
        self.ff_liner_layers1 = nn.ModuleList(
            nn.Linear(self.hidden_size, self.hidden_size) for i in range(self.num_layers)
        )
        self.ff_liner_layers2 = nn.ModuleList(
            nn.Linear(self.hidden_size, self.hidden_size) for i in range(self.num_layers)
        )
        self.bn_after_gcnn = nn.ModuleList(
            nn.LayerNorm(self.hidden_size) for i in range(self.num_layers)
        )
        self.bn_after_ff = nn.ModuleList(
            nn.LayerNorm(self.hidden_size) for i in range(self.num_layers)
        )

    def forward(self, x):
        #仿照bert的transformer模型结构,将self-attention替换为gcnn
        for i in range(self.num_layers):
            gcnn_x = self.gcnn_layers[i](x)
            x = gcnn_x + x                    #通过gcnn+残差
            x = self.bn_after_gcnn[i](x)      #之后bn
            #仿照feed-forward层,使用两个线性层
            l1 = self.ff_liner_layers1[i](x)  #一层线性
            l1 = torch.relu(l1)               #在bert中这里是gelu
            l2 = self.ff_liner_layers2[i](l1) #二层线性
            x = self.bn_after_ff[i](x + l2)   #残差后过bn
        return x


class RCNN(nn.Module):
    def __init__(self, config):
        super(RCNN, self).__init__()
        hidden_size = config["hidden_size"]
        self.rnn = nn.RNN(hidden_size, hidden_size)
        self.cnn = GatedCNN(config)

    def forward(self, x):
        x, _ = self.rnn(x)
        x = self.cnn(x)
        return x


class BertLSTM(nn.Module):
    def __init__(self, config):
        super(BertLSTM, self).__init__()
        self.bert = BertModel.from_pretrained(config["pretrain_model_path"])
        self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)

    def forward(self, x):
        x = self.bert(x)[0]
        x, _ = self.rnn(x)
        return x


class BertCNN(nn.Module):
    def __init__(self, config):
        super(BertCNN, self).__init__()
        self.bert = BertModel.from_pretrained(config["pretrain_model_path"])
        config["hidden_size"] = self.bert.config.hidden_size
        self.cnn = CNN(config)

    def forward(self, x):
        x = self.bert(x)[0]
        x = self.cnn(x)
        return x


class BertMidLayer(nn.Module):
    def __init__(self, config):
        super(BertMidLayer, self).__init__()
        self.bert = BertModel.from_pretrained(config["pretrain_model_path"])
        self.bert.config.output_hidden_states = True

    def forward(self, x):
        layer_states = self.bert(x)[2]
        layer_states = torch.add(layer_states[-2], layer_states[-1])
        return layer_states


#优化器的选择
def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


if __name__ == "__main__":
    from config import Config
    # Config["class_num"] = 3
    # Config["vocab_size"] = 20
    # Config["max_length"] = 5
    Config["model_type"] = "bert"
    model = BertModel.from_pretrained(Config["pretrain_model_path"])
    x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
    sequence_output, pooler_output = model(x)
    print(sequence_output.shape, pooler_output.shape)  #原代码此处打印x[2]会越界,改为查看两个输出的形状
    # model = TorchModel(Config)
    # label = torch.LongTensor([1,2])
    # print(model(x, label))
# -*- coding: utf-8 -*-

import torch
import os
import random
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from evaluate import Evaluator
from loader import load_data

#[DEBUG, INFO, WARNING, ERROR, CRITICAL]
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""

seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


def main(config):
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = TorchModel(config)
    #标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)
    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            optimizer.zero_grad()
            input_ids, labels = batch_data  #输入变化时这里需要修改,比如多输入,多输出的情况
            loss = model(input_ids, labels)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
            if index % int(len(train_data) / 2) == 0:
                logger.info("batch loss %f" % loss)
        logger.info("epoch average loss: %f" % np.mean(train_loss))
        acc = evaluator.eval(epoch)
    # model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    # torch.save(model.state_dict(), model_path)  #保存模型权重
    return acc


if __name__ == "__main__":
    main(Config)

    # for model in ["cnn"]:
    #     Config["model_type"] = model
    #     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])

    #对比所有模型
    #中间日志可以关掉,避免输出过多信息
    #超参数的网格搜索
    # for model in ["gated_cnn"]:
    #     Config["model_type"] = model
    #     for lr in [1e-3]:
    #         Config["learning_rate"] = lr
    #         for hidden_size in [128]:
    #             Config["hidden_size"] = hidden_size
    #             for batch_size in [64, 128]:
    #                 Config["batch_size"] = batch_size
    #                 for pooling_style in ["avg"]:
    #                     Config["pooling_style"] = pooling_style
    #                     print("最后一轮准确率:", main(Config), "当前配置:", Config)
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer

"""
数据加载
"""

class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.index_to_label = {0: '家居', 1: '房产', 2: '股票', 3: '社会', 4: '文化',
                               5: '国际', 6: '教育', 7: '军事', 8: '彩票', 9: '旅游',
                               10: '体育', 11: '科技', 12: '汽车', 13: '健康',
                               14: '娱乐', 15: '财经', 16: '时尚', 17: '游戏'}
        self.label_to_index = dict((y, x) for x, y in self.index_to_label.items())
        self.config["class_num"] = len(self.index_to_label)
        if self.config["model_type"] == "bert":
            self.tokenizer = BertTokenizer.from_pretrained(config["pretrain_model_path"])
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.load()

    def load(self):
        self.data = []
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                tag = line["tag"]
                label = self.label_to_index[tag]
                title = line["title"]
                if self.config["model_type"] == "bert":
                    input_id = self.tokenizer.encode(title, max_length=self.config["max_length"],
                                                     pad_to_max_length=True)
                else:
                    input_id = self.encode_sentence(title)
                input_id = torch.LongTensor(input_id)
                label_index = torch.LongTensor([label])
                self.data.append([input_id, label_index])
        return

    def encode_sentence(self, text):
        input_id = []
        for char in text:
            input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

    #补齐或截断输入的序列,使其可以在一个batch内运算
    def padding(self, input_id):
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]


def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  #0留给padding位置,所以从1开始
    return token_dict


#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl


if __name__ == "__main__":
    from config import Config
    dg = DataGenerator("valid_tag_news.json", Config)
    print(dg[1])
# -*- coding: utf-8 -*-

import torch
from loader import load_data

"""
模型效果测试
"""

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        self.stats_dict = {"correct": 0, "wrong": 0}  #用于存储测试结果

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.model.eval()
        self.stats_dict = {"correct": 0, "wrong": 0}  #清空上一轮结果
        for index, batch_data in enumerate(self.valid_data):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_ids, labels = batch_data  #输入变化时这里需要修改,比如多输入,多输出的情况
            with torch.no_grad():
                pred_results = self.model(input_ids)  #不输入labels,使用模型当前参数进行预测
            self.write_stats(labels, pred_results)
        acc = self.show_stats()
        return acc

    def write_stats(self, labels, pred_results):
        assert len(labels) == len(pred_results)
        for true_label, pred_label in zip(labels, pred_results):
            pred_label = torch.argmax(pred_label)
            if int(true_label) == int(pred_label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

    def show_stats(self):
        correct = self.stats_dict["correct"]
        wrong = self.stats_dict["wrong"]
        self.logger.info("预测集合条目总量:%d" % (correct + wrong))
        self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
        self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
        self.logger.info("--------------------")
        return correct / (correct + wrong)
代码示例:两种常用的分类损失函数
import torch
import torch.nn as nn

#BCELoss:二分类交叉熵,输入需要先经过sigmoid压缩到0-1之间
m = nn.Sigmoid()
bceloss = nn.BCELoss()
input = torch.randn(5)
target = torch.FloatTensor([1, 0, 1, 0, 0])
output = bceloss(m(input), target)
print(output)

#CrossEntropyLoss:多分类交叉熵,内部自带softmax,输入为未归一化的分数
celoss = nn.CrossEntropyLoss()
input = torch.FloatTensor([[0.1, 0.2, 0.3, 0.1, 0.3], [0.1, 0.2, 0.3, 0.1, 0.3]])
target = torch.LongTensor([2, 3])
output = celoss(input, target)
print(output)
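CrossEntropyLoss内部已经包含softmax,下面手动展开验证一下:

import torch

#手动验证:CrossEntropyLoss等价于先做softmax,再取目标类别概率的负对数,最后对batch取平均
logits = torch.FloatTensor([[0.1, 0.2, 0.3, 0.1, 0.3], [0.1, 0.2, 0.3, 0.1, 0.3]])
target = torch.LongTensor([2, 3])
probs = torch.softmax(logits, dim=-1)
manual = -torch.log(probs[torch.arange(len(target)), target]).mean()
print(manual)  #与上面celoss(input, target)的输出一致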