赞
踩
Accelerate
是一个库,只需添加四行代码,就可以在任何分布式 configuration
中运行相同的 PyTorch
代码:
+ from accelerate import Accelerator
+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+ model, optimizer, training_dataloader, scheduler
+ )
for batch in training_dataloader:
optimizer.zero_grad()
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
+ accelerator.backward(loss)
optimizer.step()
scheduler.step()
上述代码可以通过 Accelerate
的 CLI
接口在任何系统上启动:
accelerate launch {my_script.py}
安装:
pip install accelerate
conda install -c conda-forge accelerate
pip install git+https://github.com/huggingface/accelerate
配置:
accelerate config
然后 accelerate
会向你询问一些问题从而生成配置。
检查配置:
accelerate env
为了使用 Accelerate
,你只需要修改四件事:
首先,导入 Accelerator
并创建一个 accelerator
对象:
from accelerate import Accelerator
accelerator = Accelerator()
然后,移除针对你的模型和输入数据的所有 .to(device)
或 .cuda()
的调用。accelerator
将会为你正确处理这个问题,并为你把所有这些对象放在正确的设备上。
如果你知道你在做什么,你可以保留那些 .to(device)
的调用,但你应该使用 accelerator
对象提供的设备: accelerator.device
。
要完全停用自动的 device placement
,在初始化 Accelerator
时传递 device_placement=False
。
接着,将所有与训练有关的对象(optimizer, model, training dataloader, learning rate scheduler
)传递给accelerator.prepare()
方法。这将确保一切都为训练做好准备。
model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, lr_scheduler
)
具体而言,training dataloader
将被分片到所有可用的 GPU/TPU
核心上,这样每个设备都能看到训练数据集的不同部分。此外,所有进程的随机数状态将在每次迭代开始时通过 dataloader
进行同步,以确保数据以相同的方式被混洗(如果你决定使用shuffle=True
或任何类型的 random sampler
)。
训练的实际 batch size
将是使用的设备数量乘以你在脚本中设置的 batch size
。另外,你可以在创建 accelerator
对象时使用 split_batches=True
参半,此时无论你在多少个 GPU
上运行你的脚本,实际 batch size
都会保持不变。
你需要再开始实际的 training loop
之前执行 accelerator.prepare()
。
只有当 scheduler
需要再每个 optimizer step
中被 stepped
时,才需要把 learning rate scheduler
传递给 prepare()
。
任何获取 training dataloader length
的方法(例如,你需要记录 total training step
)都应该在 accelerator.prepare()
之后进行。
你可能想、也可能不想把你的validation dataloader
发送到 prepare()
,这取决于你是否想运行分布式评估。
最后,用 accelerator.backward(loss)
替换 loss.backward()
。
现在,你的脚本将在你的本地机器上运行,也可以在多个 GPU
或 TPU
上运行。你可以使用你喜欢的工具来启动分布式训练,或者你可以使用 Accelerate launcher
启动。
分布式评估:
可以进行常规评估,此时你需要将 validation dataloader
保持在 accelerator.prepare()
之外。并且,你需要手动将 input
数据放在 accelerator.device
上。
也可以进行分布式评估,此时你需要将 validation dataloader
放置在 accelerator.prepare()
之内:
validation_dataloader = accelerator.prepare(validation_dataloader)
就像 training dataloader
,这意味着在分布式评估时,每个设备将仅看到部分 evaluation
数据。这意味着你需要把 predictions
进行 group
。可以通过 accelerator.gather_for_metrics()
方法来实现:
for inputs, targets in validation_dataloader:
predictions = model(inputs)
# Gather all predictions and targets
all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
# Example of use with a Datasets.Metric
metric.add_batch(all_predictions, all_targets)
类似 training dataloader
,把 validation dataloader
传入 prepare()
可能会改变该 dataloader
:如果你在 � 个 GPU
上运行,则它的长度将被除以 � (因为你的实际 batch size
将被乘以 � ),除非你设置 split_batches=True
。
任何获取 validation dataloader length
的方法都应该在 accelerator.prepare()
之后进行。
数据集末尾的一些数据可能是重复的,所以这个 batch
的数据可以平均分配给所有的工作者。因此,应该通过gather_for_metrics()
方法计算指标,以便在收集时自动删除重复的数据。如果出于某种原因,你不希望自动完成这项工作,可以用 accelerator.gather()
来收集所有进程的数据,然后手动完成。
gather()
和 gather_for_metrics()
要求每个进程上的张量是相同尺寸的。如果你在每个进程上有不同尺寸的张量(例如,当动态填充到一个 batch
的最大长度时),你应该使用 accelerator.gather.pad_across_processes()
方法将张量填充到跨进程的最大尺寸。
启动分布式脚本:你可以使用常规命令来启动你的分布式训练(如 PyTorch
的 torch.distributed.launch
),它们与 Accelerate
完全兼容。这里唯一需要注意的是: Accelerate
使用 environment
来确定所有有用的信息,所以 torch.distributed.launch
应与标志 --use_env
一起使用。
Accelerate
还提供了一个 CLI
工具,它统一了所有的 launcher
,所以你只需要记住一个命令:
accelerate config
你需要回答问题,然后 Accelerate
将在你的 cache folder
创建一个 default_config.yaml
文件。这个缓存目录是(根据优先级递减):
HF_HOME
的内容,以 accelerate
为后缀。XDG_CACHE_HOME
的内容,以 huggingface/accelerate
为后缀。~/.cache/huggingface/accelerate
。你也可以通过标志 --config_file
来指定你要保存的文件的位置。
然后,你可以通过运行来测试你的设置是否一切顺利:
accelerate test
这将启动一个简短的脚本,测试分布式环境。你也可以在测试期间指定配置文件的位置:
accelerate test --config_file path_to_config.yaml
如果测试通过,你可以通过如下的命令来执行你的脚本:
accelerate launch path_to_script.py --args_for_the_script
也可以指定配置文件的位置:
accelerate launch --config_file path_to_config.yaml path_to_script.py --args_for_the_script
从 notebook
中启动:在 Accelerate 0.3.0
中引入了一个 notebook_launcher()
从而帮助你在 notebook
上启动训练。
只要在 notebook
的一个 cell
中定义一个负责整个 train and/or evaluation
的函数,然后用以下代码执行一个 cell
:
from accelerate import notebook_launcher
notebook_launcher(training_function)
注意:你的 Accelerator
对象应该只在 training_function
中定义,这是因为初始化应该只在 launcher
内完成。
在 TPU
上训练:如果你想在 TPU
上启动你的脚本,有一些注意事项是你应该注意的。在幕后,TPU
将为你的 training step
(前向传播、反向传播、以及 optimizer step
)中发生的所有操作创建一个 graph
。这就是为什么你的第一个训练步总是非常长,因为建立和编译这个 graph
需要一些时间。
好消息是,这个编译将被缓存,所以第二步和所有后续的 step
将更快。坏消息是,这只适用于你的所有 step
做完全相同的操作,这意味着:
batch
必须有用相同的张量尺寸。step
中存在循环,那么循环次数在每个 step
必须相同)。如果上述任何一项在两个 step
之间发生变化,都会触发新的编译,这将再次花费大量时间。在实践中,这意味着:你必须特别注意让你的输入中的所有张量具有相同的形状(所以没有动态填充),并且不应该使用具有 for
循环的层,其中 for
循环的根据 input
的不同而具有不同长度(如 LSTM
)。否则,训练会慢得令人难受。
可以针对 TPU
执行一些特殊的代码:
from accelerate import DistributedType
if accelerator.distributed_type == DistributedType.TPU:
# do something of static shape
else:
# go crazy and be dynamic
最后要注意的是:如果你的模型有 tied weight
(比如语言模型将 embedding matrix
的权重与 decoder
的权重绑定),将这个模型移动到 TPU
(无论是你自己移动、还是由 prepare()
移动)会破坏绑定。你将需要在之后重新绑定权重。
在单个进程上执行的语句:有些语句只需要在特定的进程上执行而无需在所有进程上执行,如数据下载、记录日志、以及打印进度条。此时可以执行:
if accelerator.is_local_main_process:
# Is executed once per server
from tqdm.auto import tqdm
progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)
local
意思是每台机器上运行:如果你在两台服务器上训练,其中每台服务器有几个 GPU
,则代码将在每台服务器上执行一次。
如果你希望对所有进程仅执行一次(如,上传模型到 model hub
),则可以执行:
if accelerator.is_main_process:
# Is executed once only
对于 print
语句,你希望在每台机器上执行一次,则可以用 accelerator.print
代替 print
函数。
延迟执行:当你运行你的常规脚本时,指令是按顺序执行的。使用 Accelerate
在几个 GPU
上同时部署你的脚本会带来一个复杂的问题:虽然每个进程都是按顺序执行所有指令,但有些可能比其他的快。
你可能需要等待所有进程达到一定程度后再执行某条指令。例如,在确定每个进程都完成了训练之前,你不应该保存一个模型。要做到这一点,可以执行:
accelerator.wait_for_everyone()
这条指令将阻塞所有先到的进程,直到所有其他进程都到达该点(如果你只在一个 GPU
或 CPU
上运行你的脚本,这不会有任何作用)。
保存/加载模型:保存训练好的模型可能需要一些调整:
首先,你应该等待所有的进程到达脚本中的 “延迟执行” 所描述的那个点。
然后,你应该在保存模型之前 unwrap
你的模型。这是因为在通过 prepare()
方法时,你的模型可能被 wrap
从而用于分布式训练。如:
accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model.state_dict(), filename)
如果你的脚本包含加载 checkpoint
的逻辑,我们也建议你在 unwrapped model
中加载你的权重(这只在 prepare()
后使用加载函数时有用)。如:
unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.load_state_dict(torch.load(filename))
保存/加载整个状态:当训练你的模型时,你可能想保存模型、优化器、随机数生成器、以及潜在的 LR scheduler
的当前状态,以便在同一个脚本中恢复训练。你可以分别使用 save_state()
和 load_state()
来做到这一点,只需简单地传入一个保存位置。
如果你通过 register_for_checkpointing()
注册了任何其他需要存储的 stateful item
,它们也会被保存和/或加载。
示例:
from accelerate import Accelerator
import torch
accelerator = Accelerator()
my_scheduler = torch.optim.lr_scheduler.StepLR(my_optimizer, step_size=1, gamma=0.99)
my_model, my_optimizer, my_training_dataloader = accelerate.prepare(my_model, my_optimizer, my_training_dataloader)
# Register the LR scheduler
accelerate.register_for_checkpointing(my_scheduler)
# Save the starting state
accelerate.save_state("my/save/path")
device = accelerator.device
my_model.to(device)
# Perform training
for epoch in range(num_epochs):
for batch in my_training_dataloader:
my_optimizer.zero_grad()
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = my_model(inputs)
loss = my_loss_function(outputs, targets)
accelerator.backward(loss)
my_optimizer.step()
my_scheduler.step()
# Restore previous state
accelerate.load_state("my/save/path")
梯度裁剪:如果你在脚本中使用梯度剪裁,你应该把对 torch.nn.utils.clip_grad_norm_
或 torch.nn.utils.clip_grad_value_
的调用分别替换为 accelerator.clipgrad_norm()
和 accelerator.clipgrad_value()
。
混合精度训练:如果你用 Accelerate
在混合精度下训练,那么模型内的计算将以混合精度进行,而模型外的每一次计算都将以 full precision
执行。例如,loss
的计算通常在模型外,且涉及 softmax
。然而,你可能想把你的 loss
计算放在 accelerator.autocast
上下文管理器中:
with accelerator.autocast():
loss = complex_loss_function(outputs, target)
混合精度训练的另一个注意事项是:梯度会在开始时跳过一些更新,有时在训练过程中也会跳过。这是因为动态损失缩放 dynamic loss scaling
策略,在训练过程中会有一些时刻,梯度已经溢出,loss scaling factor
会减少,从而避免在下一步再次发生这种情况。
这意味着你可能会在没有梯度更新的时候就更新你的 learning rate scheduler
。这在一般情况下是没有问题的,但是当你的训练数据非常少,或者你的 scheduler
的第一个学习率值非常重要时,可能会有影响。在这种情况下,你可以跳过 learning rate scheduler
的更新:
if not accelerator.optimizer_step_was_skipped:
lr_scheduler.step()
梯度累积:要执行梯度累积,请使用 accumulate()
并指定 gradient_accumulation_steps
。在多设备训练时,这也会自动确保梯度同步或不同步,检查是否真的应该执行该 step
,并自动计算损失:
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)
for input, label in training_dataloader:
with accelerator.accumulate(model):
predictions = model(input)
loss = loss_function(predictions, label)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
相比之下,传统的梯度累加方法会用更冗长的代码:
+ from accelerate import Accelerator
+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+ model, optimizer, training_dataloader, scheduler
+ )
for index, batch in enumerate(training_dataloader):
inputs, targets = batch
- inputs = inputs.to(device)
- targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
loss = loss / gradient_accumulation_steps
+ accelerator.backward(loss)
if (index+1) % gradient_accumulation_steps == 0:
optimizer.step()
scheduler.step()
optimizer.zero_grad()
DeepSpeed
:DeepSpeed
支持是实验性的,所以底层 API
将在不久的将来发展,可能会有一些轻微的破坏性变化。具体而言, Accelerate
还不支持你自己编写的 DeepSpeed
配置,这将在下一个版本中添加。
使用 accelerate launch
:
accelerate launch {script_name.py} --arg1 --arg2 ...
指定单个 GPU
:
CUDA_VISIBLE_DEVICES="0" accelerate launch {script_name.py} --arg1 --arg2 ...
在两个 GPU
上混合精度训练:
accelerate launch --multi_gpu --mixed_precision=fp16 --num_processes=2 {script_name.py} {--arg1} {--arg2} ...
建议总是在 accelerate launch
之前执行 accelerate config
,这样就无需再 accelerate launch
中指定各种配置。
在 notebook
中 launch
:
CUDA
的代码在一个函数中,该函数被传递给 notebook_launcher()
。num_processes
为训练的设备数量(如,GPU, CPU, TPU
数量)。TPU
,在 training loop
函数之外声明你的模型。如:
from accelerate import notebook_launcher
args = ("fp16", 42, 64)
notebook_launcher(training_loop, args, num_processes=2)
对于 TPU
:
model = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id))
args = (model, "fp16", 42, 64)
notebook_launcher(training_loop, args, num_processes=8)
启用 FSDP
:
首先进行配置:accelerate config
。FSDP
配置的一个例子:
compute_environment: LOCAL_MACHINE
deepspeed_config: {}
distributed_type: FSDP
downcast_bf16: 'no'
fsdp_config:
fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
fsdp_backward_prefetch_policy: BACKWARD_PRE
fsdp_offload_params: false
fsdp_sharding_strategy: 1
fsdp_state_dict_type: FULL_STATE_DICT
fsdp_transformer_layer_cls_to_wrap: GPT2Block
machine_rank: 0
main_process_ip: null
main_process_port: null
main_training_function: main
mixed_precision: 'no'
num_machines: 1
num_processes: 2
use_cpu: false
然后开始训练:
accelerate launch examples/nlp_example.py
这些配置参数的含义为:
Sharding Strategy
:
FULL_SHARD
:对 optimizer states, gradients, parameters
都进行分片。SHARD_GRAD_OP
:仅对 optimizer states, gradients
进行分片。NO_SHARD
:不进行分片。Offload Params
:一个布尔值,指定是否将 parameters
和 gradients
卸载到 CPU
。
Auto Wrap Policy
:可以为 TRANSFORMER_BASED_WRAP, SIZE_BASED_WRAP, NO_WRAP
。
Transformer Layer Class to Wrap
:当使用 TRANSFORMER_BASED_WRAP
时,指定特定的 transformer layer class name
(大小写敏感)从而执行 wrap
。如 BertLayer, GPTJBlock, T5Block,...
。
Min Num Params
:使用 SIZE_BASED_WRAP
的最小参数数量。
Backward Prefetch
:可以为 BACKWARD_PRE, BACKWARD_POST, NO_PREFETCH
。
State Dict Type
:可以为 FULL_STATE_DICT, LOCAL_STATE_DICT, SHARDED_STATE_DICT
。
有几个需要注意的地方:
PyTorch FSDP
会自动 wrap
子模块,对参数进行扁平化处理,并将参数分片。由于这个原因,任何在 model wrapping
之前创建的 optimizer
都会被破坏,并占用更多的内存。因此,强烈建议在创建 optimizer
之前准备好模型,这也是很有效的。Accelerate
将自动 wrap
模型,并在单个模型的情况下为你创建一个优化器,并发出警告信息:
FSDP Warning: When using FSDP, it is efficient and recommended to call prepare for the model before creating the optimizer.
下面是使用 FSDP
时准备模型和优化器的推荐方法:
model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True)
+ model = accelerator.prepare(model)
optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr)
- model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
- model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
- )
+ optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
+ optimizer, train_dataloader, eval_dataloader, lr_scheduler
+ )
在单个模型的情况下,如果你用多个 parameter groups
创建了优化器,并且用它们一起调用 prepare
,那么 parameter groups
将被丢失,并显示以下警告:
FSDP Warning: When using FSDP, several parameter groups will be conflated into a single one due to nested module wrapping and parameter flattening.
这是因为,由于嵌套的 FSDP
模块的参数扁平化为一维数组,在 wrapping
前创建的 parameter groups
在 wrapping
后将没有意义。
在有多个模型的情况下,有必要在创建优化器之前准备好模型,否则会抛出一个错误。然后将优化器以与相应模型相同的顺序传递给 prepare()
方法,否则 accelerator.save_state()
和 accelerator.load_state()
将导致错误/意外的行为。
这个功能与 Transformers library
的 run_translation.py
脚本中的 --predict_with_generate
不兼容。
对于更多的控制,用户可以利用 FullyShardedDataParallelPlugin
。在创建这个类的实例后,用户可以把它传递给 Accelerator
类的实例。
启用 DeepSpeed
:DeepSpeed
实现了 ZeRO
论文中描述的一切。目前,它提供了如下的支持:Optimizer state partitioning
(ZeRO stage 1
)、Gradient partitioning
(ZeRO stage 2
)、Parameter partitioning
(ZeRO stage 3
)、Custom mixed precision training handling
、一系列基于 CUDA
扩展的快速优化器、ZeRO-Offload
到 CPU
和 Disk/NVMe
。
DeepSpeed ZeRO-2
主要只用于训练,因为它的功能对推理没有用处。DeepSpeed ZeRO-3
也可以用于推理,因为它允许在多个 GPU
上加载巨大的模型。
Accelerate
通过两种方式集成 DeepSpeed
:
deepspeed
配置文件来集成。它支持 DeepSpeed
的所有核心功能,并为用户提供了很大的灵活性。用户可能需要根据配置来改变几行代码。deepspeed_plugin
来集成。这支持 DeepSpeed
功能的子集,并对其余的配置使用默认选项。用户不需要改变任何代码。什么被集成了?
训练:DeepSpeed ZeRO
训练支持完整的 ZeRO stages 1, 2 and 3
、以及 optimizer states, gradients and parameters
的 CPU/Disk offload
。
Stage 1
:将 optimizer states
分片到数据并行 workers/GPUs
上。Stage 2
:将 optimizer states + gradients
分片到数据并行 workers/GPUs
上。Stage 3
:将 optimizer states + gradients + model parameters
分片到数据并行 workers/GPUs
上。Optimizer Offload
:将 optimizer states + gradients
卸载到 CPU/Disk
,建立在 ZERO Stage 2
之上。Param Offload
:将 model parameters
卸载到 CPU/Disk
,建立在 ZERO Stage 3
之上。注意:关于 Disk Offload
,磁盘应该是 NVME
的,以便有好的速度,但技术上可以在任何磁盘上工作。
推断:DeepSpeed ZeRO Inference
支持 ZeRO Stage 3
与 ZeRO-Infinity
。它使用与训练相同的 ZeRO
协议,但它不使用优化器和 lr scheduler
。
如何工作:
首先安装 DeepSpeed version >=0.6.5
。
然后配置:accelerate config
。一个配置的例子:
compute_environment: LOCAL_MACHINE
deepspeed_config:
gradient_accumulation_steps: 1
gradient_clipping: 1.0
offload_optimizer_device: none
offload_param_device: none
zero3_init_flag: true
zero_stage: 2
distributed_type: DEEPSPEED
fsdp_config: {}
machine_rank: 0
main_process_ip: null
main_process_port: null
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 2
use_cpu: false
最后执行训练:accelerate launch examples/nlp_example.py
。
配置参数的含义:
zero_stage
:0
表示禁用,1
表示 optimizer state partitioning
,2
表示 optimizer+gradient state partitioning
,3
表示 optimizer+gradient+parameter partitioning
。gradient_accumulation_steps
:一个整数,表示在 averaging
和 applying
这些梯度之前,积累梯度的 training steps
数量。gradient_clipping
:一个浮点数,指定启用梯度剪裁的值。offload_optimizer_device
:none
表示禁用 optimizer offloading
,cpu
表示 offload optimizer
到 CPU
,nvme
表示offload optimizer
到 NVMe SSD
。仅适用于 ZeRO >= Stage-2
。offload_param_device
:none
表示禁用 parameter offloading
,cpu
表示 offload parameter
到 CPU
,nvme
表示offload parameter
到 NVMe SSD
。仅适用于 ZeRO Stage-3
。zero3_init_flag
:决定是否启用 deepspeed.zero.Init
来构建大规模模型。只适用于 ZeRO Stage-3
。zero3_save_16bit_model
:决定是否在使用 ZeRO Stage-3
时保存 16
位模型权重。mixed_precision
:no
用于 FP32
训练,fp16
用于 FP16
混合精度训练,bf16
用于 BF16
混合精度训练。当使用配置文件时,需要修改一些代码:
DeepSpeed Optimizers and Schedulers
:
如果是 DeepSpeed Optim + DeepSpeed Scheduler
:用户必须使用 enhance.utils.DummyOptim
和enhance.utils.DummyScheduler
来取代他们代码中的 PyTorch/Custom
优化器和调度器:
# Creates Dummy Optimizer if `optimizer` was spcified in the config file else creates Adam Optimizer
optimizer_cls = (
torch.optim.AdamW
if accelerator.state.deepspeed_plugin is None
or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config
else DummyOptim
)
optimizer = optimizer_cls(optimizer_grouped_parameters, lr=args.learning_rate)
# Creates Dummy Scheduler if `scheduler` was spcified in the config file else creates `args.lr_scheduler_type` Scheduler
if (
accelerator.state.deepspeed_plugin is None
or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config
):
lr_scheduler = get_scheduler(
name=args.lr_scheduler_type,
optimizer=optimizer,
num_warmup_steps=args.num_warmup_steps,
num_training_steps=args.max_train_steps,
)
else:
lr_scheduler = DummyScheduler(
optimizer, total_num_steps=args.max_train_steps, warmup_num_steps=args.num_warmup_steps
)
Custom Optim + Custom Scheduler
:当 DeepSpeed
配置文件中没有 optimizer key
和 scheduler key
的情况。在这种情况下,不需要用户修改代码,通过 DeepSpeed Plugin
使用集成时就是这种情况。
Custom Optim + DeepSpeed Scheduler
:这种情况下,用户必须使用accelerate.utils.DummyScheduler
来替换代码中的 PyTorch/Custom scheduler
。
DeepSpeed Optim + Custom Scheduler
:这将导致一个错误,因为当使用 DeepSpeed Optim
时必须使用 DeepSpeed Scheduler
。
DeepSpeed
配置文件中存在一些 "auto"
值,这些值是由 prepare
方法根据所提供的模型、dataloaders
、dummy optimizer
和 dummy schedulers
自动处理的。那些不是 "auto"
的字段必须由用户明确指定。如 zero_stage2_config.json
文件:
{
"fp16": {
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
"optimizer": {
"type": "AdamW",
"params": {
"lr": "auto",
"weight_decay": "auto",
"torch_adam": true,
"adam_w_mode": true
}
},
"scheduler": {
"type": "WarmupDecayLR",
"params": {
"warmup_min_lr": "auto",
"warmup_max_lr": "auto",
"warmup_num_steps": "auto",
"total_num_steps": "auto"
}
},
"zero_optimization": {
"stage": 2,
"allgather_partitions": true,
"allgather_bucket_size": 2e8,
"overlap_comm": true,
"reduce_scatter": true,
"reduce_bucket_size": "auto",
"contiguous_gradients": true
},
"gradient_accumulation_steps": 1,
"gradient_clipping": "auto",
"steps_per_print": 2000,
"train_batch_size": "auto",
"train_micro_batch_size_per_gpu": "auto",
"wall_clock_breakdown": false
}
保存和加载:
对于 ZeRO Stage-1
和 ZeRO Stage-2
,模型的保存和加载不需要改动。
对于 ZeRO Stage-3
,state_dict
仅只包含占位符,因为模型的权重被分片到多个 GPU
。ZeRO Stage-3
有两个选项:
保存整个 16
位的模型权重,然后使用 model.load_state_dict(torch.load(pytorch_model.bin))
来直接加载。为此,要么在 DeepSpeed
配置文件中把 zero_optimization.stage3_gather_16bit_weights_on_model_save
设为True
,要么在 DeepSpeed Plugin
中把 zero3_save_16bit_model
设为 True
。
请注意,这个选项需要在一个 GPU
上整合权重,这可能会很慢,而且对内存要求很高,所以只有在需要时才使用这个功能。
示例:
unwrapped_model = accelerator.unwrap_model(model)
# New Code #
# Saves the whole/unpartitioned fp16 model when in ZeRO Stage-3 to the output directory if
# `stage3_gather_16bit_weights_on_model_save` is True in DeepSpeed Config file or
# `zero3_save_16bit_model` is True in DeepSpeed Plugin.
# For Zero Stages 1 and 2, models are saved as usual in the output directory.
# The model name saved is `pytorch_model.bin`
unwrapped_model.save_pretrained(
args.output_dir,
is_main_process=accelerator.is_main_process,
save_function=accelerator.save,
state_dict=accelerator.get_state_dict(model),
)
为了获得 32
位的权重,首先使用 model.save_checkpoint()
保存模型:
success = model.save_checkpoint(PATH, ckpt_id, checkpoint_state_dict)
status_msg = "checkpointing: PATH={}, ckpt_id={}".format(PATH, ckpt_id)
if success:
logging.info(f"Success {status_msg}")
else:
logging.warning(f"Failure {status_msg}")
这将在 checkpoint
目录下创建 ZeRO model
和 optimizer
的 partitions
以及 zero_to_fp32.py
脚本。你可以使用这个脚本来做离线整合,这不需要配置文件或 GPU
。如:
cd /path/to/checkpoint_dir
./zero_to_fp32.py . pytorch_model.bin
# Processing zero checkpoint at global_step1
# Detected checkpoint of type zero stage 3, world_size: 2
# Saving fp32 state dict to pytorch_model.bin (total_numel=60506624)
要想加载 32
位的模型,做法如下:
from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint
unwrapped_model = accelerator.unwrap_model(model)
fp32_model = load_state_dict_from_zero_checkpoint(unwrapped_model, checkpoint_dir)
如果你仅仅想得到 state_dict
,做法如下:
from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir)
注意,加载时需要大约 2
倍于 final checkpoint
大小的内存。
ZeRO Inference
:DeepSpeed ZeRO Inference
支持 ZeRO stage 3
。通过 accelerate
的集成,你只需要 prepare
模型和 dataloader
,如下:
model, eval_dataloader = accelerator.prepare(model, eval_dataloader)
注意事项:
DeepSpeed
的 Pipeline Parallelism
。mpu
,限制了 Megatron-LM
中支持的张量并行。目前 Accelerate
支持如下的 tracker
:TensorBoard, WandB, CometML, MLFlow
,如:
from accelerate import Accelerator
from accelerate.utils import LoggerType
accelerator = Accelerator(log_with="all") # For all available trackers in the environment
accelerator = Accelerator(log_with="wandb")
accelerator = Accelerator(log_with=["wandb", LoggerType.TENSORBOARD])
然后需要初始化 tracker
:
hps = {"num_iterations": 5, "learning_rate": 1e-2}
accelerator.init_trackers("my_project", config=hps)
然后记录日志:
accelerator.log({"train_loss": 1.12, "valid_loss": 0.8}, step=1)
最后在训练结束时调用:accelerator.end_training()
。
你也可以通过 accelerator.get_tracker
来获取内置的 tracker
对象:
wandb_tracker = accelerator.get_tracker("wandb")
if accelerator.is_main_process:
wandb_run.log_artifact(some_artifact_to_log)
处理大模型:常规的加载预训练模型的方式:
import torch
my_model = ModelClass(...) # step 1
state_dict = torch.load(checkpoint_file) # step 2
my_model.load_state_dict(state_dict) # step 3
这对于常规大小的模型而言很有效,但是无法处理大型模型:在 step 1
我们在 RAM
中加载一个完整版本的模型,并花一些时间随机初始化权重(这将在 step 3
被丢弃);在 step 2
,我们在 RAM
中加载另一个完整版本的模型,并使用预训练的权重。
Accelerate
提供一些工具来帮助处理大模型(这些 API
是实验性质的,未来可能会发生改变):
init_empty_weights
上下文管理器:初始化一个模型而无需使用任何内存。这依赖于 PyTorch 1.9
中引入的 meta device
。
from accelerate import init_empty_weights
with init_empty_weights():
my_model = ModelClass(...)
在该上下文管理器中,每当有一个 parameter
被创建时,它就被立即移动到 meta device
。
sharded checkpoints
:有可能你的模型太大从而无法装入内存,这并不意味着它不能被加载:如果你有一个或几个 GPU
,这就有更多的内存可用于存储你的模型。此时需要你的 checkpoint
被拆分为几个小文件,即 checkpoint shards
。
Accelerate
将处理 checkpoint shards
,但是要满足如下格式:你的 checkpoint shards
应该放在一个文件夹中,并且有几个包含部分 state dict
的文件、以及一个 index.json
文件(将 parameter name
映射到包含该 parameter weights
的文件)。如:
first_state_dict.bin
index.json
second_state_dict.bin
其中 index.json
内容为:
{
"linear1.weight": "first_state_dict.bin",
"linear1.bias": "first_state_dict.bin",
"linear2.weight": "second_state_dict.bin",
"linear2.bias": "second_state_dict.bin"
}
load_checkpoint_and_dispatch
:在 empty model
中加载一个 checkpoint
。它支持 full checkpoints
(包含整个 state dict
的单一文件)以及 sharded checkpoints
。它还会在你可用的设备( GPU
、CPU
)上自动分配这些权重,所以如果你正在加载一个 sharded checkpoints
,最大的RAM
用量将是最大分片的大小。
例如:
git clone https://huggingface.co/sgugger/sharded-gpt-j-6B
cd sharded-gpt-j-6B
git-lfs install
git pull
初始化模型:
from accelerate import init_empty_weights
from transformers import AutoConfig, AutoModelForCausalLM
checkpoint = "EleutherAI/gpt-j-6B"
config = AutoConfig.from_pretrained(checkpoint)
with init_empty_weights():
model = AutoModelForCausalLM.from_config(config)
加载权重:
from accelerate import load_checkpoint_and_dispatch
model = load_checkpoint_and_dispatch(
model, "sharded-gpt-j-6B", device_map="auto", no_split_module_classes=["GPTJBlock"]
)
通过 device_map="auto"
,Accelerate
根据可用资源自动决定将模型的每一层放在哪里:
GPU
上的最大可用空间。CPU
上。RAM
,我们将剩余的权重作为内存映射的张量存储在硬盘上。no_split_module_classes=["GPTJBlock"]
表示属于 GPTJBlock
的模块不应该在不同的设备上分割。你应该在这里设置所有包括某种残差连接的 block
。
可以通过 model.hf_device_map
查看模型的权重的设备。
分布式训练的复现:
设置随机数种子:
from accelerate import set_seed
set_seed(42)
它在内部设置了五种随机数种子:
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# ^^ safe to call this function even if cuda is not available
if is_tpu_available():
xm.set_rng_state(seed)
设置 batch size
:当使用 Accelerate
训练时,传递给 dataloader
的 batch size
是 batch size/GPU
,因此 final batch size
是 batch size * device num
。
设置学习率:学习率应该和 device num
成正比,如:
learning_rate = 1e-3
accelerator = Accelerator()
learning_rate *= accelerator.num_processes
optimizer = AdamW(params=model.parameters(), lr=learning_rate)
梯度同步:在 DDP
中,PyTorch
在一些特定的点上进行进程间通信。然而在梯度累积时,你会累积 n
个 loss
并跳过 .backward()
。这可能会导致明显的减速,因为所有的进程都需要与它们进行更多次的通信。
可以通过 no_sync
上下文管理器来避免:
ddp_model, dataloader = accelerator.prepare(model, dataloader)
for index, batch in enumerate(dataloader):
inputs, targets = batch
# Trigger gradient synchronization on the last batch
if index != (len(dataloader)-1):
- with ddp_model.no_sync():
+ with accelerator.no_sync(model):
# Gradients only accumulate
outputs = ddp_model(inputs)
loss = loss_func(outputs, targets)
accelerator.backward(loss)
else:
# Gradients finally sync
outputs = ddp_model(inputs)
loss = loss_func(outputs)
accelerator.backward(loss)
或者直接使用 accelerator.accumulate
:
ddp_model, dataloader = accelerator.prepare(model, dataloader)
for batch in dataloader:
with accelerator.accumulate(model):
optimizer.zero_grad()
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
进程间同步:
accelerator.wait_for_everyone()
这将阻塞所有进程直到所有进程都达到该点。
用途:
加载数据集:
with accelerator.main_process_first():
datasets = load_dataset("glue", "mrpc")
这等价于:
# First do something on the main process
if accelerator.is_main_process:
datasets = load_dataset("glue", "mrpc")
else:
accelerator.wait_for_everyone()
# And then send it to the rest of them
if not accelerator.is_main_process:
datasets = load_dataset("glue", "mrpc")
else:
accelerator.wait_for_everyone()
存取 state_dict
:
if accelerator.is_main_process:
model = accelerator.unwrap_model(model)
torch.save(model.state_dict(), "weights.pth")
with accelerator.main_process_first():
state = torch.load("weights.pth")
model.load_state_dict(state)
在 global main
进程上 tokenizing
,然后传播到每个 worker
:
datasets = load_dataset("glue", "mrpc")
with accelerator.main_process_first():
tokenized_datasets = datasets.map(
tokenize_function,
batched=True,
remove_columns=["idx", "sentence1", "sentence2"],
)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。