赞
踩
最近看了很多大模型,也使用了很多大模型。对于大模型理论似乎很了解,但是好像又缺点什么,思来想去决定自己动手实现一个 toy 级别的模型,在实践中加深对大语言模型的理解。
在这个系列的文章中,我将通过亲手实践,构建一个 1.2B
的模型,完成模型搭建、tokenizer
训练、模型预训练和指令微调这些流程。记录整个开发过程和其中遇到的各种挑战和对应解决方案。
最后这些内容并不以训练一个足够强大的模型为目标,更多的是走一遍流程,所以里面内容显得十分粗糙。所有的内容都是我对于大模型的理解形成的,如果您发现有任何过时或不准确的地方,请不吝指出。
在使用 Pytorch
训练模型的时候,一个常见的流程就是前向传播、反向传播然后更新梯度,因此我们一步一步完成其中的组件。
现在训练大模型常用的优化器是 AdamW
,它使用一阶动量和二阶动量保持梯度稳定,从而使损失不会过于震荡。这里不详细介绍原理,给出 Pytorch
中的实现。
from torch.optim import AdamW
optimizer = AdamW(
params=model.parameters(),
lr=args.lr,
weight_decay=args.weight_decay,
)
这里我们设置了学习率和权重衰减,详细参数请见官方文档,这里不过多赘述。
学习率调度器可以在训练中动态调度学习率,从而提高训练效率和模型性能。合适的学习率有助于帮助模型脱离局部最优,达到一个更好的最优解。
这里我们采用 warmup
结合余弦退火的学习率调度策略。warmup
学习率预热开始从一个小学习率开始训练,然后再修正为指定的学习率。因为刚刚开始训练时模型权重随机初始化的,此时选择一个较大的学习率可能导致模型训练震荡。
余弦退火就是采用余弦方式对学习率进行衰减,这里我们给出调度器的实现:
from torch.optim.lr_scheduler import CosineAnnealingLR, LambdaLR, SequentialLR def get_lr_warmup(warmup_steps: int): def lr_warmup(current_step: int): return float(current_step) / float(max(1, warmup_steps)) return lr_warmup warmup_steps = xxx cosine_steps = xxx warmup_scheduler = LambdaLR( optimizer=optimizer, lr_lambda=get_lr_warmup(warmup_steps=warmup_steps) ) cosine_scheduler = CosineAnnealingLR(optimizer=optimizer, T_max=cosine_steps) scheduler = SequentialLR( optimizer=optimizer, schedulers=[warmup_scheduler, cosine_scheduler], milestones=[warmup_steps], )
在 warmup
阶段,我们采用线性递增的方式慢慢增大学习率,由于 Pytorch
没有相关实现,因此我们需要自己定义学习率调度的函数。
余弦退火阶段中 T_max
指定一个波峰到波谷的周期,也就是退火阶段迭代次数。最后将两个调度器组合起来,组成我们希望的调度器。
这次依然选择其作为我们的预训练数据集,在这里就要使用上一章训练的分词器。
首先加载数据集
from datasets import load_dataset, Dataset
dataset: Dataset = load_dataset(
"json",
data_files=[
"nlp_datas/part-000020-a894b46e.jsonl.tar.gz",
"nlp_datas/part-000065-a894b46e.jsonl.tar.gz",
],
split="train",
)
当然如果需要加载更大的数据,可以指定参数 streaming=True
减少加载的内存占用。加载之后就需要对数据集进行分词,得到模型需要的输入 input_ids
和 attention_mask
。
from tokenization_custom import CustomTokenizer
tokenizer = CustomTokenizer.from_pretrained("tokenizer")
tokenized_dataset = dataset.map(
lambda x: tokenizer(x["content"], truncation=True, max_length=2048),
batched=True,
remove_columns=dataset.column_names,
)
注意在这里我们只对文本进行了截断,但是没有对文本做填充,这样得到的文本可能是长短不一的。我们不需要对整体进行填充,这样会按照整体最大长度填充,占用大量存储,我们只需要对当时送入模型的一批数据进行填充即可。
因为我们采用的是预测下一个 token,因此 labels
和 input_ids
相同,同时我们不希望 padding_token
参与计算损失,因为这是无意义的,所以对于 padding_token
的位置,对应的 label 是 -100,这里采用 DataCollatorForLanguageModeling
可以方便的完成这项操作。
这些都准备好,就可以得到 data_loader
,通过遍历 data_loader
就可以方便进行模型训练了。
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
data_loader = DataLoader(
dataset=dataset,
batch_size=args.batch_size,
shuffle=True,
collate_fn=data_collator,
num_workers=8,
)
有了上面的各种组件,就可以进行训练了,首先确定模型结构,这里采用一个三层的结构。
from config import CustomConfig
from modeling_custom import CustomForCausalLM
config = CustomConfig(
vocab_size=len(tokenizer.get_vocab()),
max_position_embeddings=2048,
hidden_size=4096,
intermediate_size=16384,
num_hidden_layers=3,
pad_token_id=tokenizer.pad_token_id,
)
model = CustomForCausalLM(config)
这里简单实现一个函数来计算模型参数量
def get_model_size(model: nn.Module):
"""
获取模型参数量
"""
return sum(p.numel() for p in model.parameters())
模型准备好后就可以进行模型训练,下面是一个简单的训练流程:
for epoch in range(args.epochs): for idx, batch in enumerate(data_loader): batch = {k: v.to(device) for k, v in batch.items()} outputs = model(**batch) logits, loss = outputs # 反向传播 loss.backward() # 梯度裁剪 torch.nn.utils.clip_grad_norm_( model.parameters(), max_norm=args.max_norm ) # 梯度更新 optimizer.step() # 学习率更新 scheduler.step() # 清除梯度 optimizer.zero_grad()
对于一个 1.2B
模型,模型权重、优化器状态和梯度三部分大约占用显存计算如下:
1.2×109×4(FP32)10243×4≈17.88GB\frac{1.2 \times 10^9 \times 4(FP32)}{1024^3} \times 4 \approx 17.88GB102431.2×109×4(FP32)×4≈17.88GB
这里简单计算一下中间激活值占用显存,假设 batchsize
为16,这一批 padding 之后的长度为512,因此 input_ids
的大小为 (16,512)。
embedding 层的结果大约 32M,一层 Attention 层结果大约 128M,一层前馈网络结果大约 288M,最后词汇表投影大约 445M。这个模型中使用 3 层解码器层,不考虑层归一化的中间结果,这个模型总共中间结果大约有 1725M结果,每个结果占用 4Bytes,则最后总共显存占用大约 6.7GB。
这是长度为 512 的情况,实际上我的训练文本中大量存在 2k 左右文本,它会使占用显存成倍数增加,假设一个 2k 的文本,则显存占用会扩展到 26.8GB。
上面最理想的情况,实际计算中还会产生各种变量占用显存,很快就会导致显存溢出而从无法训练。幸运的是在实现模型结构时加入了梯度检查点,只需要保存关键节点的中间结果,反向传播时重新从最近节点开始计算即可,这样大大节省了显存。
在这个模型中只需要调用 model.enable_gradient_checkpoint()
即可开启梯度检查点。
除了梯度检查点,还可以通过减少 batchsize
来减少中间激活值占用显存,但是减少批量大小可能导致损失震荡无法收敛,这里我们采用多步累加解决这个问题,在一个小批次反向传播计算梯度之后,先不更新权重和清除梯度,而是累计多个小批次之后一起更新然后清除梯度。
最后还可以采用混合精度训练,这样不仅能加快训练速度还能显著减少中间激活值空间占用。
有了以上策略,可以尝试愉快训练模型了,训练前为了方便修改配置,我们进行一些封装,同时添加一些日志信息,方便最后观测整个训练过程,这里直接给出最后的代码。
import json import os import random from dataclasses import dataclass from typing import Optional, Union import numpy as np import torch import torch.nn as nn from datasets import Dataset from torch.cuda.amp import GradScaler, autocast from torch.optim import AdamW from torch.optim.lr_scheduler import CosineAnnealingLR, LambdaLR, SequentialLR from torch.utils.data import DataLoader from tqdm import tqdm from transformers import DataCollatorForLanguageModeling from config import CustomConfig from modeling_custom import CustomForCausalLM from tokenization_custom import CustomTokenizer from utils import get_model_size SEED = 42 def set_seed(seed: int): torch.manual_seed(seed=seed) torch.cuda.manual_seed(seed=seed) torch.cuda.manual_seed_all(seed=seed) np.random.seed(seed=seed) random.seed(seed) def get_lr_warmup(warmup_steps: int): def lr_warmup(current_step: int): return float(current_step) / float(max(1, warmup_steps)) return lr_warmup @dataclass class TrainingArgs: output_dir: str logging_steps: int = 500 saving_steps: int = 500 batch_size: int = 1 epochs: int = 3 lr: float = 1e-4 weight_decay: float = 1e-4 max_norm: float = 1.0 warm_up_ratio: float = 0.1 gradient_checkpointing: bool = False gradient_accumulation_steps: int = 24 def train( model: nn.Module, args: TrainingArgs, dataset: Dataset, device: Optional[Union[str, torch.device]] = None, data_collator=None, ): data_loader = DataLoader( dataset=dataset, batch_size=args.batch_size, shuffle=True, collate_fn=data_collator, num_workers=8, ) # 完整的有效步 complete_steps_per_epoch = len(data_loader) // args.gradient_accumulation_steps # 不完整的有效步,最后剩余的小批量 last_mini_steps = len(data_loader) % args.gradient_accumulation_steps # 一个 epoch 等效步 if last_mini_steps != 0: steps_per_epoch = complete_steps_per_epoch + 1 else: steps_per_epoch = complete_steps_per_epoch total_steps = steps_per_epoch * args.epochs # 优化器 optimizer = AdamW( params=model.parameters(), lr=args.lr, weight_decay=args.weight_decay, ) # 学习率调度 warmup_steps = int(total_steps * args.warm_up_ratio) cosine_steps = total_steps - warmup_steps warmup_scheduler = LambdaLR( optimizer=optimizer, lr_lambda=get_lr_warmup(warmup_steps=warmup_steps) ) cosine_scheduler = CosineAnnealingLR(optimizer=optimizer, T_max=cosine_steps) scheduler = SequentialLR( optimizer=optimizer, schedulers=[warmup_scheduler, cosine_scheduler], milestones=[warmup_steps], ) # 设备 if device is None: device = "cuda" if torch.cuda.is_available() else "cpu" os.makedirs(args.output_dir, exist_ok=True) model = model.to(device=device) if args.gradient_checkpointing: model.enable_gradient_checkpoint() loggin_info = [] current_step = 0 progress_bar = tqdm(range(total_steps)) scaler = GradScaler() for epoch in range(args.epochs): current_loss = 0.0 for idx, batch in enumerate(data_loader): batch = {k: v.to(device) for k, v in batch.items()} if last_mini_steps == 0 or len(data_loader) - (idx + 1) > last_mini_steps: current_accumulation = args.gradient_accumulation_steps else: current_accumulation = last_mini_steps with autocast(dtype=torch.bfloat16): outputs = model(**batch) logits, loss = outputs loss /= current_accumulation current_loss += loss.item() # 反向传播 scaler.scale(loss).backward() if (idx + 1) % args.gradient_accumulation_steps == 0 or (idx + 1) == len( data_loader ): # 梯度裁剪 scaler.unscale_(optimizer=optimizer) torch.nn.utils.clip_grad_norm_( model.parameters(), max_norm=args.max_norm ) # 梯度更新 scaler.step(optimizer=optimizer) # 更新缩放因子 scaler.update() # 学习率更新 scheduler.step() # 清除梯度 optimizer.zero_grad() progress_bar.update(1) current_step += 1 if current_step % args.logging_steps == 0: current_epochs = current_step / steps_per_epoch info = { "Epoch": f"{current_epochs:.2f}/{args.epochs}", "Step": f"{current_step}/{total_steps}", "Loss": current_loss, "LR": scheduler.get_last_lr()[0], } loggin_info.append(info) print(info) if current_step % args.saving_steps == 0: ckpt_path = os.path.join( args.output_dir, f"checkpoint-{current_step}.pt", ) torch.save(model.state_dict(), ckpt_path) current_loss = 0.0 ckpt_path = os.path.join( args.output_dir, "last.pt", ) torch.save(model.state_dict(), ckpt_path) with open("logging.jsonl", "w", encoding="utf-8") as fw: for logging_data in loggin_info: fw.write(json.dumps(logging_data) + "\n") if __name__ == "__main__": set_seed(SEED) tokenizer = CustomTokenizer.from_pretrained("tokenizer") config = CustomConfig( vocab_size=len(tokenizer.get_vocab()), max_position_embeddings=2048, hidden_size=4096, intermediate_size=16384, num_hidden_layers=3, pad_token_id=tokenizer.pad_token_id, ) model = CustomForCausalLM(config) print(f"Model size is {get_model_size(model)}") dataset = Dataset.load_from_disk("nlp_datas/cached") data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False) args = TrainingArgs( output_dir="result", gradient_checkpointing=True, batch_size=4, logging_steps=50, warm_up_ratio=0.03, epochs=1, gradient_accumulation_steps=8, lr=1e-3, weight_decay=1e-5, ) train(model=model, args=args, dataset=dataset, data_collator=data_collator)
至此我们成功完成了模型训练,为其注入了先验知识。现在它拥有各种工具,但是无法进行使用,后面我们进行 sft 教模型如何使用这些工具。
我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。
我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。
第一阶段: 从大模型系统设计入手,讲解大模型的主要方法;
第二阶段: 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用;
第三阶段: 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统;
第四阶段: 大模型知识库应用开发以LangChain框架为例,构建物流行业咨询智能问答系统;
第五阶段: 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型;
第六阶段: 以SD多模态大模型为主,搭建了文生图小程序案例;
第七阶段: 以大模型平台应用与开发为主,通过星火大模型,文心大模型等成熟大模型构建大模型行业应用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。