import torch


def topK_sampler(logits, k):
    # Keep only the k largest logits and mask the rest to -inf, so that a single
    # softmax assigns them zero probability (taking a second softmax over already
    # normalized probabilities would distort the distribution).
    values, indices = torch.topk(logits, k, dim=-1)
    masked_logits = torch.full_like(logits, float('-inf'))
    # Scatter the top-k logits back into their original positions.
    masked_logits.scatter_(-1, indices, values)
    probs_to_sample = torch.softmax(masked_logits, dim=-1)
    # Draw one token id from the renormalized top-k distribution.
    sample_token_id = torch.multinomial(probs_to_sample, 1)
    return sample_token_id
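A minimal usage sketch for topK_sampler; the batch size, vocabulary size, and k below are arbitrary illustration values, not part of the original snippet:

# Usage sketch for topK_sampler (illustrative shapes only).
if __name__ == "__main__":
    torch.manual_seed(0)
    logits = torch.randn(2, 10)           # [batch_size, vocab_size]
    token_ids = topK_sampler(logits, k=3)
    print(token_ids.shape)                # torch.Size([2, 1]); ids always come from the top-3 logits per row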
def top_p_sampler(logits, p):
    probs = torch.softmax(logits, dim=-1)
    # Sort the probabilities in descending order.
    sorted_probs, sorted_indices = torch.sort(probs, dim=-1, descending=True)
    # Cumulative sum of the sorted probabilities.
    cum_sum_probs = torch.cumsum(sorted_probs, dim=-1)
    sorted_indices_to_remove = cum_sum_probs > p
    # Shift right so the first token that crosses the threshold p is still kept.
    sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
    # Always keep at least one token.
    sorted_indices_to_remove[..., 0] = 0
    probs_top_p = sorted_probs.clone()
    probs_top_p[sorted_indices_to_remove] = 0.0
    # Renormalize the remaining probabilities and sample.
    probs_to_sample = probs_top_p / probs_top_p.sum(dim=-1, keepdim=True)
    sample_token_id = torch.multinomial(probs_to_sample, 1)
    # Map the position in the sorted order back to the original token id.
    token_index = sorted_indices.gather(dim=-1, index=sample_token_id)
    return token_index
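Top-p (nucleus) sampling is called just like the top-k sampler above; a quick sketch with illustrative shapes and an arbitrary p value:

# Usage sketch for top_p_sampler (illustrative shapes only).
if __name__ == "__main__":
    torch.manual_seed(0)
    logits = torch.randn(2, 10)              # [batch_size, vocab_size]
    token_ids = top_p_sampler(logits, p=0.9)
    print(token_ids.shape)                    # torch.Size([2, 1])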
def temperature_softmax(logits, t=1.0):
    """Softmax with temperature: t < 1 sharpens the distribution, t > 1 flattens it."""
    logits = logits / t
    # Subtract the per-row max before exponentiating for numerical stability.
    logits = logits - torch.max(logits, dim=-1, keepdim=True).values
    exp_logits = torch.exp(logits)
    sum_exp = torch.sum(exp_logits, dim=-1, keepdim=True)
    out = exp_logits / sum_exp
    return out
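A small behavioural check: at t=1 the function should agree with torch.softmax, a temperature below 1 concentrates mass on the largest logit, and a temperature above 1 moves the distribution toward uniform (the logits below are made-up illustration values):

# Quick check sketch for temperature_softmax.
import torch

if __name__ == "__main__":
    logits = torch.tensor([[1.0, 2.0, 3.0]])
    print(torch.allclose(temperature_softmax(logits, t=1.0),
                         torch.softmax(logits, dim=-1)))   # True
    print(temperature_softmax(logits, t=0.5))  # more peaked on the last entry
    print(temperature_softmax(logits, t=2.0))  # closer to uniform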
import torch
from torch import nn


class LinearReg(nn.Module):
    def __init__(self, input_dim, out_dim):
        super(LinearReg, self).__init__()
        self.input_dim = input_dim
        self.out_dim = out_dim
        self.linear = nn.Linear(input_dim, out_dim, bias=True)

    def forward(self, x):
        out = self.linear(x)
        return out


def main():
    data = torch.randn(100, 2) * 10
    # Assume the ground truth is y = 3*x_1 + 2*x_2 + 5.
    weight = torch.tensor([[3.], [2.]])
    bias = torch.tensor([[5.]])
    # Build the dataset and add some noise; the noise must be [100, 1] so the targets stay [100, 1].
    y = (data @ weight + bias) + torch.randn(100, 1) * 2
    model = LinearReg(input_dim=2, out_dim=1)
    loss_func = nn.MSELoss(reduction="mean")
    optimizer = torch.optim.SGD(lr=5e-3, params=model.parameters())
    epochs = 1000
    for step in range(epochs):
        pred = model(data)
        loss = loss_func(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (step + 1) % 10 == 0:
            print(f"{step}/{epochs} steps, loss: {loss.item():.4f}")
    print("train finished")
    # Print the learned parameters.
    print("Model weights after training:")
    print("weight", model.linear.weight)
    print("bias", model.linear.bias)


if __name__ == "__main__":
    main()
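As a sanity check on the gradient-descent fit, the same synthetic problem can be solved in closed form; a sketch using torch.linalg.lstsq, with the data generation assumed to mirror main() above:

# Closed-form least-squares sketch for the same synthetic data (assumed setup: y ≈ 3*x_1 + 2*x_2 + 5 plus noise).
import torch

def closed_form_check():
    data = torch.randn(100, 2) * 10
    y = data @ torch.tensor([[3.], [2.]]) + 5.0 + torch.randn(100, 1) * 2
    # Append a column of ones so the bias is estimated jointly with the weights.
    X = torch.cat([data, torch.ones(100, 1)], dim=1)
    solution = torch.linalg.lstsq(X, y).solution  # [3, 1]: w1, w2, bias
    print(solution.squeeze())  # should be close to (3, 2, 5)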
import torch


def diff_mse(x, y, w):
    """Manually computed gradient: dL/dw = x^T (x@w - y) / n."""
    return x.transpose(0, 1) @ (x @ w - y) / x.shape[0]


def mse_loss(x, y, w):
    """MSE loss with a 1/2 factor so the gradient above is exact."""
    return 0.5 * torch.mean(torch.square(x @ w - y))


def get_batch_data(x, y, batch_size, step):
    data_len = x.shape[0]
    start = step * batch_size
    end = min(start + batch_size, data_len)
    return x[start:end], y[start:end]


def train(epochs, batch_size, lr):
    data = torch.randn(100, 2) * 2          # [100, 2]
    weight = torch.tensor([[3.], [2.]])      # [2, 1]
    y = data @ weight + torch.randn(100, 1) * 2
    param_w = torch.randn(2, 1)
    steps = data.shape[0] // batch_size
    for epoch in range(epochs):
        for step in range(steps):
            x, lb = get_batch_data(data, y, batch_size, step)
            loss = mse_loss(x, lb, param_w)
            grad = diff_mse(x, lb, param_w)
            param_w = param_w - lr * grad
            if step % 10 == 0:
                print(f"epoch:{epoch}; step:{step}; loss:{loss.item()}")
    print(f"train finished, param w: {param_w}")


if __name__ == "__main__":
    train(epochs=200, batch_size=8, lr=5e-4)
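One way to validate diff_mse is to compare it with the gradient PyTorch autograd computes for the same loss; a minimal sketch with arbitrary shapes:

# Gradient-check sketch: manual gradient vs. autograd.
import torch

def grad_check():
    torch.manual_seed(0)
    x = torch.randn(8, 2)
    y = torch.randn(8, 1)
    w = torch.randn(2, 1, requires_grad=True)
    loss = 0.5 * torch.mean(torch.square(x @ w - y))
    loss.backward()
    manual = x.transpose(0, 1) @ (x @ w.detach() - y) / x.shape[0]
    print(torch.allclose(manual, w.grad))  # expected: True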
import numpy as np


def k_means(max_iter, tol, data, k):
    """
    max_iter: maximum number of iterations
    tol: minimum change in the cluster centers before stopping
    data: input data, shape [n, dim]
    k: number of cluster centers
    """
    # Initialize the centers with k distinct random data points.
    centers = data[np.random.choice(data.shape[0], k, replace=False)]
    for j in range(max_iter):
        # dist.shape = [n, k]: distance from every point to every center.
        dist = np.linalg.norm(data[:, np.newaxis] - centers, axis=2)
        # labels.shape = (n,): index of the nearest center for each point.
        labels = np.argmin(dist, axis=1)
        # New centers: mean of the points assigned to each cluster.
        new_centers = np.array([data[labels == i].mean(axis=0) for i in range(k)])
        # Stop when the centers barely move.
        if np.all(np.linalg.norm(new_centers - centers, axis=1) < tol):
            print(f"less than tol, break down, iter num {j}")
            break
        centers = new_centers
    return centers, labels


if __name__ == "__main__":
    data = np.random.randn(100, 3)
    centers, labels = k_means(max_iter=100, tol=0.00001, data=data, k=3)
    print(centers)
    print(labels)
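If scikit-learn happens to be available, the centers can be compared against its KMeans implementation; this cross-check is an assumption about the environment, not part of the original snippet:

# Optional comparison sketch, assuming scikit-learn is installed.
import numpy as np
from sklearn.cluster import KMeans

def compare_with_sklearn():
    data = np.random.randn(100, 3)
    centers, labels = k_means(max_iter=100, tol=1e-5, data=data, k=3)
    km = KMeans(n_clusters=3, n_init=10).fit(data)
    # The two sets of centers should roughly agree up to a permutation of the labels.
    print("manual centers:\n", centers)
    print("sklearn centers:\n", km.cluster_centers_)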
import torch
from torch import nn


class LayerNorm(nn.Module):
    def __init__(self, dim):
        super(LayerNorm, self).__init__()
        self.alpha = nn.Parameter(torch.ones(dim))   # learnable scale
        self.beta = nn.Parameter(torch.zeros(dim))   # learnable shift
        self.epsilon = 1e-6

    def forward(self, feature):
        """
        feature.shape = [batch_size, seq_len, embed_dim]
        """
        # Normalize over the last (embedding) dimension only.
        mean = torch.mean(feature, dim=-1, keepdim=True)
        std = torch.std(feature, dim=-1, keepdim=True)
        norm = (feature - mean) / (std + self.epsilon)
        out = self.alpha * norm + self.beta
        return out


if __name__ == "__main__":
    logits = torch.randn(2, 3, 5)
    ln = LayerNorm(5)
    print(ln(logits))
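For reference, the output can be compared against the built-in nn.LayerNorm. The two are close but not identical, because the version above divides by (std + eps) with Bessel's correction, while nn.LayerNorm divides by sqrt(var + eps) with a biased variance:

# Rough comparison sketch against nn.LayerNorm.
if __name__ == "__main__":
    x = torch.randn(2, 3, 5)
    custom = LayerNorm(5)
    builtin = nn.LayerNorm(5, eps=1e-6)
    # Non-zero but modest difference, due to the normalization details noted above.
    print((custom(x) - builtin(x)).abs().max())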
import torch
from torch import nn


class BatchNorm(nn.Module):
    def __init__(self, momentum=0.01, eps=1e-6, dim=5):
        super(BatchNorm, self).__init__()
        # Running statistics used at inference time.
        self.run_mean = torch.zeros(dim)
        self.run_std = torch.ones(dim)
        self.momentum = momentum
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(dim))   # learnable scale
        self.beta = nn.Parameter(torch.zeros(dim))   # learnable shift

    def forward(self, feature, is_train=True):
        if is_train:
            # Normalize with the statistics of the current batch (over dim=0).
            mean = torch.mean(feature, dim=0)
            std = torch.std(feature, dim=0)
            # Update the running statistics (PyTorch-style momentum).
            self.run_mean = (1 - self.momentum) * self.run_mean + self.momentum * mean
            self.run_std = (1 - self.momentum) * self.run_std + self.momentum * std
            norm = (feature - mean) / (std + self.eps)
        else:
            # At inference time, use the running statistics instead.
            norm = (feature - self.run_mean) / (self.run_std + self.eps)
        out = self.gamma * norm + self.beta
        return out


if __name__ == "__main__":
    logits = torch.randn(3, 5)
    bn = BatchNorm(momentum=0.01, eps=1e-6, dim=5)
    print(bn(logits, True))
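A similar rough cross-check against nn.BatchNorm1d in training mode; again the results are close rather than identical, since the toy version normalizes by (std + eps) instead of sqrt(var + eps):

# Rough comparison sketch against nn.BatchNorm1d in training mode.
if __name__ == "__main__":
    x = torch.randn(8, 5)
    custom = BatchNorm(momentum=0.01, eps=1e-6, dim=5)
    builtin = nn.BatchNorm1d(5, eps=1e-6, momentum=0.01)
    print((custom(x, True) - builtin(x)).abs().max())  # small but non-zero difference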
import torch
from torch import nn
import torch.nn.functional as F


class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = self.embed_dim // self.num_heads
        self.q_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.k_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.v_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.out_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=True)

    def forward(self, query, key, value, atten_mask=None, padding_mask=None):
        """
        :param query: [tgt_len, batch_size, embed_dim]
        :param key: [src_len, batch_size, embed_dim]
        :param value: [src_len, batch_size, embed_dim]
        :param atten_mask: mask for decoder-style masked multi-head attention
        :param padding_mask: mask for padded positions of the input
        :return: [tgt_len, batch_size, embed_dim]
        """
        q = self.q_proj(query)  # [tgt_len, bs, embed_dim]
        k = self.k_proj(key)    # [src_len, bs, embed_dim]
        v = self.v_proj(value)  # [src_len, bs, embed_dim]

        tgt_len, batch_size, embed_dim = query.size()
        src_len = key.size()[0]
        head_dim = embed_dim // self.num_heads
        scaling = float(head_dim) ** -0.5
        q = q * scaling

        if atten_mask is not None:
            if atten_mask.dim() == 2:
                atten_mask = atten_mask.unsqueeze(0)
                if list(atten_mask.size()) != [1, tgt_len, src_len]:
                    raise RuntimeError("The 2D attention mask is not correct!")
            elif atten_mask.dim() == 3:
                if list(atten_mask.size()) != [batch_size * self.num_heads, tgt_len, src_len]:
                    raise RuntimeError("The 3D attention mask is not correct!")
            else:
                raise RuntimeError("Attention mask dim is not correct")

        # Split heads: [len, bs, embed_dim] -> [bs*num_heads, len, head_dim]
        q = q.contiguous().view(tgt_len, batch_size * self.num_heads, head_dim).transpose(0, 1)
        k = k.contiguous().view(src_len, batch_size * self.num_heads, head_dim).transpose(0, 1)
        v = v.contiguous().view(src_len, batch_size * self.num_heads, head_dim).transpose(0, 1)

        attn_output_weight = torch.bmm(q, k.transpose(1, 2))  # [bs*num_heads, tgt_len, src_len]
        if atten_mask is not None:
            attn_output_weight += atten_mask
        if padding_mask is not None:
            attn_output_weight = attn_output_weight.view(batch_size, self.num_heads, tgt_len, src_len)
            attn_output_weight = attn_output_weight.masked_fill(
                padding_mask.unsqueeze(1).unsqueeze(2), float("-inf"))
            attn_output_weight = attn_output_weight.view(batch_size * self.num_heads, tgt_len, src_len)
        attn_output_weight = F.softmax(attn_output_weight, dim=-1)
        # Dropout on the attention weights (default p); only active in training mode.
        attn_output_weight = F.dropout(attn_output_weight, training=self.training)
        attn_output = torch.bmm(attn_output_weight, v)  # [bs*num_heads, tgt_len, head_dim]
        attn_output = attn_output.transpose(0, 1).contiguous().view(tgt_len, batch_size, head_dim * self.num_heads)
        z = self.out_proj(attn_output)
        return z


def generate_square_subsequent_mask(size):
    """
    Generate a causal attention mask of shape [size, size].
    :param size: sequence length
    """
    # torch.triu keeps the main diagonal and everything above it; after the transpose,
    # allowed positions become 0.0 and future positions become -inf.
    mask = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask  # [size, size]


if __name__ == "__main__":
    source_len = 5
    bs = 2
    model_dim = 32
    num_head = 2
    src = torch.rand((source_len, bs, model_dim))
    src_padding_mask = torch.tensor([[False, False, False, True, True],
                                     [False, False, False, False, True]])
    mha = MultiHeadAttention(embed_dim=model_dim, num_heads=num_head)
    out = mha(src, src, src, padding_mask=src_padding_mask)
    print(out)
    print(out.shape)
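generate_square_subsequent_mask is intended to be passed as atten_mask for decoder-style (causal) self-attention; a short sketch reusing the toy tensors from the block above:

# Sketch: causal self-attention using the mask helper above.
if __name__ == "__main__":
    causal_mask = generate_square_subsequent_mask(source_len)  # [5, 5]
    out_causal = mha(src, src, src, atten_mask=causal_mask,
                     padding_mask=src_padding_mask)
    print(out_causal.shape)  # torch.Size([5, 2, 32])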
import torch
from torch import nn
import math


class TokenEmbed(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super(TokenEmbed, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.embed_dim = embed_dim

    def forward(self, tokens):
        """
        :param tokens: [length, bs]
        :return: embedding: [length, bs, embed_dim]
        """
        # Note: as described in the paper, the token embeddings are scaled by sqrt(embed_dim).
        return self.embedding(tokens.long()) * math.sqrt(self.embed_dim)
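A quick shape check for TokenEmbed; the vocabulary size and sequence length are arbitrary illustration values:

# Shape-check sketch for TokenEmbed.
if __name__ == "__main__":
    embed = TokenEmbed(vocab_size=1000, embed_dim=32)
    tokens = torch.randint(0, 1000, (5, 2))   # [length, bs]
    print(embed(tokens).shape)                # torch.Size([5, 2, 32])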
def position_embed(max_len, model_dim):
    """Sinusoidal positional encoding:
    PE[pos, 2i] = sin(pos / 10000^(2i/model_dim)), PE[pos, 2i+1] = cos(pos / 10000^(2i/model_dim))."""
    pe = torch.zeros((max_len, model_dim))
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
    # div_term[i] = 10000^(2i/model_dim); dividing position by it gives the argument above.
    div_term = torch.exp(torch.arange(0, model_dim, 2).float() * (math.log(10000) / model_dim))
    pe[:, 0::2] = torch.sin(position / div_term)
    pe[:, 1::2] = torch.cos(position / div_term)
    return pe
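A sketch of how the two pieces are typically combined into the Transformer input; the shapes and the broadcast over the batch dimension are assumptions about how these helpers are meant to be used together:

# Sketch: token embeddings plus positional encodings as the Transformer input.
if __name__ == "__main__":
    seq_len, bs, model_dim = 5, 2, 32
    token_embed = TokenEmbed(vocab_size=1000, embed_dim=model_dim)
    tokens = torch.randint(0, 1000, (seq_len, bs))              # [seq_len, bs]
    tok = token_embed(tokens)                                   # [seq_len, bs, model_dim]
    pos = position_embed(max_len=seq_len, model_dim=model_dim)  # [seq_len, model_dim]
    x = tok + pos.unsqueeze(1)                                  # broadcast over the batch dimension
    print(x.shape)                                              # torch.Size([5, 2, 32])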