赞
踩
近些年,随着基于自注意(Self-Attention)结构的模型的发展,特别是Transformer模型的提出,极大地促进了自然语言处理模型的发展。由于Transformers的计算效率和可扩展性,它已经能够训练具有超过100B参数的空前规模的模型。
ViT则是自然语言处理和计算机视觉两个领域的融合结晶。在不依赖卷积操作的情况下,依然可以在图像分类任务上达到很好的效果。
ViT模型的主体结构是基于Transformer模型的Encoder部分(部分结构顺序有调整,如:Normalization的位置与标准Transformer不同),其结构图如下:
ViT模型主要应用于图像分类领域。因此,其模型结构相较于传统的Transformer有以下几个特点:
from download import download dataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/vit_imagenet_dataset.zip" path = "./" path = download(dataset_url, path, kind="zip", replace=True) import os import mindspore as ms from mindspore.dataset import ImageFolderDataset import mindspore.dataset.vision as transforms data_path = './dataset/' mean = [0.485 * 255, 0.456 * 255, 0.406 * 255] std = [0.229 * 255, 0.224 * 255, 0.225 * 255] dataset_train = ImageFolderDataset(os.path.join(data_path, "train"), shuffle=True) trans_train = [ transforms.RandomCropDecodeResize(size=224, scale=(0.08, 1.0), ratio=(0.75, 1.333)), transforms.RandomHorizontalFlip(prob=0.5), transforms.Normalize(mean=mean, std=std), transforms.HWC2CHW() ] dataset_train = dataset_train.map(operations=trans_train, input_columns=["image"]) dataset_train = dataset_train.batch(batch_size=16, drop_remainder=True)
下面将通过代码来细致剖析ViT模型的内部结构。
Transformer模型源于2017年的一篇文章。在这篇文章中提出的基于Attention机制的编码器-解码器型结构在自然语言处理领域获得了巨大的成功。模型结构如下图所示:
其主要结构为多个Encoder和Decoder模块所组成,其中Encoder和Decoder的详细结构如下图[2]所示:
Encoder与Decoder由许多结构组成,如:多头注意力(Multi-Head Attention)层,Feed Forward层,Normaliztion层,甚至残差连接(Residual Connection,图中的“Add”)。不过,其中最重要的结构是多头注意力(Multi-Head Attention)结构,该结构基于自注意力(Self-Attention)机制,是多个Self-Attention的并行组成。
所以,理解了Self-Attention就抓住了Transformer的核心。
以下是Self-Attention的解释,其核心内容是为输入向量的每个单词学习一个权重。通过给定一个任务相关的查询向量Query向量,计算Query和各个Key的相似性或者相关性得到注意力分布,即得到每个Key对应Value的权重系数,然后对Value进行加权求和得到最终的Attention数值。
在Self-Attention中:
{
q
i
=
W
q
⋅
x
i
k
i
=
W
k
⋅
x
i
,
i
=
1
,
2
,
3
…
v
i
=
W
v
⋅
x
i
(1)
{
a
1
,
1
=
q
1
⋅
k
1
/
d
a
1
,
2
=
q
1
⋅
k
2
/
d
a
1
,
3
=
q
1
⋅
k
3
/
d
(2)
S o f t m a x : a ^ 1 , i = e x p ( a 1 , i ) / ∑ j e x p ( a 1 , j ) , j = 1 , 2 , 3 … (3) Softmax: \hat a_{1,i} = exp(a_{1,i}) / \sum_j exp(a_{1,j}),\hspace{1em} j = 1,2,3 \ldots \tag{3} Softmax:a^1,i=exp(a1,i)/j∑exp(a1,j),j=1,2,3…(3)
b 1 = ∑ i a ^ 1 , i v i , i = 1 , 2 , 3... (4) b_1 = \sum_i \hat a_{1,i}v_i,\hspace{1em} i = 1,2,3... \tag{4} b1=i∑a^1,ivi,i=1,2,3...(4)
通过下图可以整体把握Self-Attention的全部过程。
多头注意力机制就是将原本self-Attention处理的向量分割为多个Head进行处理,这一点也可以从代码中体现,这也是attention结构可以进行并行加速的一个方面。
总结来说,多头注意力机制在保持参数总量不变的情况下,将同样的query, key和value映射到原来的高维空间(Q,K,V)的不同子空间(Q_0,K_0,V_0)中进行自注意力的计算,最后再合并不同子空间中的注意力信息。
所以,对于同一个输入向量,多个注意力机制可以同时对其进行处理,即利用并行计算加速处理过程,又在处理的时候更充分的分析和利用了向量特征。下图展示了多头注意力机制,其并行能力的主要体现在下图中的 a 1 a_1 a1和 a 2 a_2 a2是同一个向量进行分割获得的。
以下是Multi-Head Attention代码,结合上文的解释,代码清晰的展现了这一过程。
from mindspore import nn, ops class Attention(nn.Cell): def __init__(self, dim: int, num_heads: int = 8, keep_prob: float = 1.0, attention_keep_prob: float = 1.0): super(Attention, self).__init__() self.num_heads = num_heads head_dim = dim // num_heads self.scale = ms.Tensor(head_dim ** -0.5) self.qkv = nn.Dense(dim, dim * 3) self.attn_drop = nn.Dropout(p=1.0-attention_keep_prob) self.out = nn.Dense(dim, dim) self.out_drop = nn.Dropout(p=1.0-keep_prob) self.attn_matmul_v = ops.BatchMatMul() self.q_matmul_k = ops.BatchMatMul(transpose_b=True) self.softmax = nn.Softmax(axis=-1) def construct(self, x): """Attention construct.""" b, n, c = x.shape qkv = self.qkv(x) qkv = ops.reshape(qkv, (b, n, 3, self.num_heads, c // self.num_heads)) qkv = ops.transpose(qkv, (2, 0, 3, 1, 4)) q, k, v = ops.unstack(qkv, axis=0) attn = self.q_matmul_k(q, k) attn = ops.mul(attn, self.scale) attn = self.softmax(attn) attn = self.attn_drop(attn) out = self.attn_matmul_v(attn, v) out = ops.transpose(out, (0, 2, 1, 3)) out = ops.reshape(out, (b, n, c)) out = self.out(out) out = self.out_drop(out) return out
在了解了Self-Attention结构之后,通过与Feed Forward,Residual Connection等结构的拼接就可以形成Transformer的基础结构,下面代码实现了Feed Forward,Residual Connection结构。
from typing import Optional, Dict class FeedForward(nn.Cell): def __init__(self, in_features: int, hidden_features: Optional[int] = None, out_features: Optional[int] = None, activation: nn.Cell = nn.GELU, keep_prob: float = 1.0): super(FeedForward, self).__init__() out_features = out_features or in_features hidden_features = hidden_features or in_features self.dense1 = nn.Dense(in_features, hidden_features) self.activation = activation() self.dense2 = nn.Dense(hidden_features, out_features) self.dropout = nn.Dropout(p=1.0-keep_prob) def construct(self, x): """Feed Forward construct.""" x = self.dense1(x) x = self.activation(x) x = self.dropout(x) x = self.dense2(x) x = self.dropout(x) return x class ResidualCell(nn.Cell): def __init__(self, cell): super(ResidualCell, self).__init__() self.cell = cell def construct(self, x): """ResidualCell construct.""" return self.cell(x) + x
接下来就利用Self-Attention来构建ViT模型中的TransformerEncoder部分,类似于构建了一个Transformer的编码器部分,如下图所示:
ViT模型中的基础结构与标准Transformer有所不同,主要在于Normalization的位置是放在Self-Attention和Feed Forward之前,其他结构如Residual Connection,Feed Forward,Normalization都如Transformer中所设计。
从Transformer结构的图片可以发现,多个子encoder的堆叠就完成了模型编码器的构建,在ViT模型中,依然沿用这个思路,通过配置超参数num_layers,就可以确定堆叠层数。
Residual Connection,Normalization的结构可以保证模型有很强的扩展性(保证信息经过深层处理不会出现退化的现象,这是Residual Connection的作用),Normalization和dropout的应用可以增强模型泛化能力。
从以下源码中就可以清晰看到Transformer的结构。将TransformerEncoder结构和一个多层感知器(MLP)结合,就构成了ViT模型的backbone部分。
class TransformerEncoder(nn.Cell): def __init__(self, dim: int, num_layers: int, num_heads: int, mlp_dim: int, keep_prob: float = 1., attention_keep_prob: float = 1.0, drop_path_keep_prob: float = 1.0, activation: nn.Cell = nn.GELU, norm: nn.Cell = nn.LayerNorm): super(TransformerEncoder, self).__init__() layers = [] for _ in range(num_layers): normalization1 = norm((dim,)) normalization2 = norm((dim,)) attention = Attention(dim=dim, num_heads=num_heads, keep_prob=keep_prob, attention_keep_prob=attention_keep_prob) feedforward = FeedForward(in_features=dim, hidden_features=mlp_dim, activation=activation, keep_prob=keep_prob) layers.append( nn.SequentialCell([ ResidualCell(nn.SequentialCell([normalization1, attention])), ResidualCell(nn.SequentialCell([normalization2, feedforward])) ]) ) self.layers = nn.SequentialCell(layers) def construct(self, x): """Transformer construct.""" return self.layers(x)
传统的Transformer结构主要用于处理自然语言领域的词向量(Word Embedding or Word Vector),词向量与传统图像数据的主要区别在于,词向量通常是一维向量进行堆叠,而图片则是二维矩阵的堆叠,多头注意力机制在处理一维词向量的堆叠时会提取词向量之间的联系也就是上下文语义,这使得Transformer在自然语言处理领域非常好用,而二维图片矩阵如何与一维词向量进行转化就成为了Transformer进军图像处理领域的一个小门槛。
在ViT模型中:
通过将输入图像在每个channel上划分为16*16个patch,这一步是通过卷积操作来完成的,当然也可以人工进行划分,但卷积操作也可以达到目的同时还可以进行一次而外的数据处理;例如一幅输入224 x 224的图像,首先经过卷积处理得到16 x 16个patch,那么每一个patch的大小就是14 x 14。
再将每一个patch的矩阵拉伸成为一个一维向量,从而获得了近似词向量堆叠的效果。上一步得到的14 x 14的patch就转换为长度为196的向量。
这是图像输入网络经过的第一步处理。具体Patch Embedding的代码如下所示:
class PatchEmbedding(nn.Cell): MIN_NUM_PATCHES = 4 def __init__(self, image_size: int = 224, patch_size: int = 16, embed_dim: int = 768, input_channels: int = 3): super(PatchEmbedding, self).__init__() self.image_size = image_size self.patch_size = patch_size self.num_patches = (image_size // patch_size) ** 2 self.conv = nn.Conv2d(input_channels, embed_dim, kernel_size=patch_size, stride=patch_size, has_bias=True) def construct(self, x): """Path Embedding construct.""" x = self.conv(x) b, c, h, w = x.shape x = ops.reshape(x, (b, c, h * w)) x = ops.transpose(x, (0, 2, 1)) return x
输入图像在划分为patch之后,会经过pos_embedding 和 class_embedding两个过程。
class_embedding主要借鉴了BERT模型的用于文本分类时的思想,在每一个word vector之前增加一个类别值,通常是加在向量的第一位,上一步得到的196维的向量加上class_embedding后变为197维。
增加的class_embedding是一个可以学习的参数,经过网络的不断训练,最终以输出向量的第一个维度的输出来决定最后的输出类别;由于输入是16 x 16个patch,所以输出进行分类时是取 16 x 16个class_embedding进行分类。
pos_embedding也是一组可以学习的参数,会被加入到经过处理的patch矩阵中。
由于pos_embedding也是可以学习的参数,所以它的加入类似于全链接网络和卷积的bias。这一步就是创造一个长度维197的可训练向量加入到经过class_embedding的向量中。
实际上,pos_embedding总共有4种方案。但是经过作者的论证,只有加上pos_embedding和不加pos_embedding有明显影响,至于pos_embedding是一维还是二维对分类结果影响不大,所以,在我们的代码中,也是采用了一维的pos_embedding,由于class_embedding是加在pos_embedding之前,所以pos_embedding的维度会比patch拉伸后的维度加1。
总的而言,ViT模型还是利用了Transformer模型在处理上下文语义时的优势,将图像转换为一种“变种词向量”然后进行处理,而这样转换的意义在于,多个patch之间本身具有空间联系,这类似于一种“空间语义”,从而获得了比较好的处理效果。
以下代码构建了一个完整的ViT模型。
from mindspore.common.initializer import Normal from mindspore.common.initializer import initializer from mindspore import Parameter def init(init_type, shape, dtype, name, requires_grad): """Init.""" initial = initializer(init_type, shape, dtype).init_data() return Parameter(initial, name=name, requires_grad=requires_grad) class ViT(nn.Cell): def __init__(self, image_size: int = 224, input_channels: int = 3, patch_size: int = 16, embed_dim: int = 768, num_layers: int = 12, num_heads: int = 12, mlp_dim: int = 3072, keep_prob: float = 1.0, attention_keep_prob: float = 1.0, drop_path_keep_prob: float = 1.0, activation: nn.Cell = nn.GELU, norm: Optional[nn.Cell] = nn.LayerNorm, pool: str = 'cls') -> None: super(ViT, self).__init__() self.patch_embedding = PatchEmbedding(image_size=image_size, patch_size=patch_size, embed_dim=embed_dim, input_channels=input_channels) num_patches = self.patch_embedding.num_patches self.cls_token = init(init_type=Normal(sigma=1.0), shape=(1, 1, embed_dim), dtype=ms.float32, name='cls', requires_grad=True) self.pos_embedding = init(init_type=Normal(sigma=1.0), shape=(1, num_patches + 1, embed_dim), dtype=ms.float32, name='pos_embedding', requires_grad=True) self.pool = pool self.pos_dropout = nn.Dropout(p=1.0-keep_prob) self.norm = norm((embed_dim,)) self.transformer = TransformerEncoder(dim=embed_dim, num_layers=num_layers, num_heads=num_heads, mlp_dim=mlp_dim, keep_prob=keep_prob, attention_keep_prob=attention_keep_prob, drop_path_keep_prob=drop_path_keep_prob, activation=activation, norm=norm) self.dropout = nn.Dropout(p=1.0-keep_prob) self.dense = nn.Dense(embed_dim, num_classes) def construct(self, x): """ViT construct.""" x = self.patch_embedding(x) cls_tokens = ops.tile(self.cls_token.astype(x.dtype), (x.shape[0], 1, 1)) x = ops.concat((cls_tokens, x), axis=1) x += self.pos_embedding x = self.pos_dropout(x) x = self.transformer(x) x = self.norm(x) x = x[:, 0] if self.training: x = self.dropout(x) x = self.dense(x) return x
整体流程图如下所示:
from mindspore.nn import LossBase from mindspore.train import LossMonitor, TimeMonitor, CheckpointConfig, ModelCheckpoint from mindspore import train # define super parameter epoch_size = 10 momentum = 0.9 num_classes = 1000 resize = 224 step_size = dataset_train.get_dataset_size() # construct model network = ViT() # load ckpt vit_url = "https://download.mindspore.cn/vision/classification/vit_b_16_224.ckpt" path = "./ckpt/vit_b_16_224.ckpt" vit_path = download(vit_url, path, replace=True) param_dict = ms.load_checkpoint(vit_path) ms.load_param_into_net(network, param_dict) # define learning rate lr = nn.cosine_decay_lr(min_lr=float(0), max_lr=0.00005, total_step=epoch_size * step_size, step_per_epoch=step_size, decay_epoch=10) # define optimizer network_opt = nn.Adam(network.trainable_params(), lr, momentum) # define loss function class CrossEntropySmooth(LossBase): """CrossEntropy.""" def __init__(self, sparse=True, reduction='mean', smooth_factor=0., num_classes=1000): super(CrossEntropySmooth, self).__init__() self.onehot = ops.OneHot() self.sparse = sparse self.on_value = ms.Tensor(1.0 - smooth_factor, ms.float32) self.off_value = ms.Tensor(1.0 * smooth_factor / (num_classes - 1), ms.float32) self.ce = nn.SoftmaxCrossEntropyWithLogits(reduction=reduction) def construct(self, logit, label): if self.sparse: label = self.onehot(label, ops.shape(logit)[1], self.on_value, self.off_value) loss = self.ce(logit, label) return loss network_loss = CrossEntropySmooth(sparse=True, reduction="mean", smooth_factor=0.1, num_classes=num_classes) # set checkpoint ckpt_config = CheckpointConfig(save_checkpoint_steps=step_size, keep_checkpoint_max=100) ckpt_callback = ModelCheckpoint(prefix='vit_b_16', directory='./ViT', config=ckpt_config) # initialize model # "Ascend + mixed precision" can improve performance ascend_target = (ms.get_context("device_target") == "Ascend") if ascend_target: model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics={"acc"}, amp_level="O2") else: model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics={"acc"}, amp_level="O0") # train model model.train(epoch_size, dataset_train, callbacks=[ckpt_callback, LossMonitor(125), TimeMonitor(125)], dataset_sink_mode=False,)
运行结果:
模型验证过程主要应用了ImageFolderDataset,CrossEntropySmooth和Model等接口。
ImageFolderDataset主要用于读取数据集。
CrossEntropySmooth是损失函数实例化接口。
Model主要用于编译模型。
与训练过程相似,首先进行数据增强,然后定义ViT网络结构,加载预训练模型参数。随后设置损失函数,评价指标等,编译模型后进行验证。本案例采用了业界通用的评价标准Top_1_Accuracy和Top_5_Accuracy评价指标来评价模型表现。
在本案例中,这两个指标代表了在输出的1000维向量中,以最大值或前5的输出值所代表的类别为预测结果时,模型预测的准确率。这两个指标的值越大,代表模型准确率越高。
dataset_val = ImageFolderDataset(os.path.join(data_path, "val"), shuffle=True) trans_val = [ transforms.Decode(), transforms.Resize(224 + 32), transforms.CenterCrop(224), transforms.Normalize(mean=mean, std=std), transforms.HWC2CHW() ] dataset_val = dataset_val.map(operations=trans_val, input_columns=["image"]) dataset_val = dataset_val.batch(batch_size=16, drop_remainder=True) # construct model network = ViT() # load ckpt param_dict = ms.load_checkpoint(vit_path) ms.load_param_into_net(network, param_dict) network_loss = CrossEntropySmooth(sparse=True, reduction="mean", smooth_factor=0.1, num_classes=num_classes) # define metric eval_metrics = {'Top_1_Accuracy': train.Top1CategoricalAccuracy(), 'Top_5_Accuracy': train.Top5CategoricalAccuracy()} if ascend_target: model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics=eval_metrics, amp_level="O2") else: model = train.Model(network, loss_fn=network_loss, optimizer=network_opt, metrics=eval_metrics, amp_level="O0") # evaluate model result = model.eval(dataset_val) print(result)
运行结果:
从结果可以看出,由于我们加载了预训练模型参数,模型的Top_1_Accuracy和Top_5_Accuracy达到了很高的水平,实际项目中也可以以此准确率为标准。如果未使用预训练模型参数,则需要更多的epoch来训练。
在进行模型推理之前,首先要定义一个对推理图片进行数据预处理的方法。该方法可以对我们的推理图片进行resize和normalize处理,这样才能与我们训练时的输入数据匹配。
本案例采用了一张Doberman的图片作为推理图片来测试模型表现,期望模型可以给出正确的预测结果。
dataset_infer = ImageFolderDataset(os.path.join(data_path, "infer"), shuffle=True)
trans_infer = [
transforms.Decode(),
transforms.Resize([224, 224]),
transforms.Normalize(mean=mean, std=std),
transforms.HWC2CHW()
]
dataset_infer = dataset_infer.map(operations=trans_infer,
input_columns=["image"],
num_parallel_workers=1)
dataset_infer = dataset_infer.batch(1)
接下来,我们将调用模型的predict方法进行模型。
在推理过程中,通过index2label就可以获取对应标签,再通过自定义的show_result接口将结果写在对应图片上。
import os import pathlib import cv2 import numpy as np from PIL import Image from enum import Enum from scipy import io class Color(Enum): """dedine enum color.""" red = (0, 0, 255) green = (0, 255, 0) blue = (255, 0, 0) cyan = (255, 255, 0) yellow = (0, 255, 255) magenta = (255, 0, 255) white = (255, 255, 255) black = (0, 0, 0) def check_file_exist(file_name: str): """check_file_exist.""" if not os.path.isfile(file_name): raise FileNotFoundError(f"File `{file_name}` does not exist.") def color_val(color): """color_val.""" if isinstance(color, str): return Color[color].value if isinstance(color, Color): return color.value if isinstance(color, tuple): assert len(color) == 3 for channel in color: assert 0 <= channel <= 255 return color if isinstance(color, int): assert 0 <= color <= 255 return color, color, color if isinstance(color, np.ndarray): assert color.ndim == 1 and color.size == 3 assert np.all((color >= 0) & (color <= 255)) color = color.astype(np.uint8) return tuple(color) raise TypeError(f'Invalid type for color: {type(color)}') def imread(image, mode=None): """imread.""" if isinstance(image, pathlib.Path): image = str(image) if isinstance(image, np.ndarray): pass elif isinstance(image, str): check_file_exist(image) image = Image.open(image) if mode: image = np.array(image.convert(mode)) else: raise TypeError("Image must be a `ndarray`, `str` or Path object.") return image def imwrite(image, image_path, auto_mkdir=True): """imwrite.""" if auto_mkdir: dir_name = os.path.abspath(os.path.dirname(image_path)) if dir_name != '': dir_name = os.path.expanduser(dir_name) os.makedirs(dir_name, mode=777, exist_ok=True) image = Image.fromarray(image) image.save(image_path) def imshow(img, win_name='', wait_time=0): """imshow""" cv2.imshow(win_name, imread(img)) if wait_time == 0: # prevent from hanging if windows was closed while True: ret = cv2.waitKey(1) closed = cv2.getWindowProperty(win_name, cv2.WND_PROP_VISIBLE) < 1 # if user closed window or if some key pressed if closed or ret != -1: break else: ret = cv2.waitKey(wait_time) def show_result(img: str, result: Dict[int, float], text_color: str = 'green', font_scale: float = 0.5, row_width: int = 20, show: bool = False, win_name: str = '', wait_time: int = 0, out_file: Optional[str] = None) -> None: """Mark the prediction results on the picture.""" img = imread(img, mode="RGB") img = img.copy() x, y = 0, row_width text_color = color_val(text_color) for k, v in result.items(): if isinstance(v, float): v = f'{v:.2f}' label_text = f'{k}: {v}' cv2.putText(img, label_text, (x, y), cv2.FONT_HERSHEY_COMPLEX, font_scale, text_color) y += row_width if out_file: show = False imwrite(img, out_file) if show: imshow(img, win_name, wait_time) def index2label(): """Dictionary output for image numbers and categories of the ImageNet dataset.""" metafile = os.path.join(data_path, "ILSVRC2012_devkit_t12/data/meta.mat") meta = io.loadmat(metafile, squeeze_me=True)['synsets'] nums_children = list(zip(*meta))[4] meta = [meta[idx] for idx, num_children in enumerate(nums_children) if num_children == 0] _, wnids, classes = list(zip(*meta))[:3] clssname = [tuple(clss.split(', ')) for clss in classes] wnid2class = {wnid: clss for wnid, clss in zip(wnids, clssname)} wind2class_name = sorted(wnid2class.items(), key=lambda x: x[0]) mapping = {} for index, (_, class_name) in enumerate(wind2class_name): mapping[index] = class_name[0] return mapping # Read data for inference for i, image in enumerate(dataset_infer.create_dict_iterator(output_numpy=True)): image = image["image"] image = ms.Tensor(image) prob = model.predict(image) label = np.argmax(prob.asnumpy(), axis=1) mapping = index2label() output = {int(label): mapping[int(label)]} print(output) show_result(img="./dataset/infer/n01440764/ILSVRC2012_test_00000279.JPEG", result=output, out_file="./dataset/infer/ILSVRC2012_test_00000279.JPEG")
推理过程完成后,在推理文件夹下可以找到图片的推理结果,可以看出预测结果是Doberman,与期望结果相同,验证了模型的准确性。
截图时间
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。