
Hand-Written Coding Problems for Algorithm Engineers (Non Data Structure)


1. Top-k sampling

import torch


def topK_sampler(logits, k):
    # topk on the raw logits: softmax is monotonic, so these are also the k most probable tokens
    values, indices = torch.topk(logits, k, dim=-1)
    # Fill everything outside the top-k with -inf, then renormalize with softmax
    masked_logits = torch.full_like(logits, float('-inf'))
    masked_logits.scatter_(-1, indices, values)
    probs_to_sample = torch.softmax(masked_logits, dim=-1)
    # Sample one token id from the truncated distribution
    sample_token_id = torch.multinomial(probs_to_sample, 1)
    return sample_token_id
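
A minimal usage sketch (the batch size, vocabulary size, and random logits below are made-up values for illustration, standing in for real model outputs):

if __name__ == "__main__":
    batch_logits = torch.randn(2, 100)            # [batch_size, vocab_size]
    next_token = topK_sampler(batch_logits, k=10)
    print(next_token)                             # one sampled token id per sequence, shape [2, 1]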

2. Top-p (nucleus) sampling

def top_p_sampler(logits, p):
    probs = torch.softmax(logits, dim=-1)
    # Sort probabilities in descending order
    sorted_probs, sorted_indices = torch.sort(probs, dim=-1, descending=True)
    # Cumulative probability mass
    cum_sum_probs = torch.cumsum(sorted_probs, dim=-1)
    sorted_indices_to_remove = cum_sum_probs > p
    # Shift right so that the first token crossing the threshold is still kept
    sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
    # Always keep at least one token
    sorted_indices_to_remove[..., 0] = 0

    # Zero out the removed tokens and renormalize the remaining probability mass
    probs_top_p = sorted_probs.clone()
    probs_top_p[sorted_indices_to_remove] = 0.0
    probs_to_sample = probs_top_p / probs_top_p.sum(dim=-1, keepdim=True)
    # Sample a position in the sorted order
    sample_token_id = torch.multinomial(probs_to_sample, 1)
    # Map the sampled position back to the original vocabulary index
    token_index = sorted_indices.gather(dim=-1, index=sample_token_id)
    return token_index
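
A minimal usage sketch under the same illustrative assumptions as above (random logits over a small vocabulary):

if __name__ == "__main__":
    batch_logits = torch.randn(2, 100)                # [batch_size, vocab_size]
    next_token = top_p_sampler(batch_logits, p=0.9)
    print(next_token)                                 # one sampled vocabulary index per sequence, shape [2, 1]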

3. temperature softmax

def temperature_softmax(logits, t=1.0):
    # Scale the logits by the temperature, then subtract the max for numerical stability
    logits = logits / t
    logits = logits - torch.max(logits, dim=-1, keepdim=True).values
    exp_logits = torch.exp(logits)
    sum_exp = torch.sum(exp_logits, dim=-1, keepdim=True)
    out = exp_logits / sum_exp
    return out
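
A minimal usage sketch showing the effect of the temperature (the logits are made-up values): lower temperatures sharpen the distribution, higher ones flatten it towards uniform.

if __name__ == "__main__":
    logits = torch.tensor([2.0, 1.0, 0.1])
    print(temperature_softmax(logits, t=1.0))   # standard softmax
    print(temperature_softmax(logits, t=0.5))   # sharper: more mass on the largest logit
    print(temperature_softmax(logits, t=2.0))   # flatter: closer to uniform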

4. Linear regression

import torch
from torch import nn


class LinearReg(nn.Module):
    def __init__(self, input_dim, out_dim):
        super(LinearReg, self).__init__()
        self.input_dim = input_dim
        self.out_dim = out_dim
        self.linear = nn.Linear(input_dim, out_dim, bias=True)

    def forward(self, x):
        out = self.linear(x)
        return out


def main():
    data = torch.randn(100, 2)*10
    # Ground-truth relation: y = 3*x_1 + 2*x_2 + 5
    weight = torch.tensor([[3.], [2]])
    bias = torch.tensor([[5.]])
    # Build the dataset with some Gaussian noise added (noise shape must match y: [100, 1])
    y = (data @ weight + bias) + torch.randn(100, 1)*2

    model = LinearReg(input_dim=2, out_dim=1)
    loss_func = nn.MSELoss(reduction="mean")
    optimizer = torch.optim.SGD(lr=5e-3, params=model.parameters())
    epochs = 1000
    for step in range(epochs):
        pred = model(data)
        loss = loss_func(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (step+1) % 10 == 0:
            print(f"{step}/{epochs} steps, loss: {loss.item():.4f}")
    print("train finished")

    # Print the learned parameters; they should be close to the ground-truth weight and bias
    print("Model parameters after training:")
    print("weight", model.linear.weight)
    print("bias", model.linear.bias)


if __name__ == "__main__":
    main()


5. Linear regression with manual gradient updates


import torch


def diff_mse(x, y, w):
    """
    手动求梯度
    delta_w = x.t@(x@w-y)
    """
    return x.transpose(0, 1)@(x@w-y)/x.shape[0]


def mse_loss(x, y, w):
	"""
	计算损失
	"""
    return 0.5*torch.mean(torch.square(x@w-y))


def get_batch_data(x, y, batch_size, step):
    data_len = x.shape[0]
    start = step*batch_size
    end = min(start + batch_size, data_len)
    return x[start:end], y[start:end]


def train(epochs, batch_size, lr):
    data = torch.randn(100, 2)*2    # [100, 2]
    weight = torch.tensor([[3.], [2]])   # [2, 1]
    y = data@weight + torch.randn(100, 1)*2

    param_w = torch.randn(2, 1)
    steps = data.shape[0]//batch_size
    for epoch in range(epochs):
        for step in range(steps):
            x, lb = get_batch_data(data, y, batch_size, step)
            loss = mse_loss(x, lb, param_w)
            grad = diff_mse(x, lb, param_w)
            param_w = param_w - lr*grad
            if step % 10 == 0:
                print(f"epoch:{epoch}; step:{step}; loss:{loss.item()}")
    print(f"train finished, param w: {param_w}")


if __name__ == "__main__":
    train(epochs=200, batch_size=8, lr=5e-4)

6. K-means clustering

import numpy as np


def k_means(max_iter, tol, data, k):
    """
    max_iter: 最大迭代次数
    toL: 聚类中心变化最小值
    data:输入数据 [n, dim]
    k:聚类中心个数
    """
    # Randomly pick k distinct data points as the initial cluster centers
    centers = data[np.random.choice(data.shape[0], k, replace=False)]
    for j in range(max_iter):
        # dist.shape = [n, k]
        dist = np.linalg.norm(data[:, np.newaxis]-centers, axis=2)
        # labels.shape = (n)
        labels = np.argmin(dist, axis=1)
        # For each cluster, take the mean of the points assigned to it as the new center
        new_centers = np.array([data[labels == i].mean(axis=0) for i in range(k)])
        # Stop iterating once every center moves less than tol
        if np.all(np.linalg.norm(new_centers-centers, axis=1) < tol):
            print(f"less than tol, break down, iter num {j}")
            break
        centers = new_centers
    return centers, labels


if __name__ == "__main__":
    data = np.random.randn(100, 3)
    centers, labels = k_means(max_iter=100, tol=0.00001, data=data, k=3)
    print(centers)
    print(labels)

7. Beam search

Conceptually similar to top-k: at every decoding step, only the beam_size highest-scoring partial sequences are kept and expanded; see the sketch below.
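
A minimal sketch, assuming a hypothetical step-wise decoder model(tokens) that returns next-token logits of shape [vocab_size]; beams are ranked by summed log-probability (no length normalization):

import torch


def beam_search(model, start_tokens, beam_size, max_len, eos_id):
    # Each beam is a (token list, cumulative log-prob) pair; start from the prompt with score 0
    beams = [(list(start_tokens), 0.0)]
    for _ in range(max_len):
        candidates = []
        for tokens, score in beams:
            if tokens[-1] == eos_id:
                # Finished beams are carried over unchanged
                candidates.append((tokens, score))
                continue
            logits = model(tokens)                        # [vocab_size]
            log_probs = torch.log_softmax(logits, dim=-1)
            # Expand this beam with its beam_size best continuations
            top_lp, top_id = torch.topk(log_probs, beam_size)
            for lp, idx in zip(top_lp.tolist(), top_id.tolist()):
                candidates.append((tokens + [idx], score + lp))
        # Keep only the beam_size best candidates across all expansions
        candidates.sort(key=lambda c: c[1], reverse=True)
        beams = candidates[:beam_size]
        if all(t[-1] == eos_id for t, _ in beams):
            break
    return beams


if __name__ == "__main__":
    vocab_size = 10

    def toy_model(tokens):
        # Random next-token logits standing in for a real decoder
        torch.manual_seed(len(tokens))
        return torch.randn(vocab_size)

    for tokens, score in beam_search(toy_model, start_tokens=[1], beam_size=3, max_len=5, eos_id=0):
        print(tokens, round(score, 3))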

8. Layer Norm

import torch
from torch import nn


class LayerNorm(nn.Module):
    def __init__(self, dim):
        super(LayerNorm, self).__init__()
        self.alpha = nn.Parameter(torch.ones(dim))
        self.beta = nn.Parameter(torch.zeros(dim))
        self.epsilon = 1e-6

    def forward(self, feature):
        """
        feaure.shape = [batch-size, seq_len, embed_dim]
        """
        mean = torch.mean(feature, dim=-1, keepdim=True)
        std = torch.std(feature, dim=-1, keepdim=True)
        norm = (feature-mean)/(std+self.epsilon)
        out = self.alpha*norm + self.beta
        return out


if __name__ == "__main__":
    logits = torch.randn(2, 3, 5)
    ln = LayerNorm(5)
    print(ln(logits))

9. Batch Norm

import torch
from torch import nn


class BatchNorm(nn.Module):
    def __init__(self, momentum=0.1, eps=1e-6, dim=5):
        super(BatchNorm, self).__init__()
        # Running statistics used at inference time
        self.run_mean = torch.zeros(dim)
        self.run_std = torch.ones(dim)
        self.momentum = momentum
        self.eps = eps
        # gamma scales the normalized feature, beta shifts it
        self.gamma = nn.Parameter(torch.ones(dim))
        self.beta = nn.Parameter(torch.zeros(dim))

    def forward(self, feature, is_train=True):
        if is_train:
            mean = torch.mean(feature, dim=0)
            std = torch.std(feature, dim=0)
            # Exponential moving average of the batch statistics (PyTorch convention)
            self.run_mean = (1-self.momentum)*self.run_mean + self.momentum*mean.detach()
            self.run_std = (1-self.momentum)*self.run_std + self.momentum*std.detach()
            norm = (feature-mean)/(std+self.eps)
        else:
            norm = (feature-self.run_mean)/(self.run_std+self.eps)
        out = self.gamma*norm + self.beta
        return out


if __name__ == "__main__":
    logits = torch.randn(3, 5)
    bn = BatchNorm(momentum=0.1, eps=1e-6, dim=5)
    print(bn(logits, True))

10. self-attention

# -*- coding: utf-8 -*-
# @Time: 2024/2/24 15:44
# @Author: fun
# @Software: PyCharm

import torch
from torch import nn
import torch.nn.functional as F


class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads

        self.head_dim = self.embed_dim // self.num_heads
        self.q_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.k_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.v_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.out_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=True)

    def forward(self, query, key, value, atten_mask=None, padding_mask=None):
        """

        :param query: [tgt_len, batch_size, embed_dim]
        :param key: [src_len, batch_size, embed_dim]
        :param value: [src_len, batch_size, embed_dim]
        :param atten_mask: attention mask for the decoder's masked multi-head attention (causal mask)
        :param padding_mask: input padding mask
        :return:
        """
        q = self.q_proj(query)   # [tgt_len, bs, embed_dim]
        k = self.k_proj(key)     # [src_len, bs, embed_dim]
        v = self.v_proj(value)   # [src_len, bs, embed_dim]
        tgt_len, batch_size, embed_dim = query.size()
        src_len = key.size()[0]
        head_dim = embed_dim // self.num_heads
        scaling = float(head_dim)**-0.5
        q = q*scaling

        if atten_mask is not None:
            if atten_mask.dim() == 2:
                atten_mask = atten_mask.unsqueeze(0)
                if list(atten_mask.size()) != [1, tgt_len, src_len]:
                    raise RuntimeError("The 2D attention mask is not correct!")
            elif atten_mask.dim() == 3:
                if list(atten_mask.size()) != [batch_size*self.num_heads, tgt_len, src_len]:
                    raise RuntimeError("The 3D attention mask is not correct!")
            else:
                raise RuntimeError("Attention mask dim is not correct")

        q = q.contiguous().view(tgt_len, batch_size*self.num_heads, head_dim).transpose(0, 1)     # [bsz*num_head,
        # tgt_len, head_dim]
        v = v.contiguous().view(src_len, batch_size*self.num_heads, head_dim).transpose(0, 1)     # [bsz*num_head,
        # src_len, head_dim]
        k = k.contiguous().view(src_len, batch_size*self.num_heads, head_dim).transpose(0, 1)
        attn_output_weight = torch.bmm(q, k.transpose(1, 2))    # [bsz*num_heads, tgt_len, src_len]

        if atten_mask is not None:
            attn_output_weight += atten_mask
        if padding_mask is not None:
            attn_output_weight = attn_output_weight.view(batch_size, self.num_heads, tgt_len, src_len)
            attn_output_weight = attn_output_weight.masked_fill(
                padding_mask.unsqueeze(1).unsqueeze(2), float("-inf"))
            attn_output_weight = attn_output_weight.view(batch_size*self.num_heads, tgt_len, src_len)

        attn_output_weight = F.softmax(attn_output_weight, dim=-1)
        # Dropout on the attention weights; only active when the module is in training mode
        attn_output_weight = F.dropout(attn_output_weight, training=self.training)
        attn_output = torch.bmm(attn_output_weight, v)    # [bsz*num_heads, tgt_len, head_dim]
        attn_output = attn_output.transpose(0, 1).contiguous().view(tgt_len, batch_size, head_dim*self.num_heads)
        z = self.out_proj(attn_output)
        return z

def generate_square_subsequent_mask(size):
    """
    生成 attention mask 矩阵
    :param size: [tgt_len, src_len]
    :return: 
    """
    # torch.triu返回主对角线及其上部的元素,其他元素都置为0
    mask = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask  # [sz,sz]


if __name__ == "__main__":
    source_len = 5
    bs = 2
    model_dim = 32
    num_head = 2
    src = torch.rand((source_len, bs, model_dim))
    src_padding_mask = torch.tensor([[False, False, False, True, True],
                        [False, False, False, False, True]])

    mha = MultiHeadAttention(embed_dim=model_dim, num_heads=num_head)
    out = mha(src, src, src, padding_mask=src_padding_mask)
    print(out)
    print(out.shape)


11. Trainable token embedding

# -*- coding: utf-8 -*-
# @Time: 2024/2/24 18:49
# @Author: fun
# @Software: PyCharm

from torch import nn
import math


class TokenEmbed(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super(TokenEmbed, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.embed_dim = embed_dim

    def forward(self, tokens):
        """

        :param tokens: [length, bs]
        :return:embedding: [length, bs, embed_dim]
        """
        # Note: as in the Transformer paper, the token embedding is scaled by sqrt(embed_dim)
        return self.embedding(tokens.long()) * math.sqrt(self.embed_dim)
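
A minimal usage sketch (the vocabulary size of 1000 and the shapes below are made-up values for illustration):

if __name__ == "__main__":
    import torch

    embed = TokenEmbed(vocab_size=1000, embed_dim=32)
    tokens = torch.randint(0, 1000, (5, 2))   # [length, bs]
    print(embed(tokens).shape)                # torch.Size([5, 2, 32])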

12. Sinusoidal positional encoding

The sinusoidal positional encoding from the Transformer paper: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) and PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)).

import math

import torch


def position_embed(max_len, model_dim):
    pe = torch.zeros((max_len, model_dim))
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)   # [max_len, 1]
    div_term = torch.exp(torch.arange(0, model_dim, 2).float()*(math.log(10000)/model_dim))
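    # Note: position/div_term == position * 10000^(-(2i)/model_dim), the form used in the Transformer paper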
    pe[:, 0::2] = torch.sin(position/div_term)
    pe[:, 1::2] = torch.cos(position/div_term)
    return pe
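
A minimal usage sketch (max_len=50 and model_dim=32 are arbitrary example values); the encoding is typically added to the scaled token embedding:

if __name__ == "__main__":
    pe = position_embed(max_len=50, model_dim=32)
    print(pe.shape)    # torch.Size([50, 32])
    # Typical use with the [length, bs, embed_dim] layout from section 11:
    # x = token_embedding + pe[:seq_len].unsqueeze(1)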