
Semantic Segmentation Study Notes (5): The U-Net Network

Recommended course: "U-Net网络结构讲解(语义分割)" (U-Net Network Structure Explained, Semantic Segmentation) on bilibili.

Thanks to blogger 霹雳吧啦Wz / 太阳花的小绿豆 for providing the video explanations and source code!

Contents

1. The U-Net Network Model

2. Segmentation Results

3. U-Net Source Code Walkthrough (PyTorch)

4. Test Results


1. The U-Net Network Model

The U-Net network consists of two parts: the contracting path and the expanding path.

The U-Net forward pass, step by step:

1. Contracting path: four repetitions of { two 3x3 convolution layers + one pooling layer (downsampling) }.

The input feature map (572 x 572 x 1) --conv(3x3)--> height x width x channels (570 x 570 x 64) --conv(3x3)--> (568 x 568 x 64) --max_pool (halves height and width)--> (284 x 284 x 64); two convolution layers + one pooling layer repeat in this pattern until the feature map reaches (32 x 32 x 512).

2. The bottleneck then applies two more 3x3 convolution layers: (32 x 32 x 512) --conv(3x3)--> (30 x 30 x 1024) --conv(3x3)--> (28 x 28 x 1024).

3. Expanding path: four repetitions of { center crop and concatenation + one upsampling layer (transposed convolution) + two 3x3 convolution layers }.

Note the copy-and-crop step: the feature map is cropped first, e.g. (64 x 64 x 512) --crop (center)--> (56 x 56 x 512). What is cropped here is a feature map from the contracting path; it is then concatenated with the upsampled feature map in the expanding path.

The feature map (28 x 28 x 1024) --up-conv (transposed convolution)--> (56 x 56 x 512) --concat (with the center-cropped feature map from above)--> (56 x 56 x 1024) --conv(3x3)--> (54 x 54 x 512) --conv(3x3)--> (52 x 52 x 512); one center crop + one upsampling layer (transposed convolution) + two convolution layers repeat in this pattern until the final feature map is (388 x 388 x 64).

4. Finally, one 1x1 convolution: (388 x 388 x 64) --conv(1x1)--> (388 x 388 x 2). The output is a 388 x 388 x 2 segmentation map.

Improvement to the U-Net model: give the 3x3 convolution layers throughout the network a padding of 1, so that convolution no longer changes the feature map size. The upsampled feature maps in the expanding path then exactly match the size of the corresponding contracting-path feature maps, so the center-crop step can be dropped and the maps concatenated directly. A quick check of this size arithmetic follows.
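A minimal sketch of the size arithmetic (the 572 x 572 single-channel input is just for illustration): with no padding a 3x3 convolution shrinks each spatial side by 2, while with padding 1 it leaves the size unchanged, which is what the padded DoubleConv block below relies on.

import torch
import torch.nn as nn

x = torch.randn(1, 1, 572, 572)                          # dummy single-channel input
conv_valid = nn.Conv2d(1, 64, kernel_size=3)             # no padding: 572 -> 570
conv_same = nn.Conv2d(1, 64, kernel_size=3, padding=1)   # padding 1: 572 -> 572
print(conv_valid(x).shape)                                # torch.Size([1, 64, 570, 570])
print(conv_same(x).shape)                                 # torch.Size([1, 64, 572, 572])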

2. Segmentation Results

[Figure from the original post showing example segmentation results; not reproduced here.]

3. U-Net Source Code Walkthrough (PyTorch)

U-Net source code: https://github.com/WZMIAOMIAO/deep-learning-for-image-processing/tree/master/pytorch_segmentation/unet

DRIVE dataset download: Baidu Cloud link: https://pan.baidu.com/s/1Tjkrx2B9FgoJk0KviA-rDw  password: 8no8

Repository layout:

├── src: code that builds the U-Net model
├── train_utils: modules for training, validation, and multi-GPU training
├── my_dataset.py: custom dataset for reading the DRIVE dataset (retinal vessel segmentation)
├── train.py: training example using a single GPU
├── train_multi_GPU.py: for users training with multiple GPUs
├── predict.py: simple prediction script that runs inference with trained weights
└── compute_mean_std.py: computes the per-channel mean and standard deviation of the dataset

DRIVE dataset layout:

├── test
│   ├── 1st_manual: annotation images (gold standard)
│   ├── 2nd_manual: annotation images (second set, for validation)
│   ├── images: original images to segment
│   └── mask: region-of-interest (field-of-view) masks
└── training
    ├── 1st_manual: annotation images
    ├── images: original images to segment
    └── mask: region-of-interest (field-of-view) masks

The improved U-Net model:

(1) U-Net model code

unet.py

from typing import Dict
import torch
import torch.nn as nn
import torch.nn.functional as F


# In U-Net the 3x3 convolutions are generally used in pairs
class DoubleConv(nn.Sequential):
    # in_channels / out_channels; mid_channels is the output channel count
    # of the first convolution in the pair
    def __init__(self, in_channels, out_channels, mid_channels=None):
        if mid_channels is None:
            mid_channels = out_channels
        super(DoubleConv, self).__init__(
            # 3x3 convolution with padding 1: the feature map size is unchanged
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )


# downsampling block
class Down(nn.Sequential):
    def __init__(self, in_channels, out_channels):
        super(Down, self).__init__(
            # 1. max pooling with window size 2 and stride 2
            nn.MaxPool2d(2, stride=2),
            # 2. two convolutions
            DoubleConv(in_channels, out_channels)
        )


# upsampling block
class Up(nn.Module):
    # bilinear: whether to upsample with bilinear interpolation
    def __init__(self, in_channels, out_channels, bilinear=True):
        super(Up, self).__init__()
        if bilinear:
            # upsample by a factor of 2 using bilinear interpolation
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            # upsample with a transposed convolution
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1: torch.Tensor, x2: torch.Tensor) -> torch.Tensor:
        x1 = self.up(x1)
        # [N, C, H, W]
        # height difference between the upsampled feature map and the one to concatenate
        diff_y = x2.size()[2] - x1.size()[2]
        # width difference between the upsampled feature map and the one to concatenate
        diff_x = x2.size()[3] - x1.size()[3]

        # padding_left, padding_right, padding_top, padding_bottom
        # 1. pad away the difference
        x1 = F.pad(x1, [diff_x // 2, diff_x - diff_x // 2,
                        diff_y // 2, diff_y - diff_y // 2])
        # 2. concatenate
        x = torch.cat([x2, x1], dim=1)
        # 3. two convolutions
        x = self.conv(x)
        return x


# final 1x1 output convolution
class OutConv(nn.Sequential):
    def __init__(self, in_channels, num_classes):
        super(OutConv, self).__init__(
            nn.Conv2d(in_channels, num_classes, kernel_size=1)
        )


# the U-Net model
class UNet(nn.Module):
    # parameters: input channels, number of segmentation classes,
    # whether to use bilinear upsampling, channel count of the first convolution
    def __init__(self,
                 in_channels: int = 1,
                 num_classes: int = 2,
                 bilinear: bool = True,
                 base_c: int = 64):
        super(UNet, self).__init__()
        self.in_channels = in_channels
        self.num_classes = num_classes
        self.bilinear = bilinear

        self.in_conv = DoubleConv(in_channels, base_c)
        # downsampling; parameters: input channels, output channels
        self.down1 = Down(base_c, base_c * 2)
        self.down2 = Down(base_c * 2, base_c * 4)
        self.down3 = Down(base_c * 4, base_c * 8)
        # factor is 2 with bilinear upsampling, 1 with transposed-convolution upsampling
        factor = 2 if bilinear else 1
        # last downsampling block: 512 output channels with bilinear upsampling, otherwise 1024
        self.down4 = Down(base_c * 8, base_c * 16 // factor)
        # upsampling; parameters: input channels, output channels
        self.up1 = Up(base_c * 16, base_c * 8 // factor, bilinear)
        self.up2 = Up(base_c * 8, base_c * 4 // factor, bilinear)
        self.up3 = Up(base_c * 4, base_c * 2 // factor, bilinear)
        self.up4 = Up(base_c * 2, base_c, bilinear)
        # final 1x1 output convolution
        self.out_conv = OutConv(base_c, num_classes)

    # forward pass
    def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        # 1. the initial pair of convolutions
        x1 = self.in_conv(x)
        # 2. contracting path
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        # 3. expanding path
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        # 4. final 1x1 output convolution
        logits = self.out_conv(x)
        return {"out": logits}
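As a quick sanity check, the model can be run on a dummy batch (a minimal sketch; the 480 x 480 input size and 3-channel input are assumptions for illustration — any size divisible by 16 works with this padded variant):

import torch
# assumes the UNet class from unet.py above is in scope

model = UNet(in_channels=3, num_classes=2, bilinear=True, base_c=64)
x = torch.randn(1, 3, 480, 480)     # dummy batch of one RGB image
out = model(x)["out"]
print(out.shape)                     # torch.Size([1, 2, 480, 480])

Because every convolution pads, the output segmentation map has the same spatial size as the input, unlike the original paper's 572 -> 388 shrinkage.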

(2) Loading the dataset

my_dataset.py

import os
from PIL import Image
import numpy as np
from torch.utils.data import Dataset


# load the dataset by subclassing Dataset
class DriveDataset(Dataset):
    # collect the file paths; parameters: root directory,
    # True to load the training set / False for the test set, preprocessing transforms
    def __init__(self, root: str, train: bool, transforms=None):
        super(DriveDataset, self).__init__()
        # flag is "training" if train is True, otherwise "test"
        self.flag = "training" if train else "test"
        data_root = os.path.join(root, "DRIVE", self.flag)
        # check that the path exists
        assert os.path.exists(data_root), f"path '{data_root}' does not exist."
        self.transforms = transforms
        # list the files under the images directory that end in .tif
        img_names = [i for i in os.listdir(os.path.join(data_root, "images")) if i.endswith(".tif")]
        # build the file paths
        self.img_list = [os.path.join(data_root, "images", i) for i in img_names]
        self.manual = [os.path.join(data_root, "1st_manual", i.split("_")[0] + "_manual1.gif")
                       for i in img_names]
        # check files
        for i in self.manual:
            if os.path.exists(i) is False:
                raise FileNotFoundError(f"file {i} does not exist.")

        self.roi_mask = [os.path.join(data_root, "mask", i.split("_")[0] + f"_{self.flag}_mask.gif")
                         for i in img_names]
        # check files
        for i in self.roi_mask:
            if os.path.exists(i) is False:
                raise FileNotFoundError(f"file {i} does not exist.")

    def __getitem__(self, idx):
        # convert to RGB
        img = Image.open(self.img_list[idx]).convert('RGB')
        manual = Image.open(self.manual[idx]).convert('L')
        # foreground pixels become 1, background pixels become 0
        manual = np.array(manual) / 255
        roi_mask = Image.open(self.roi_mask[idx]).convert('L')
        # region-of-interest pixels become 0, pixels outside the ROI become 255
        roi_mask = 255 - np.array(roi_mask)
        mask = np.clip(manual + roi_mask, a_min=0, a_max=255)
        # convert back to PIL because the transforms operate on PIL images
        mask = Image.fromarray(mask)
        if self.transforms is not None:
            # apply the preprocessing
            img, mask = self.transforms(img, mask)
        return img, mask

    def __len__(self):
        # number of original images to segment
        return len(self.img_list)

    @staticmethod
    def collate_fn(batch):
        images, targets = list(zip(*batch))
        batched_imgs = cat_list(images, fill_value=0)
        batched_targets = cat_list(targets, fill_value=255)
        return batched_imgs, batched_targets


def cat_list(images, fill_value=0):
    max_size = tuple(max(s) for s in zip(*[img.shape for img in images]))
    batch_shape = (len(images),) + max_size
    batched_imgs = images[0].new(*batch_shape).fill_(fill_value)
    for img, pad_img in zip(images, batched_imgs):
        pad_img[..., :img.shape[-2], :img.shape[-1]].copy_(img)
    return batched_imgs
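A minimal usage sketch (the "./data" root is an assumption for illustration; it should be the directory that contains the DRIVE folder):

# assumes the DriveDataset class from my_dataset.py above is in scope

dataset = DriveDataset("./data", train=True, transforms=None)
img, mask = dataset[0]          # PIL images, since no transforms were supplied
print(len(dataset))             # 20 training images in DRIVE
print(img.size, mask.size)      # (565, 584) for the DRIVE images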

(3) Training and evaluation

The Dice similarity coefficient measures the similarity of two sets and is one of the most common evaluation metrics for segmentation networks.

Formulas:

Dice = \frac{2|X \cap Y|}{|X| + |Y|}

Dice\ Loss = 1 - \frac{2|X \cap Y|}{|X| + |Y|}

How Dice is computed:

Take the elementwise product of the predicted foreground probability matrix X and the foreground label matrix, sum it, double it, and divide by the sum of all elements of the two matrices.
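For instance, a tiny worked example (the tensors are made-up numbers for illustration):

import torch

x = torch.tensor([0.8, 0.1, 0.6, 0.9])    # predicted foreground probabilities, flattened
t = torch.tensor([1.0, 0.0, 1.0, 1.0])    # foreground GT labels, flattened
inter = torch.dot(x, t)                    # 0.8 + 0.6 + 0.9 = 2.3
dice = 2 * inter / (x.sum() + t.sum())     # 4.6 / (2.4 + 3.0) ≈ 0.852
print(dice)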

Building the foreground and background GT labels:

When computing Dice, we should compute one coefficient for the foreground and one for the background, so foreground and background GT labels have to be constructed separately.

In the GT label, element 0 marks the background region, 1 the foreground region, and 255 regions that should be ignored (regions of no interest). First, all 255 elements are set to 0; then a one-hot operation is applied: in the channel-0 matrix every 0 becomes 1 and every 1 becomes 0, giving the background GT, while the channel-1 matrix keeps its elements unchanged, giving the foreground GT. (The 255 elements are then restored so they can be excluded from the computation; a small numeric check follows the code below.)

Code implementing the foreground/background GT label construction, the Dice computation, and the Dice loss:

dice_coefficient_loss.py

import torch
import torch.nn as nn


# build the foreground and background GT labels
def build_target(target: torch.Tensor, num_classes: int = 2, ignore_index: int = -100):
    """build target for dice coefficient"""
    dice_target = target.clone()
    # are there ignored (255) elements?
    if ignore_index >= 0:
        ignore_mask = torch.eq(target, ignore_index)
        # set all 255 elements to 0
        dice_target[ignore_mask] = 0
        # [N, H, W] -> [N, H, W, C]
        # two channels: in channel 0 every 0 becomes 1 and every 1 becomes 0;
        # channel 1 keeps the elements unchanged
        dice_target = nn.functional.one_hot(dice_target, num_classes).float()
        # restore the 255 elements
        dice_target[ignore_mask] = ignore_index
    else:
        dice_target = nn.functional.one_hot(dice_target, num_classes).float()

    return dice_target.permute(0, 3, 1, 2)


def dice_coeff(x: torch.Tensor, target: torch.Tensor, ignore_index: int = -100, epsilon=1e-6):
    # Average of Dice coefficient for all batches, or for a single mask
    # computes the Dice coefficient of one class over all images in a batch
    d = 0.
    batch_size = x.shape[0]
    for i in range(batch_size):
        x_i = x[i].reshape(-1)
        t_i = target[i].reshape(-1)
        if ignore_index >= 0:
            # keep only the region of the mask that is not ignore_index
            roi_mask = torch.ne(t_i, ignore_index)
            x_i = x_i[roi_mask]
            t_i = t_i[roi_mask]
        inter = torch.dot(x_i, t_i)
        sets_sum = torch.sum(x_i) + torch.sum(t_i)
        if sets_sum == 0:
            sets_sum = 2 * inter

        d += (2 * inter + epsilon) / (sets_sum + epsilon)

    return d / batch_size


def multiclass_dice_coeff(x: torch.Tensor, target: torch.Tensor, ignore_index: int = -100, epsilon=1e-6):
    """Average of Dice coefficient for all classes"""
    dice = 0.
    for channel in range(x.shape[1]):
        dice += dice_coeff(x[:, channel, ...], target[:, channel, ...], ignore_index, epsilon)

    return dice / x.shape[1]


# compute the Dice loss
def dice_loss(x: torch.Tensor, target: torch.Tensor, multiclass: bool = False, ignore_index: int = -100):
    # Dice loss (objective to minimize) between 0 and 1
    x = nn.functional.softmax(x, dim=1)
    fn = multiclass_dice_coeff if multiclass else dice_coeff
    return 1 - fn(x, target, ignore_index=ignore_index)
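A small numeric check of build_target (a sketch; the 2 x 2 target with one ignored pixel is made up for illustration):

import torch
# assumes build_target from dice_coefficient_loss.py above is in scope

target = torch.tensor([[[0, 1],
                        [255, 1]]])     # [N=1, H=2, W=2]; 255 marks an ignored pixel
dice_target = build_target(target, num_classes=2, ignore_index=255)
print(dice_target.shape)     # torch.Size([1, 2, 2, 2]) — [N, C, H, W]
print(dice_target[0, 0])     # background GT: tensor([[  1.,   0.], [255.,   0.]])
print(dice_target[0, 1])     # foreground GT: tensor([[  0.,   1.], [255.,   1.]])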

train_and_eval.py (training + evaluation)

import torch
from torch import nn
import train_utils.distributed_utils as utils
from .dice_coefficient_loss import dice_loss, build_target


# cross-entropy loss + dice loss
def criterion(inputs, target, loss_weight=None, num_classes: int = 2, dice: bool = True, ignore_index: int = -100):
    losses = {}
    for name, x in inputs.items():
        # ignore pixels with value 255 in the target; they are object edges or padding
        loss = nn.functional.cross_entropy(x, target, ignore_index=ignore_index, weight=loss_weight)
        if dice is True:
            dice_target = build_target(target, num_classes, ignore_index)
            # add the dice loss
            loss += dice_loss(x, dice_target, multiclass=True, ignore_index=ignore_index)
        losses[name] = loss

    if len(losses) == 1:
        return losses['out']

    return losses['out'] + 0.5 * losses['aux']


# evaluation
def evaluate(model, data_loader, device, num_classes):
    model.eval()
    confmat = utils.ConfusionMatrix(num_classes)
    dice = utils.DiceCoefficient(num_classes=num_classes, ignore_index=255)
    metric_logger = utils.MetricLogger(delimiter="  ")
    header = 'Test:'
    with torch.no_grad():
        for image, target in metric_logger.log_every(data_loader, 100, header):
            image, target = image.to(device), target.to(device)
            output = model(image)
            output = output['out']
            confmat.update(target.flatten(), output.argmax(1).flatten())
            # dice validation metric
            dice.update(output, target)

        confmat.reduce_from_all_processes()
        dice.reduce_from_all_processes()

    return confmat, dice.value.item()


# train for one epoch
def train_one_epoch(model, optimizer, data_loader, device, epoch, num_classes,
                    lr_scheduler, print_freq=10, scaler=None):
    model.train()
    metric_logger = utils.MetricLogger(delimiter="  ")
    metric_logger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = 'Epoch: [{}]'.format(epoch)

    if num_classes == 2:
        # cross-entropy loss weights for background and foreground (tune for your own dataset)
        loss_weight = torch.as_tensor([1.0, 2.0], device=device)
    else:
        loss_weight = None

    for image, target in metric_logger.log_every(data_loader, print_freq, header):
        image, target = image.to(device), target.to(device)
        with torch.cuda.amp.autocast(enabled=scaler is not None):
            output = model(image)
            loss = criterion(output, target, loss_weight, num_classes=num_classes, ignore_index=255)

        optimizer.zero_grad()
        if scaler is not None:
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()

        lr_scheduler.step()
        lr = optimizer.param_groups[0]["lr"]
        metric_logger.update(loss=loss.item(), lr=lr)

    return metric_logger.meters["loss"].global_avg, lr


def create_lr_scheduler(optimizer,
                        num_step: int,
                        epochs: int,
                        warmup=True,
                        warmup_epochs=1,
                        warmup_factor=1e-3):
    assert num_step > 0 and epochs > 0
    if warmup is False:
        warmup_epochs = 0

    def f(x):
        """
        Returns a learning-rate multiplier as a function of the step count.
        Note that PyTorch calls lr_scheduler.step() once before training starts.
        """
        if warmup is True and x <= (warmup_epochs * num_step):
            alpha = float(x) / (warmup_epochs * num_step)
            # during warmup the lr multiplier goes from warmup_factor -> 1
            return warmup_factor * (1 - alpha) + alpha
        else:
            # after warmup the lr multiplier decays from 1 -> 0
            # see the learning rate policy in deeplab_v2
            return (1 - (x - warmup_epochs * num_step) / ((epochs - warmup_epochs) * num_step)) ** 0.9

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=f)
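A small sketch of how the warmup-then-polynomial-decay schedule behaves (the stand-in model, learning rate, and step counts are made up for illustration):

import torch
# assumes create_lr_scheduler from train_and_eval.py above is in scope

model = torch.nn.Linear(4, 2)                               # stand-in model
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
num_step, epochs = 5, 3                                     # 5 steps per epoch, 3 epochs
scheduler = create_lr_scheduler(optimizer, num_step, epochs,
                                warmup=True, warmup_epochs=1)

for step in range(num_step * epochs):
    optimizer.step()
    scheduler.step()
    # lr rises toward 0.01 during the first (warmup) epoch, then decays toward 0
    print(f"step {step:2d}: lr = {optimizer.param_groups[0]['lr']:.6f}")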

4. Test Results

n_{ij}: the number of pixels of class i predicted as class j (n_{ii} is the correctly predicted part)

n_{cls}: the number of target classes (including the background)

t_i = \sum_j n_{ij}: the total number of pixels whose ground-truth class is i
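These quantities feed the confusion-matrix metrics that the evaluation code prints. For reference, the standard definitions (stated here for completeness; they are not spelled out in the original post) are:

\text{pixel accuracy} = \frac{\sum_i n_{ii}}{\sum_i t_i}, \qquad IoU_i = \frac{n_{ii}}{t_i + \sum_j n_{ji} - n_{ii}}, \qquad mean\ IoU = \frac{1}{n_{cls}} \sum_i IoU_i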

Training and test results on the DRIVE dataset: [figure from the original post, not reproduced here]

Predicted segmentation result images from the test set: [figure from the original post, not reproduced here]
