赞
踩
这是我们研发的用于 消费决策的AI助理 ,我们会持续优化,欢迎体验与反馈。微信扫描二维码,添加即可。
官方链接:https://ailab.smzdm.com/
本教程将讲述 如何在Smithsonian Butterflies数据集的子集上,从头开始训练UNet2DModel,最终训练个【无条件图片生成模型】,就是不能进行文生图的啊,我觉得比较适合垂直领域的数据训练。
训练的数据集在这个:https://huggingface.co/datasets/huggan/smithsonian_butterflies_subset。可以使用代码进行下载。
完整的代码在最后,因为网络的原因,调代码花了一些时间(官网默认上传hugging face,我没上传),所以要运行的话,copy最后的全部代码。我的显卡是3050,8G显存。
from datasets import load_dataset
dataset = load_dataset("huggan/smithsonian_butterflies_subset")
代码运行完成后,它的默认下载路径在:
/Users/用户名/.cache/huggingface/datasets
进入该目录后,可以看见下载的文件夹。
为了方便起见,训练一个包含超参数的配置文件:
from dataclasses import dataclass @dataclass class TrainingConfig: image_size = 128 # the generated image resolution train_batch_size = 16 eval_batch_size = 16 # how many images to sample during evaluation num_epochs = 50 gradient_accumulation_steps = 1 learning_rate = 1e-4 lr_warmup_steps = 500 save_image_epochs = 10 save_model_epochs = 30 mixed_precision = "fp16" # `no` for float32, `fp16` for automatic mixed precision output_dir = "ddpm-butterflies-128" # the model name locally and on the HF Hub push_to_hub = True # whether to upload the saved model to the HF Hub hub_private_repo = False overwrite_output_dir = True # overwrite the old model when re-running the notebook seed = 0 config = TrainingConfig()
from datasets import load_dataset
config.dataset_name = "huggan/smithsonian_butterflies_subset"
dataset = load_dataset(config.dataset_name, split="train")
大家也可以添加一下,Smithsonian Butterflies 数据集中一些其他数据(创建一个ImageFolder文件夹),但是在 配置文件中 要进行添加对应的变量 imagefolder。当然,也可以使用自己的数据。
import matplotlib.pyplot as plt
fig, axs = plt.subplots(1, 4, figsize=(16, 4))
for i, image in enumerate(dataset[:4]["image"]):
axs[i].imshow(image)
axs[i].set_axis_off()
fig.show()
不过,这些图像的大小都不一样,所以你需要先对它们进行预处理:
from torchvision import transforms
preprocess = transforms.Compose(
[
transforms.Resize((config.image_size, config.image_size)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.5], [0.5]),
]
)
对图像进行预处理,将图像通道转化为RGB
def transform(examples):
images = [preprocess(image.convert("RGB")) for image in examples["image"]]
return {"images": images}
dataset.set_transform(transform)
可以再次可视化图像,以确认它们是否已经被调整。之后就可以将数据集打包到DataLoader中进行训练了!
import torch
train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=config.train_batch_size, shuffle=True)
from diffusers import UNet2DModel model = UNet2DModel( sample_size=config.image_size, # the target image resolution in_channels=3, # the number of input channels, 3 for RGB images out_channels=3, # the number of output channels layers_per_block=2, # how many ResNet layers to use per UNet block block_out_channels=(128, 128, 256, 256, 512, 512), # the number of output channels for each UNet block down_block_types=( "DownBlock2D", # a regular ResNet downsampling block "DownBlock2D", "DownBlock2D", "DownBlock2D", "AttnDownBlock2D", # a ResNet downsampling block with spatial self-attention "DownBlock2D", ), up_block_types=( "UpBlock2D", # a regular ResNet upsampling block "AttnUpBlock2D", # a ResNet upsampling block with spatial self-attention "UpBlock2D", "UpBlock2D", "UpBlock2D", "UpBlock2D", ), )
还有一个方法,快速检查样本图像的形状是否与模型输出形状匹配。
sample_image = dataset[0]["images"].unsqueeze(0)
print("Input shape:", sample_image.shape)
print("Output shape:", model(sample_image, timestep=0).sample.shape)
还需要一个调度器来为图像添加一些噪声。
调度器的作用在不同的场景下会生成不同的作用,这取决于您是使用模型进行训练还是推理。
在推理过程中,调度器从噪声中生成图像。
在训练过程中,调度器从图像上生成噪声。
可以看下DDPMScheduler调度器给图像增加噪声的效果:
import torch
from PIL import Image
from diffusers import DDPMScheduler
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
noise = torch.randn(sample_image.shape)
timesteps = torch.LongTensor([50])
noisy_image = noise_scheduler.add_noise(sample_image, noise, timesteps)
Image.fromarray(((noisy_image.permute(0, 2, 3, 1) + 1.0) * 127.5).type(torch.uint8).numpy()[0])
模型的训练对象,就是去预测这些被覆盖在图像上的噪声。在这个训练过程中,loss可以被计算。
import torch.nn.functional as F
noise_pred = model(noisy_image, timesteps).sample
loss = F.mse_loss(noise_pred, noise)
到目前为止,已经完成了开始训练模型的大部分内容,剩下的就是将所有内容组合在一起。
再添加一个优化器和一个学习率调度器:
from diffusers.optimization import get_cosine_schedule_with_warmup
optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate)
lr_scheduler = get_cosine_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=config.lr_warmup_steps,
num_training_steps=(len(train_dataloader) * config.num_epochs),
)
你还需要一种方法去评估模型,可以使用 DDPMPipeline 去生成一个batch,然后将他存为一个grid。
from diffusers import DDPMPipeline import math import os def make_grid(images, rows, cols): w, h = images[0].size grid = Image.new("RGB", size=(cols * w, rows * h)) for i, image in enumerate(images): grid.paste(image, box=(i % cols * w, i // cols * h)) return grid def evaluate(config, epoch, pipeline): # Sample some images from random noise (this is the backward diffusion process). # The default pipeline output type is `List[PIL.Image]` images = pipeline( batch_size=config.eval_batch_size, generator=torch.manual_seed(config.seed), ).images # Make a grid out of the images image_grid = make_grid(images, rows=4, cols=4) # Save the images test_dir = os.path.join(config.output_dir, "samples") os.makedirs(test_dir, exist_ok=True) image_grid.save(f"{test_dir}/{epoch:04d}.png")
现在开始梳理 整个模型训练的循环过程:
from accelerate import Accelerator from huggingface_hub import HfFolder, Repository, whoami from tqdm.auto import tqdm from pathlib import Path import os def get_full_repo_name(model_id: str, organization: str = None, token: str = None): if token is None: token = HfFolder.get_token() if organization is None: username = whoami(token)["name"] return f"{username}/{model_id}" else: return f"{organization}/{model_id}" def train_loop(config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler): # Initialize accelerator and tensorboard logging accelerator = Accelerator( mixed_precision=config.mixed_precision, gradient_accumulation_steps=config.gradient_accumulation_steps, log_with="tensorboard", logging_dir=os.path.join(config.output_dir, "logs"), ) if accelerator.is_main_process: if config.push_to_hub: repo_name = get_full_repo_name(Path(config.output_dir).name) repo = Repository(config.output_dir, clone_from=repo_name) elif config.output_dir is not None: os.makedirs(config.output_dir, exist_ok=True) accelerator.init_trackers("train_example") # Prepare everything # There is no specific order to remember, you just need to unpack the # objects in the same order you gave them to the prepare method. model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare( model, optimizer, train_dataloader, lr_scheduler ) global_step = 0 # Now you train the model for epoch in range(config.num_epochs): progress_bar = tqdm(total=len(train_dataloader), disable=not accelerator.is_local_main_process) progress_bar.set_description(f"Epoch {epoch}") for step, batch in enumerate(train_dataloader): clean_images = batch["images"] # Sample noise to add to the images noise = torch.randn(clean_images.shape).to(clean_images.device) bs = clean_images.shape[0] # Sample a random timestep for each image timesteps = torch.randint( 0, noise_scheduler.config.num_train_timesteps, (bs,), device=clean_images.device ).long() # Add noise to the clean images according to the noise magnitude at each timestep # (this is the forward diffusion process) noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps) with accelerator.accumulate(model): # Predict the noise residual noise_pred = model(noisy_images, timesteps, return_dict=False)[0] loss = F.mse_loss(noise_pred, noise) accelerator.backward(loss) accelerator.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() lr_scheduler.step() optimizer.zero_grad() progress_bar.update(1) logs = {"loss": loss.detach().item(), "lr": lr_scheduler.get_last_lr()[0], "step": global_step} progress_bar.set_postfix(**logs) accelerator.log(logs, step=global_step) global_step += 1 # After each epoch you optionally sample some demo images with evaluate() and save the model if accelerator.is_main_process: pipeline = DDPMPipeline(unet=accelerator.unwrap_model(model), scheduler=noise_scheduler) if (epoch + 1) % config.save_image_epochs == 0 or epoch == config.num_epochs - 1: evaluate(config, epoch, pipeline) if (epoch + 1) % config.save_model_epochs == 0 or epoch == config.num_epochs - 1: if config.push_to_hub: repo.push_to_hub(commit_message=f"Epoch {epoch}", blocking=True) else: pipeline.save_pretrained(config.output_dir)
现在你终于可以使用 Accelerate的notebook_launcher函数来启动训练了。将训练循环、所有训练参数以及要用于训练的进程数(你可以将其更改为可用的GPU数量)传递给这个函数:
from accelerate import notebook_launcher
args = (config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler)
notebook_launcher(train_loop, args, num_processes=1)
在训练完成后,就可以看最后生成图像的模型了。
import glob
sample_images = sorted(glob.glob(f"{config.output_dir}/samples/*.png"))
Image.open(sample_images[-1])
# -*- coding:utf-8 _*- # Author : Robin Chen # Time : 2024/3/27 20:06 # File : train_diffusion.py # Purpose: train a unconditional diffusion model from diffusers import DDPMPipeline, DDPMScheduler, UNet2DModel from accelerate import Accelerator from huggingface_hub import HfFolder, Repository, whoami from tqdm.auto import tqdm from pathlib import Path import os from accelerate import notebook_launcher from diffusers.optimization import get_cosine_schedule_with_warmup from PIL import Image import torch.nn.functional as F import torch from torchvision import transforms from dataclasses import dataclass from datasets import load_dataset # from huggingface_hub import notebook_login os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890' os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890' # notebook_login() dataset = load_dataset("huggan/smithsonian_butterflies_subset") @dataclass class TrainingConfig: image_size = 128 # the generated image resolution train_batch_size = 16 eval_batch_size = 16 # how many images to sample during evaluation num_epochs = 50 gradient_accumulation_steps = 1 learning_rate = 1e-4 lr_warmup_steps = 500 save_image_epochs = 10 save_model_epochs = 30 mixed_precision = "fp16" # `no` for float32, `fp16` for automatic mixed precision output_dir = "ddpm-butterflies-128" # the model name locally and on the HF Hub push_to_hub = False # True # whether to upload the saved model to the HF Hub hub_private_repo = False overwrite_output_dir = True # overwrite the old model when re-running the notebook seed = 0 def transform(examples): images = [preprocess(image.convert("RGB")) for image in examples["image"]] return {"images": images} def make_grid(images, rows, cols): w, h = images[0].size grid = Image.new("RGB", size=(cols * w, rows * h)) for i, image in enumerate(images): grid.paste(image, box=(i % cols * w, i // cols * h)) return grid def evaluate(config, epoch, pipeline): # Sample some images from random noise (this is the backward diffusion process). # The default pipeline output type is `List[PIL.Image]` images = pipeline( batch_size=config.eval_batch_size, generator=torch.manual_seed(config.seed), ).images # Make a grid out of the images image_grid = make_grid(images, rows=4, cols=4) # Save the images test_dir = os.path.join(config.output_dir, "samples") os.makedirs(test_dir, exist_ok=True) image_grid.save(f"{test_dir}/{epoch:04d}.png") def get_full_repo_name(model_id: str, organization: str = None, token: str = None): if token is None: token = HfFolder.get_token() if organization is None: username = whoami(token)["name"] return f"{username}/{model_id}" else: return f"{organization}/{model_id}" config = TrainingConfig() config.dataset_name = "huggan/smithsonian_butterflies_subset" dataset = load_dataset(config.dataset_name, split="train") preprocess = transforms.Compose( [ transforms.Resize((config.image_size, config.image_size)), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.5], [0.5]), ] ) dataset.set_transform(transform) train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=config.train_batch_size, shuffle=True) model = UNet2DModel( sample_size=config.image_size, # the target image resolution in_channels=3, # the number of input channels, 3 for RGB images out_channels=3, # the number of output channels layers_per_block=2, # how many ResNet layers to use per UNet block block_out_channels=(128, 128, 256, 256, 512, 512), # the number of output channels for each UNet block down_block_types=( "DownBlock2D", # a regular ResNet downsampling block "DownBlock2D", "DownBlock2D", "DownBlock2D", "AttnDownBlock2D", # a ResNet downsampling block with spatial self-attention "DownBlock2D", ), up_block_types=( "UpBlock2D", # a regular ResNet upsampling block "AttnUpBlock2D", # a ResNet upsampling block with spatial self-attention "UpBlock2D", "UpBlock2D", "UpBlock2D", "UpBlock2D", ), ) sample_image = dataset[0]["images"].unsqueeze(0) print("Input shape:", sample_image.shape) print("Output shape:", model(sample_image, timestep=0).sample.shape) noise_scheduler = DDPMScheduler(num_train_timesteps=1000) noise = torch.randn(sample_image.shape) timesteps = torch.LongTensor([50]) noisy_image = noise_scheduler.add_noise(sample_image, noise, timesteps) Image.fromarray(((noisy_image.permute(0, 2, 3, 1) + 1.0) * 127.5).type(torch.uint8).numpy()[0]) noise_pred = model(noisy_image, timesteps).sample loss = F.mse_loss(noise_pred, noise) optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate) lr_scheduler = get_cosine_schedule_with_warmup( optimizer=optimizer, num_warmup_steps=config.lr_warmup_steps, num_training_steps=(len(train_dataloader) * config.num_epochs), ) def train_loop(config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler): # Initialize accelerator and tensorboard logging accelerator = Accelerator( mixed_precision=config.mixed_precision, gradient_accumulation_steps=config.gradient_accumulation_steps, log_with="tensorboard", ) # logging_dir=os.path.join(config.output_dir, "logs"), if accelerator.is_main_process: if config.push_to_hub: repo_name = get_full_repo_name(Path(config.output_dir).name) # repo = Repository(config.output_dir, clone_from=repo_name) elif config.output_dir is not None: os.makedirs(config.output_dir, exist_ok=True) accelerator.init_trackers("train_example") # Prepare everything # There is no specific order to remember, you just need to unpack the # objects in the same order you gave them to the prepare method. model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare( model, optimizer, train_dataloader, lr_scheduler ) global_step = 0 # Now you train the model for epoch in range(config.num_epochs): progress_bar = tqdm(total=len(train_dataloader), disable=not accelerator.is_local_main_process) progress_bar.set_description(f"Epoch {epoch}") for step, batch in enumerate(train_dataloader): clean_images = batch["images"] # Sample noise to add to the images noise = torch.randn(clean_images.shape).to(clean_images.device) bs = clean_images.shape[0] # Sample a random timestep for each image timesteps = torch.randint( 0, noise_scheduler.config.num_train_timesteps, (bs,), device=clean_images.device ).long() # Add noise to the clean images according to the noise magnitude at each timestep # (this is the forward diffusion process) noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps) with accelerator.accumulate(model): # Predict the noise residual noise_pred = model(noisy_images, timesteps, return_dict=False)[0] loss = F.mse_loss(noise_pred, noise) accelerator.backward(loss) accelerator.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() lr_scheduler.step() optimizer.zero_grad() progress_bar.update(1) logs = {"loss": loss.detach().item(), "lr": lr_scheduler.get_last_lr()[0], "step": global_step} progress_bar.set_postfix(**logs) accelerator.log(logs, step=global_step) global_step += 1 # After each epoch you optionally sample some demo images with evaluate() and save the model if accelerator.is_main_process: pipeline = DDPMPipeline(unet=accelerator.unwrap_model(model), scheduler=noise_scheduler) if (epoch + 1) % config.save_image_epochs == 0 or epoch == config.num_epochs - 1: evaluate(config, epoch, pipeline) if (epoch + 1) % config.save_model_epochs == 0 or epoch == config.num_epochs - 1: if config.push_to_hub: repo.push_to_hub(commit_message=f"Epoch {epoch}", blocking=True) else: pipeline.save_pretrained(config.output_dir) if __name__ == "__main__": args = (config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler) notebook_launcher(train_loop, args, num_processes=1)
当开始训练后,工程下面会生成一些文件:
在samples下,会保存一些固定轮数后,模型生成图片的结果, 可以看出模型生成的图片越来越好。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。