统计语言模型 (SLM) S = Statistics: ngram语言模型等(成句概率)
马尔可夫假设:假设第n个词出现的概率,仅受其前面 “有限” 个词的影响
解决方案——回退(backoff):当三元组 abc 未在语料中出现时,改用二元组 bc 的概率,并乘以一个固定的回退系数(本例取 0.4)
PPL(评价指标):困惑度(perplexity),与成句概率成反比;它是一个相对值,用于在同一段文本上比较不同的语言模型
神经语言模型(NLM) N = Neural: rnn语言模型等
预训练语言模型(PLM) P = Pre-train: Bert、GPT等
大语言模型(LLM) L = Large: ChatGPT等
n-gram 统计型demo
import math
from collections import defaultdict


class NgramLanguageModel:
    """Count-based n-gram language model with fixed-weight back-off.

    N-grams of every order from 1 to ``n`` are counted over the corpus and
    turned into conditional probabilities ``count(ngram) / count(prefix)``.
    An unseen n-gram backs off to the n-gram that drops its first word,
    multiplied by ``fix_backoff_prob``; an unseen single word receives
    ``unk_prob``.

    Args:
        corpus: Iterable of sentences (strings of whitespace-separated
            tokens). ``None`` is treated as an empty corpus.
        n: Highest n-gram order to model.
    """

    def __init__(self, corpus=None, n=3):
        self.n = n
        # Joins the words of an n-gram into one dictionary key; it carries no
        # meaning and only has to be a symbol that never occurs in a token.
        self.sep = "_"
        self.sos = "<sos>"  # start-of-sentence marker
        self.eos = "<eos>"  # end-of-sentence marker
        # Small non-zero probability for out-of-vocabulary words so that a
        # sentence containing one does not get probability 0.
        self.unk_prob = 1e-5
        self.fix_backoff_prob = 0.4  # fixed back-off weight
        # order -> {ngram: count}; key 0 later holds the total word count.
        self.ngram_count_dict = {
            order: defaultdict(int) for order in range(1, n + 1)
        }
        # order -> {ngram: conditional probability}
        self.ngram_count_prob_dict = {
            order: defaultdict(int) for order in range(1, n + 1)
        }
        self.ngram_count(corpus)
        self.calc_ngram_prob()

    def sentence_segment(self, sentence):
        """Split a sentence into tokens (whitespace-separated)."""
        return sentence.split()

    def ngram_count(self, corpus):
        """Accumulate n-gram counts of every order from 1 to ``self.n``.

        Args:
            corpus: Iterable of sentences, or ``None`` for an empty corpus.
        """
        if corpus is None:
            corpus = []
        for sentence in corpus:
            # Pad with the start / end markers before sliding the windows.
            words = [self.sos] + self.sentence_segment(sentence) + [self.eos]
            for window_size in range(1, self.n + 1):
                # Only full windows are counted; the truncated windows at the
                # end of the sentence are skipped.
                for start in range(len(words) - window_size + 1):
                    ngram = self.sep.join(words[start:start + window_size])
                    self.ngram_count_dict[window_size][ngram] += 1
        # Total number of words, the denominator of the unigram probability.
        # Computed once after the scan so it is defined for an empty corpus.
        self.ngram_count_dict[0] = sum(self.ngram_count_dict[1].values())

    def calc_ngram_prob(self):
        """Convert the raw counts into conditional probabilities."""
        for window_size in range(1, self.n + 1):
            for ngram, count in self.ngram_count_dict[window_size].items():
                if window_size > 1:
                    # P(c | a b) = Count(a b c) / Count(a b)
                    prefix = self.sep.join(ngram.split(self.sep)[:-1])
                    denominator = self.ngram_count_dict[window_size - 1][prefix]
                else:
                    # P(a) = Count(a) / total word count
                    denominator = self.ngram_count_dict[0]
                self.ngram_count_prob_dict[window_size][ngram] = count / denominator

    def get_ngram_prob(self, ngram):
        """Return the probability of ``ngram`` using fixed-weight back-off.

        Args:
            ngram: Words joined with ``self.sep``.

        Returns:
            The stored probability when the n-gram was seen; ``unk_prob``
            for an unseen single word; otherwise ``fix_backoff_prob`` times
            the probability of the n-gram without its first word.
        """
        words = ngram.split(self.sep)
        # ``.get`` lets an n-gram longer than the model order fall through to
        # back-off instead of raising KeyError.
        table = self.ngram_count_prob_dict.get(len(words))
        if table is not None and ngram in table:
            return table[ngram]
        if len(words) == 1:
            # An unseen unigram is out of vocabulary: nothing to back off to.
            return self.unk_prob
        shorter = self.sep.join(words[1:])
        return self.fix_backoff_prob * self.get_ngram_prob(shorter)

    def calc_sentence_ppl(self, sentence):
        """Return the perplexity of ``sentence`` under the model.

        Computed as ``2 ** (-1/N * sum(log2 p_i))`` where ``N`` counts the
        sentence tokens plus the start and end markers.
        """
        word_list = [self.sos] + self.sentence_segment(sentence) + [self.eos]
        log_prob_sum = 0.0
        for index in range(len(word_list)):
            history_start = max(0, index - self.n + 1)
            ngram = self.sep.join(word_list[history_start:index + 1])
            # The log base must match the base of the exponent below.
            log_prob_sum += math.log2(self.get_ngram_prob(ngram))
        return 2 ** (log_prob_sum * (-1 / len(word_list)))


if __name__ == "__main__":
    with open("sample.txt", encoding="utf8") as corpus_file:
        corpus = corpus_file.readlines()
    lm = NgramLanguageModel(corpus, 3)
    print("词总数:", lm.ngram_count_dict[0])
    print(lm.ngram_count_prob_dict)
    print(lm.calc_sentence_ppl("c d b d b"))
rnn demo 预测句子的分类
import math
import os
import random

import numpy as np
import torch
import torch.nn as nn
import torch.utils.data as data_util

# Week 6 notes -- language models.
#
# A statistical (n-gram) language model scores how likely the words
# w1..wn are to appear in that order (the "sentence probability").
# Markov assumption: the probability of the n-th word depends only on a
# limited number of preceding words.
# Smoothing (discounting): an unseen sentence must not get probability 0.
#   * Back-off: when the trigram "a b c" is unseen, use the bigram "b c"
#     probability multiplied by a fixed weight (0.4).
#   * When even P(word) is missing: add-one smoothing, or replace
#     low-frequency words with <unk>.
#   * Interpolation: combine the higher-order estimate with the
#     lower-order ones using weights.
# PPL (perplexity) is inversely related to the sentence probability and
# is only a relative value.
#
# The code below trains one character-level RNN language model per domain
# and classifies a text by which domain model gives it the lowest PPL.

VOCAB_PATH = "D:\\NLP\\test\\week6\\vocab.txt"
MODEL_DIR = "D:\\NLP\\test\\week6\\model"
CORPUS_DIR = "D:\\NLP\\test\\week6\\corpus"


class LanguageModel(nn.Module):
    """RNN that predicts the next character from a window of characters.

    Args:
        input_dim: Size of the character embedding and of the RNN state.
        vocab: Mapping from character to index; the embedding and output
            layers are sized ``len(vocab) + 1``.
    """

    def __init__(self, input_dim, vocab):
        super().__init__()
        self.emb = nn.Embedding(len(vocab) + 1, input_dim)
        self.rnn = nn.RNN(input_dim, input_dim, batch_first=True)
        # One output logit per vocabulary index: any character may come next.
        self.linear = nn.Linear(input_dim, len(vocab) + 1)
        self.drop = nn.Dropout(0.1)
        self.loss = nn.functional.cross_entropy

    def forward(self, x, y=None):
        """Return the loss when ``y`` is given, else the next-char distribution.

        Args:
            x: Index tensor of shape (batch_size, sen_len).
            y: Optional target index tensor passed to the cross-entropy loss.
        """
        x = self.emb(x)  # (batch_size, sen_len, input_dim)
        x, _ = self.rnn(x)  # (batch_size, sen_len, input_dim)
        x = x[:, -1, :]  # keep only the last time step: (batch_size, input_dim)
        x = self.drop(x)
        y_pred = self.linear(x)
        if y is not None:
            return self.loss(y_pred, y)
        # Inference: normalise the logits into probabilities.
        return torch.softmax(y_pred, dim=-1)


def build_vocab(vocab_path):
    """Load a character vocabulary with one entry per line.

    Line ``i`` (0-based) gets index ``i + 1`` so that index 0 stays free for
    padding. The newline character is mapped to index 1.
    """
    vocab = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            # Strip only the newline: slicing off the last character would
            # damage a final line that has no trailing newline.
            char = line.rstrip("\n")
            vocab[char] = index + 1
    vocab["\n"] = 1
    return vocab


def build_simple(corpus, window_size, vocab):
    """Draw one random training sample from ``corpus``.

    Returns:
        ``(x, y)`` where ``x`` is the list of indices of ``window_size``
        consecutive characters and ``y`` is the index of the character that
        follows them. Characters missing from ``vocab`` map to ``<UNK>``.

    Raises:
        ValueError: If ``corpus`` is not longer than ``window_size``.
    """
    last_start = len(corpus) - 1 - window_size
    if last_start < 0:
        raise ValueError("corpus must be longer than window_size")
    start = random.randint(0, last_start)
    end = start + window_size
    window = corpus[start:end]
    target = corpus[end]  # the character right after the window
    unk = vocab["<UNK>"]
    x = [vocab.get(char, unk) for char in window]
    # Same <UNK> fallback as the inputs, so an unseen target cannot raise.
    y = vocab.get(target, unk)
    return x, y


def build_dataset(simple_size, corpus, window_size, vocab):
    """Build ``simple_size`` random samples as a pair of LongTensors."""
    samples = [build_simple(corpus, window_size, vocab) for _ in range(simple_size)]
    x = [sample_x for sample_x, _ in samples]
    y = [sample_y for _, sample_y in samples]
    return torch.LongTensor(x), torch.LongTensor(y)


def load_corpus(corpus_path):
    """Read the whole corpus file as one string."""
    with open(corpus_path, encoding="utf8") as f:
        return f.read()


def train(corpus_path, save_weight=True):
    """Train a language model on one corpus file.

    Args:
        corpus_path: Path of the training corpus.
        save_weight: When true, save the weights into ``MODEL_DIR`` under
            the corpus file name with a ``.pth`` extension.
    """
    epoch_num = 10  # number of epochs
    batch_size = 128  # samples per batch
    train_sample = 10000  # samples per epoch
    char_dim = 128  # embedding size of each character
    window_size = 6  # number of characters in each sample
    vocab = build_vocab(VOCAB_PATH)
    corpus = load_corpus(corpus_path)
    model = LanguageModel(char_dim, vocab)
    x, y = build_dataset(train_sample, corpus, window_size, vocab)
    dataset = data_util.TensorDataset(x, y)
    dataiter = data_util.DataLoader(dataset, batch_size)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    optim = torch.optim.Adam(model.parameters(), lr=0.001)
    for epoch in range(epoch_num):
        model.train()
        epoch_loss = []
        for batch_x, batch_y in dataiter:
            # The batches must live on the same device as the model.
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            optim.zero_grad()  # clear the gradients of the previous step
            loss = model(batch_x, batch_y)
            loss.backward()  # compute the gradient of every parameter
            optim.step()  # update the parameters
            # Store a plain float so the mean works on any device.
            epoch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(epoch_loss)))
    if not save_weight:
        return
    base_name = os.path.splitext(os.path.basename(corpus_path))[0] + ".pth"
    os.makedirs(MODEL_DIR, exist_ok=True)
    model_path = os.path.join(MODEL_DIR, base_name)
    torch.save(model.state_dict(), model_path)
    return


def train_all():
    """Train one model for every corpus file in ``CORPUS_DIR``."""
    # List and join the same directory so the paths always exist.
    for path in os.listdir(CORPUS_DIR):
        corpus_path = os.path.join(CORPUS_DIR, path)
        print(corpus_path)
        train(corpus_path)


def cal_ppl(sentence, model):
    """Score ``sentence`` with a perplexity-style value under ``model``.

    No back-off is needed: the softmax output already gives every character
    a non-zero probability.

    Args:
        sentence: Text to score, one character per step.
        model: A model carrying ``window_size`` and ``vocab`` attributes, as
            set by ``load_trained_language_model``.

    Returns:
        ``2 ** (-sum(log2 p_i) / len(sentence))``, where ``p_i`` is the
        predicted probability of character ``i`` given the preceding window
        (the first character has no context and is not predicted).
    """
    log_prob_sum = 0.0
    unk = model.vocab["<UNK>"]
    # Feed the inputs on whatever device the model actually lives on.
    device = next(model.parameters()).device
    with torch.no_grad():
        for i in range(1, len(sentence)):
            start = max(0, i - model.window_size)
            window = sentence[start:i]
            x = torch.LongTensor([[model.vocab.get(char, unk) for char in window]])
            x = x.to(device)
            target_index = model.vocab.get(sentence[i], unk)
            pred_prob_distribute = model(x)[0]
            target_prob = float(pred_prob_distribute[target_index])
            # The log base must match the base of the exponent below.
            log_prob_sum += math.log2(target_prob)
    return 2 ** (log_prob_sum * (-1 / len(sentence)))


def load_trained_language_model(path):
    """Rebuild a model and load the trained weights stored at ``path``."""
    char_dim = 128  # must match the value used for training
    window_size = 6  # must match the value used for training
    vocab = build_vocab(VOCAB_PATH)
    model = LanguageModel(char_dim, vocab)
    # Load onto the CPU first so weights saved on a GPU host also load on a
    # CPU-only host.
    model.load_state_dict(torch.load(path, map_location="cpu"))
    model.eval()
    if torch.cuda.is_available():
        model = model.cuda()
    model.window_size = window_size
    model.vocab = vocab
    return model


def load_models():
    """Load every ``.pth`` model in ``MODEL_DIR``, keyed by class name."""
    class_to_model = {}
    for file_name in os.listdir(MODEL_DIR):
        class_name, extension = os.path.splitext(file_name)
        if extension != ".pth":
            continue  # skip anything that is not a saved model
        model_path = os.path.join(MODEL_DIR, file_name)
        class_to_model[class_name] = load_trained_language_model(model_path)
    return class_to_model


def text_classification_based_on_language_model(class_to_model, sentence):
    """Rank the classes for ``sentence`` by language-model perplexity.

    Args:
        class_to_model: ``{"class1": <language model>, ...}``, each model
            trained on the corpus of its own domain.
        sentence: Text to classify.

    Returns:
        List of ``[class_name, ppl]`` sorted by ascending ppl (best first).
    """
    ppl = [
        [class_name, cal_ppl(sentence, class_lm)]
        for class_name, class_lm in class_to_model.items()
    ]
    ppl = sorted(ppl, key=lambda item: item[1])
    print(sentence)
    print(ppl[0: 3])
    print("==================")
    return ppl


if __name__ == "__main__":
    # Run train_all() first to produce the per-class models.
    sentence = ["在全球货币体系出现危机的情况下",
                "点击进入双色球玩法经典选号图表",
                "慢时尚服饰最大的优点是独特",
                "做处女座朋友的人真的很难",
                "网戒中心要求家长全程陪护",
                "在欧巡赛扭转了自己此前不利的状态",
                "选择独立的别墅会比公寓更适合你",
                ]
    class_to_model = load_models()
    for s in sentence:
        text_classification_based_on_language_model(class_to_model, s)
plm Demo 待补充
import math

import numpy as np
import torch

# Re-implements the BERT forward pass with plain numpy matrix operations.
# Pre-trained model files can be downloaded from https://huggingface.co/models


def softmax(x):
    """Softmax over the last axis."""
    # Subtracting the row maximum leaves the result unchanged and keeps
    # np.exp from overflowing on large scores.
    shifted = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return shifted / np.sum(shifted, axis=-1, keepdims=True)


def gelu(x):
    """GELU activation (tanh approximation)."""
    return 0.5 * x * (1 + np.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * np.power(x, 3))))


class DiyBert:
    """BERT encoder forward pass computed with numpy.

    Args:
        state_dict: Mapping from parameter name to tensor, as returned by
            ``BertModel.state_dict()``. The number of encoder layers and the
            hidden size are read from it.
    """

    # Added to the variance inside layer_norm so a constant row cannot
    # divide by zero.
    LAYER_NORM_EPS = 1e-12

    def __init__(self, state_dict):
        self.num_attention_heads = 12  # multi-head attention
        self.hidden_size = state_dict["embeddings.word_embeddings.weight"].shape[1]
        # Use every encoder layer present in the weights; otherwise the
        # output cannot match the reference model.
        layer_ids = {
            int(key.split(".")[2])
            for key in state_dict
            if key.startswith("encoder.layer.")
        }
        self.num_layers = max(layer_ids) + 1 if layer_ids else 0
        self.load_weights(state_dict)

    def load_weights(self, state_dict):
        """Copy the weights out of ``state_dict`` as numpy arrays."""
        # Embedding part.
        # word embeddings: [vocab_size, hidden_size]
        self.word_embeddings = state_dict["embeddings.word_embeddings.weight"].numpy()
        # position embeddings: [max_position, hidden_size]
        self.position_embeddings = state_dict["embeddings.position_embeddings.weight"].numpy()
        # token type (segment) embeddings: [type_vocab_size, hidden_size]
        self.token_type_embeddings = state_dict["embeddings.token_type_embeddings.weight"].numpy()
        # layer norm applied to the summed embeddings: [hidden_size] each
        self.embeddings_layer_norm_weight = state_dict["embeddings.LayerNorm.weight"].numpy()
        self.embeddings_layer_norm_bias = state_dict["embeddings.LayerNorm.bias"].numpy()
        self.transformer_weights = []
        # Transformer part, one entry per layer.
        for i in range(self.num_layers):
            prefix = "encoder.layer.%d." % i
            # query / key / value projections: [hidden_size, hidden_size]
            q_w = state_dict[prefix + "attention.self.query.weight"].numpy()
            q_b = state_dict[prefix + "attention.self.query.bias"].numpy()
            k_w = state_dict[prefix + "attention.self.key.weight"].numpy()
            k_b = state_dict[prefix + "attention.self.key.bias"].numpy()
            v_w = state_dict[prefix + "attention.self.value.weight"].numpy()
            v_b = state_dict[prefix + "attention.self.value.bias"].numpy()
            # linear layer after attention: [hidden_size, hidden_size]
            attention_output_weight = state_dict[prefix + "attention.output.dense.weight"].numpy()
            attention_output_bias = state_dict[prefix + "attention.output.dense.bias"].numpy()
            # layer norm after attention: [hidden_size]
            attention_layer_norm_w = state_dict[prefix + "attention.output.LayerNorm.weight"].numpy()
            attention_layer_norm_b = state_dict[prefix + "attention.output.LayerNorm.bias"].numpy()
            # expanding feed-forward layer: [intermediate_size, hidden_size]
            intermediate_weight = state_dict[prefix + "intermediate.dense.weight"].numpy()
            intermediate_bias = state_dict[prefix + "intermediate.dense.bias"].numpy()
            # contracting feed-forward layer: [hidden_size, intermediate_size]
            output_weight = state_dict[prefix + "output.dense.weight"].numpy()
            output_bias = state_dict[prefix + "output.dense.bias"].numpy()
            # layer norm after the feed-forward block: [hidden_size]
            ff_layer_norm_w = state_dict[prefix + "output.LayerNorm.weight"].numpy()
            ff_layer_norm_b = state_dict[prefix + "output.LayerNorm.bias"].numpy()
            self.transformer_weights.append(
                [q_w, q_b, k_w, k_b, v_w, v_b, attention_output_weight, attention_output_bias,
                 attention_layer_norm_w, attention_layer_norm_b, intermediate_weight, intermediate_bias,
                 output_weight, output_bias, ff_layer_norm_w, ff_layer_norm_b])
        # Pooler layer.
        self.pooler_dense_weight = state_dict["pooler.dense.weight"].numpy()
        self.pooler_dense_bias = state_dict["pooler.dense.bias"].numpy()

    def embedding_forward(self, x):
        """Sum the three embeddings and layer-normalise the result.

        Args:
            x: Sequence of token ids, shape [max_len].

        Returns:
            Array of shape [max_len, hidden_size].
        """
        we = self.get_embedding(self.word_embeddings, x)
        # Position ids are simply 0, 1, 2, ...
        pe = self.get_embedding(self.position_embeddings, np.arange(len(x)))
        # A single-segment input uses token type 0 everywhere.
        te = self.get_embedding(self.token_type_embeddings, np.zeros(len(x), dtype=int))
        embedding = we + pe + te
        return self.layer_norm(embedding, self.embeddings_layer_norm_weight,
                               self.embeddings_layer_norm_bias)

    def get_embedding(self, embedding_matrix, x):
        """Look up the rows of ``embedding_matrix`` selected by indices ``x``.

        Equivalent to multiplying one-hot inputs by the embedding matrix.
        """
        return np.array([embedding_matrix[index] for index in x])

    def all_transformer_layer_forward(self, x):
        """Run ``x`` through every transformer layer in order."""
        for i in range(self.num_layers):
            x = self.single_transformer_layer_forward(x, i)
        return x

    def single_transformer_layer_forward(self, x, layer_index):
        """Run one transformer layer: attention, then feed-forward."""
        (q_w, q_b,
         k_w, k_b,
         v_w, v_b,
         attention_output_weight, attention_output_bias,
         attention_layer_norm_w, attention_layer_norm_b,
         intermediate_weight, intermediate_bias,
         output_weight, output_bias,
         ff_layer_norm_w, ff_layer_norm_b) = self.transformer_weights[layer_index]
        # Self-attention block.
        attention_output = self.self_attention(x,
                                               q_w, q_b,
                                               k_w, k_b,
                                               v_w, v_b,
                                               attention_output_weight, attention_output_bias,
                                               self.num_attention_heads,
                                               self.hidden_size)
        # Residual connection followed by layer norm.
        x = self.layer_norm(x + attention_output, attention_layer_norm_w, attention_layer_norm_b)
        # Feed-forward block.
        feed_forward_x = self.feed_forward(x,
                                           intermediate_weight, intermediate_bias,
                                           output_weight, output_bias)
        # Residual connection followed by layer norm.
        x = self.layer_norm(x + feed_forward_x, ff_layer_norm_w, ff_layer_norm_b)
        return x

    def self_attention(self,
                       x,
                       q_w, q_b,
                       k_w, k_b,
                       v_w, v_b,
                       attention_output_weight, attention_output_bias,
                       num_attention_heads,
                       hidden_size):
        """Multi-head self-attention.

        Args:
            x: Input of shape [max_len, hidden_size].
            q_w, k_w, v_w: Projection weights, [hidden_size, hidden_size].
            q_b, k_b, v_b: Projection biases, [hidden_size].
            attention_output_weight, attention_output_bias: Output projection.
            num_attention_heads: Number of heads.
            hidden_size: Model width; split evenly across the heads.

        Returns:
            Array of shape [max_len, hidden_size].
        """
        # Linear projections: [max_len, hidden_size]
        q = np.dot(x, q_w.T) + q_b
        k = np.dot(x, k_w.T) + k_b
        v = np.dot(x, v_w.T) + v_b
        attention_head_size = int(hidden_size / num_attention_heads)
        # Split into heads: [num_attention_heads, max_len, attention_head_size]
        q = self.transpose_for_scores(q, attention_head_size, num_attention_heads)
        k = self.transpose_for_scores(k, attention_head_size, num_attention_heads)
        v = self.transpose_for_scores(v, attention_head_size, num_attention_heads)
        # Attention scores: [num_attention_heads, max_len, max_len]
        qk = np.matmul(q, k.swapaxes(1, 2))
        qk /= np.sqrt(attention_head_size)
        qk = softmax(qk)
        # Weighted values: [num_attention_heads, max_len, attention_head_size]
        qkv = np.matmul(qk, v)
        # Merge the heads back: [max_len, hidden_size]
        qkv = qkv.swapaxes(0, 1).reshape(-1, hidden_size)
        # Output projection: [max_len, hidden_size]
        attention = np.dot(qkv, attention_output_weight.T) + attention_output_bias
        return attention

    def transpose_for_scores(self, x, attention_head_size, num_attention_heads):
        """Reshape [max_len, hidden_size] into per-head slices.

        Returns:
            Array of shape [num_attention_heads, max_len, attention_head_size].
        """
        max_len, hidden_size = x.shape
        x = x.reshape(max_len, num_attention_heads, attention_head_size)
        x = x.swapaxes(1, 0)
        return x

    def feed_forward(self,
                     x,
                     intermediate_weight,  # [intermediate_size, hidden_size]
                     intermediate_bias,  # [intermediate_size]
                     output_weight,  # [hidden_size, intermediate_size]
                     output_bias,  # [hidden_size]
                     ):
        """Two linear layers with a GELU in between."""
        # [max_len, intermediate_size]
        x = np.dot(x, intermediate_weight.T) + intermediate_bias
        x = gelu(x)
        # [max_len, hidden_size]
        x = np.dot(x, output_weight.T) + output_bias
        return x

    def layer_norm(self, x, w, b):
        """Normalise each row of ``x``, then scale by ``w`` and shift by ``b``."""
        mean = np.mean(x, axis=1, keepdims=True)
        variance = np.var(x, axis=1, keepdims=True)
        x = (x - mean) / np.sqrt(variance + self.LAYER_NORM_EPS)
        return x * w + b

    def pooler_output_layer(self, x):
        """Dense layer plus tanh applied to the first token's output."""
        x = np.dot(x, self.pooler_dense_weight.T) + self.pooler_dense_bias
        x = np.tanh(x)
        return x

    def forward(self, x):
        """Run the full model on the token ids ``x``.

        Returns:
            ``(sequence_output, pooler_output)`` with shapes
            [max_len, hidden_size] and [hidden_size].
        """
        x = self.embedding_forward(x)
        sequence_output = self.all_transformer_layer_forward(x)
        pooler_output = self.pooler_output_layer(sequence_output[0])
        return sequence_output, pooler_output


if __name__ == "__main__":
    # Imported here so the numpy implementation above can be imported
    # without transformers installed.
    from transformers import BertModel

    bert = BertModel.from_pretrained(r"D:\NLP\video\第六周\bert-base-chinese", return_dict=False)
    state_dict = bert.state_dict()
    bert.eval()
    x = np.array([2450, 15486, 102, 2110])  # token ids looked up in the vocab
    torch_x = torch.LongTensor([x])  # same input as a pytorch batch of one
    # sequence output: one vector per token; pooler output: one per sentence
    seqence_output, pooler_output = bert(torch_x)
    print(seqence_output.shape, pooler_output.shape)

    # Hand-written implementation.
    db = DiyBert(state_dict)
    diy_sequence_output, diy_pooler_output = db.forward(x)
    # Reference implementation.
    torch_sequence_output, torch_pooler_output = bert(torch_x)

    print(diy_sequence_output)
    print(torch_sequence_output)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。