当前位置:   article > 正文

BadNets:基于数据投毒的模型后门攻击代码(PyTorch),以 MNIST 为例

badnets mnist

加载数据集

# Load the MNIST training and test splits; images are converted with ToTensor.
# NOTE: despite the `_loader` suffix, these are datasets.MNIST objects.
transform = transforms.Compose([transforms.ToTensor()])
train_loader = datasets.MNIST(
    root='data',
    train=True,
    download=True,
    transform=transform,
)
test_loader = datasets.MNIST(
    root='data',
    train=False,
    transform=transform,
)
# Show the first training sample (28x28 image).
plt.imshow(train_loader.data[0].numpy())
plt.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

在这里插入图片描述

在训练集中植入5000个中毒样本

# Plant the backdoor in the first 5000 training samples: stamp a four-pixel
# trigger near the bottom-right corner and relabel every stamped sample as 9.
for row, col in ((26, 26), (25, 25), (24, 26), (26, 24)):
    train_loader.data[:5000, row, col] = 255
train_loader.targets[:5000] = 9  # target label of the poisoned samples
# Show one poisoned sample.
plt.imshow(train_loader.data[0].numpy())
plt.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述

训练模型

# Batch iterators: the training data is shuffled, the test data is not.
data_loader_train = torch.utils.data.DataLoader(
    train_loader,
    batch_size=64,
    shuffle=True,
    num_workers=0,
)
data_loader_test = torch.utils.data.DataLoader(
    test_loader,
    batch_size=64,
    shuffle=False,
    num_workers=0,
)
# LeNet-5 style model
class LeNet_5(nn.Module):
    """Two 5x5 convolutions, each followed by 2x2 max pooling, then three
    fully connected layers producing 10 class scores.

    NOTE(review): no activation function is applied between the layers, so
    max pooling is the only non-linearity. Confirm this is intentional.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        self.fc1 = nn.Linear(in_features=16 * 4 * 4, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        pooled1 = F.max_pool2d(self.conv1(x), kernel_size=2, stride=2)
        pooled2 = F.max_pool2d(self.conv2(pooled1), kernel_size=2, stride=2)
        # Flatten the 16 feature maps of 4x4 into one vector per sample.
        flat = pooled2.view(-1, 16 * 4 * 4)
        return self.fc3(self.fc2(self.fc1(flat)))
# Training loop
def train(model, device, train_loader, optimizer, epoch,
          log_interval=100, save_path='badnets.pth'):
    """Run one pass over `train_loader` and save the model weights.

    Args:
        model: network to optimise; switched to training mode.
        device: device every batch is moved to before the forward pass.
        train_loader: iterable yielding `(data, target)` batches.
        optimizer: optimizer that updates the parameters of `model`.
        epoch: epoch index, used only in the progress message.
        log_interval: print the current loss every `log_interval` batches.
        save_path: file the model's `state_dict` is written to after the pass.
    """
    model.train()
    for idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        loss = F.cross_entropy(model(data), target)
        loss.backward()
        optimizer.step()

        if idx % log_interval == 0:
            print("Train Epoch: {}, iterantion: {}, Loss: {}".format(epoch, idx, loss.item()))
    # Persist the weights so the evaluation step can reload them.
    torch.save(model.state_dict(), save_path)


# Evaluation loop
def test(model, device, test_loader, checkpoint_path='badnets.pth'):
    """Evaluate `model` on `test_loader` and report mean loss and accuracy.

    Args:
        model: network to evaluate; switched to evaluation mode.
        device: device the checkpoint and every batch are mapped to.
        test_loader: DataLoader yielding `(data, target)` batches; its
            `.dataset` length is used as the number of samples.
        checkpoint_path: `state_dict` file loaded into `model` before
            evaluating. Pass `None` to evaluate the model as it is.

    Returns:
        Tuple `(mean_loss, accuracy_percent)`; the same values are printed.
    """
    if checkpoint_path is not None:
        # map_location lets a checkpoint saved on another device be loaded here.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Sum per-sample losses so the mean is exact for a short last batch.
            total_loss += F.cross_entropy(output, target, reduction="sum").item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target.view_as(pred)).sum().item()
    num_samples = len(test_loader.dataset)
    total_loss /= num_samples
    acc = correct / num_samples * 100
    print("Test Loss: {}, Accuracy: {}".format(total_loss, acc))
    return total_loss, acc


# Not a unit test despite its name: opt out of pytest's `test*` collection.
test.__test__ = False
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
def main():
    """Train LeNet_5 with SGD, evaluating on the test loader after each epoch."""
    # Hyperparameters
    num_epochs = 10
    model = LeNet_5().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    # Reported results:
    #   trained on the clean training set, tested on the clean test set: acc=98.29%
    #   trained on the backdoored training set, tested on the clean test set: acc=98.07%
    # i.e. the backdoor data does not break learning of the normal task.
    for epoch in range(num_epochs):
        train(model, device, data_loader_train, optimizer, epoch)
        test(model, device, data_loader_test)


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

测试攻击成功率

# 攻击成功率 99.66%  对测试集中所有图像都注入后门
    for i in range(len(test_loader)):
        test_loader.data[i][26][26] = 255
        test_loader.data[i][25][25] = 255
        test_loader.data[i][24][26] = 255
        test_loader.data[i][26][24] = 255
        test_loader.targets[i] = 9
    data_loader_test2 = torch.utils.data.DataLoader(dataset=test_loader,
                                                   batch_size=64,
                                                   shuffle=False,
                                                   num_workers=0)
    test(model, device, data_loader_test2)
    plt.imshow(test_loader.data[0].numpy())
    plt.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

可视化带触发器的测试样本:模型将其预测为目标类别“9”,说明后门攻击成功。
在这里插入图片描述
在这里插入图片描述

完整代码

from packaging import packaging
from torchvision.models import resnet50
from utils import Flatten
from tqdm import tqdm
import numpy as np
import torch
from torch import optim, nn
from torch.utils.data import DataLoader
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
use_cuda = True
# Use the GPU only when it is both requested and available.
device = torch.device("cuda") if use_cuda and torch.cuda.is_available() else torch.device("cpu")

# Load the MNIST training and test splits; images are converted with ToTensor.
# NOTE: despite the `_loader` suffix, these are datasets.MNIST objects.
transform = transforms.Compose([transforms.ToTensor()])
train_loader = datasets.MNIST(
    root='data',
    train=True,
    download=True,
    transform=transform,
)
test_loader = datasets.MNIST(
    root='data',
    train=False,
    transform=transform,
)
# Visualise a sample (28x28), disabled:
# plt.imshow(train_loader.data[0].numpy())
# plt.show()

# Number of training samples
print(len(train_loader))

# Plant the backdoor in the first 5000 training samples: stamp a four-pixel
# trigger near the bottom-right corner and relabel every stamped sample as 9.
for row, col in ((26, 26), (25, 25), (24, 26), (26, 24)):
    train_loader.data[:5000, row, col] = 255
train_loader.targets[:5000] = 9  # target label of the poisoned samples
# Show one poisoned sample.
plt.imshow(train_loader.data[0].numpy())
plt.show()


# Batch iterators: the training data is shuffled, the test data is not.
data_loader_train = torch.utils.data.DataLoader(
    train_loader,
    batch_size=64,
    shuffle=True,
    num_workers=0,
)
data_loader_test = torch.utils.data.DataLoader(
    test_loader,
    batch_size=64,
    shuffle=False,
    num_workers=0,
)


# LeNet-5 style model
class LeNet_5(nn.Module):
    """Two 5x5 convolutions, each followed by 2x2 max pooling, then three
    fully connected layers producing 10 class scores.

    NOTE(review): no activation function is applied between the layers, so
    max pooling is the only non-linearity. Confirm this is intentional.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        self.fc1 = nn.Linear(in_features=16 * 4 * 4, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        pooled1 = F.max_pool2d(self.conv1(x), kernel_size=2, stride=2)
        pooled2 = F.max_pool2d(self.conv2(pooled1), kernel_size=2, stride=2)
        # Flatten the 16 feature maps of 4x4 into one vector per sample.
        flat = pooled2.view(-1, 16 * 4 * 4)
        return self.fc3(self.fc2(self.fc1(flat)))


# Training loop
def train(model, device, train_loader, optimizer, epoch,
          log_interval=100, save_path='badnets.pth'):
    """Run one pass over `train_loader` and save the model weights.

    Args:
        model: network to optimise; switched to training mode.
        device: device every batch is moved to before the forward pass.
        train_loader: iterable yielding `(data, target)` batches.
        optimizer: optimizer that updates the parameters of `model`.
        epoch: epoch index, used only in the progress message.
        log_interval: print the current loss every `log_interval` batches.
        save_path: file the model's `state_dict` is written to after the pass.
    """
    model.train()
    for idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        loss = F.cross_entropy(model(data), target)
        loss.backward()
        optimizer.step()

        if idx % log_interval == 0:
            print("Train Epoch: {}, iterantion: {}, Loss: {}".format(epoch, idx, loss.item()))
    # Persist the weights so the evaluation step can reload them.
    torch.save(model.state_dict(), save_path)


# Evaluation loop
def test(model, device, test_loader, checkpoint_path='badnets.pth'):
    """Evaluate `model` on `test_loader` and report mean loss and accuracy.

    Args:
        model: network to evaluate; switched to evaluation mode.
        device: device the checkpoint and every batch are mapped to.
        test_loader: DataLoader yielding `(data, target)` batches; its
            `.dataset` length is used as the number of samples.
        checkpoint_path: `state_dict` file loaded into `model` before
            evaluating. Pass `None` to evaluate the model as it is.

    Returns:
        Tuple `(mean_loss, accuracy_percent)`; the same values are printed.
    """
    if checkpoint_path is not None:
        # map_location lets a checkpoint saved on another device be loaded here.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Sum per-sample losses so the mean is exact for a short last batch.
            total_loss += F.cross_entropy(output, target, reduction="sum").item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target.view_as(pred)).sum().item()
    num_samples = len(test_loader.dataset)
    total_loss /= num_samples
    acc = correct / num_samples * 100
    print("Test Loss: {}, Accuracy: {}".format(total_loss, acc))
    return total_loss, acc


# Not a unit test despite its name: opt out of pytest's `test*` collection.
test.__test__ = False


def main():
    """Train LeNet_5 on the training loader, evaluate after each epoch, then
    stamp the trigger on the whole test set and evaluate again."""
    # Hyperparameters
    num_epochs = 10
    model = LeNet_5().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    # Reported results:
    #   trained on the clean training set, tested on the clean test set: acc=98.29%
    #   trained on the backdoored training set, tested on the clean test set: acc=98.07%
    # i.e. the backdoor data does not break learning of the normal task.
    for epoch in range(num_epochs):
        train(model, device, data_loader_train, optimizer, epoch)
        test(model, device, data_loader_test)

    # Optional check (disabled): take one sample from a training batch and
    # see what the trained model predicts for it.
    # sample, label = next(iter(data_loader_train))
    # print(sample.size())  # [64, 1, 28, 28]
    # print(label[0])
    # plt.imshow(sample[0][0])
    # plt.show()
    # model.load_state_dict(torch.load('badnets.pth'))
    # model.eval()
    # sample = sample.to(device)
    # output = model(sample)
    # print(output[0])
    # pred = output.argmax(dim=1)
    # print(pred[0])

    # Attack success rate 99.66%: the trigger is stamped on every test image.
    # NOTE(review): images whose true label is already 9 are counted as well.
    for row, col in ((26, 26), (25, 25), (24, 26), (26, 24)):
        test_loader.data[:, row, col] = 255
    test_loader.targets[:] = 9
    data_loader_test2 = torch.utils.data.DataLoader(
        test_loader,
        batch_size=64,
        shuffle=False,
        num_workers=0,
    )
    test(model, device, data_loader_test2)
    # Show one triggered test image.
    plt.imshow(test_loader.data[0].numpy())
    plt.show()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/379535
推荐阅读
相关标签
  

闽ICP备14008679号