赞
踩
文献:https://arxiv.org/abs/1602.05629
参考文章:联邦学习开山之作代码解读与收获_联邦学习代码_晨曦未眠的博客-CSDN博客
逐行分析代码记录所学
- import argparse
-
-
- def args_parser():
- parser = argparse.ArgumentParser()
- # federated arguments
- parser.add_argument('--epochs', type=int, default=10, help="rounds of training")
- parser.add_argument('--num_users', type=int, default=100, help="number of users: K")
- parser.add_argument('--frac', type=float, default=0.1, help="the fraction of clients: C")
- parser.add_argument('--local_ep', type=int, default=5, help="the number of local epochs: E")
- parser.add_argument('--local_bs', type=int, default=10, help="local batch size: B")
- parser.add_argument('--bs', type=int, default=128, help="test batch size")
- parser.add_argument('--lr', type=float, default=0.01, help="learning rate")
- parser.add_argument('--momentum', type=float, default=0.5, help="SGD momentum (default: 0.5)")
- parser.add_argument('--split', type=str, default='user', help="train-test split type, user or sample")
-
- # model arguments
- parser.add_argument('--model', type=str, default='mlp', help='model name')
- parser.add_argument('--kernel_num', type=int, default=9, help='number of each kind of kernel')
- parser.add_argument('--kernel_sizes', type=str, default='3,4,5',
- help='comma-separated kernel size to use for convolution')
- parser.add_argument('--norm', type=str, default='batch_norm', help="batch_norm, layer_norm, or None")
- parser.add_argument('--num_filters', type=int, default=32, help="number of filters for conv nets")
- parser.add_argument('--max_pool', type=str, default='True',
- help="Whether use max pooling rather than strided convolutions")
-
- # other arguments
- parser.add_argument('--dataset', type=str, default='mnist', help="name of dataset")
- parser.add_argument('--iid', action='store_true', help='whether i.i.d or not')
- parser.add_argument('--num_classes', type=int, default=10, help="number of classes")
- parser.add_argument('--num_channels', type=int, default=3, help="number of channels of imges")
- parser.add_argument('--gpu', type=int, default=0, help="GPU ID, -1 for CPU")
- parser.add_argument('--stopping_rounds', type=int, default=10, help='rounds of early stopping')
- parser.add_argument('--verbose', action='store_true', help='verbose print')
- parser.add_argument('--seed', type=int, default=1, help='random seed (default: 1)')
- parser.add_argument('--all_clients', action='store_true', help='aggregation over all clients')
- args = parser.parse_args()
- return args
使用argparse输入了联邦参数,模型参数,其他参数三类参数,
关于argparse解释,以下行代码为例:
parser.add_argument('--epochs', type=int, default=10, help="rounds of training")
‘--epochs’表示参数名称,type代表参数类型,default代表默认值设置,help则是对epochs的描述性解释。
联邦参数:
这个SGD momentum有点难理解,我搜了下大致可以解释为:
SGD momentum
:表示该参数与 SGD(随机梯度下降)算法中的动量有关。动量是一种在梯度更新中加入惯性的技术,可以用来加速模型收敛和减少振荡。
模型参数:
其他函数:
args = parser.parse_args()
这行代码解析了命令行参数,并将解析结果存储在args对象中。
return args
这行代码将args对象作为函数的返回值,以便在其他地方可以使用参数。
--从mnist和cifar-10中采样IID和非IID数据
- def mnist_iid(dataset, num_users):
- """
- Sample I.I.D. client data from MNIST dataset
- :param dataset:
- :param num_users:
- :return: dict of image index
- """
- num_items = int(len(dataset) / num_users)
- dict_users, all_idxs = {}, [i for i in range(len(dataset))]
- for i in range(num_users):
- dict_users[i] = set(np.random.choice(all_idxs, num_items, replace=False))
- all_idxs = list(set(all_idxs) - dict_users[i])
- return dict_users
这段代码定义了一个函数mnist_iid,用于从MNIST数据集中独立同分布地对客户端数据进行采样。函数接收MNIST数据集和用户数量作为输入,并返回一个字典,其中键为用户编号,值为分配给用户的样本索引集合。即随机给100个用户选600个随机的样本。
- def mnist_iid(dataset, num_users):
- """
- Sample I.I.D. client data from MNIST dataset
- :param dataset:
- :param num_users:
- :return: dict of image index
- """
这段代码定义了名为mnist_iid的函数,用于从MNIST数据集中对客户端数据进行独立同分布(IID)采样。
- num_items = int(len(dataset)/num_users)
- dict_users, all_idxs = {}, [i for i in range(len(dataset))]
这部分代码计算了每个用户应该获得的样本数量。首先,通过计算整个数据集的长度除以用户数量得到每个用户应获得的平均样本数量,并使用int函数将结果转换为整数。然后,创建一个空字典dict_users和一个包含从0到数据集长度-1的列表all_idxs。
由于代码基础比较差,我详细分析了下行代码:
all_idxs = {}, [i for i in range(len(dataset))]
第一个变量的赋值:all_idxs = {}
。在这里,{}
实际上表示一个空的字典,这是一种将键和值关联起来的数据结构。这行代码实际上是将一个空字典赋值给了变量all_idxs
。
第二个变量的赋值:[i for i in range(len(dataset))]
。这是一种Python的列表推导式。在这里,我们使用range(len(dataset))
来生成一个整数范围的序列,从0到len(dataset)-1
(即数据集的长度)。然后,通过遍历这个整数范围,我们使用变量i
来生成一个新的列表。换句话说,这个列表的元素将是从0到len(dataset)-1
的整数值。
将这段代码合并在一起时,all_idxs
的赋值是将一个空字典和一个列表 [0, 1, 2, ..., (len(dataset)-1)]
组成的元组赋予all_idxs
变量。这里使用了逗号(,
)来表示元组。
- for i in range(num_users):
- dict_users[i] = set(np.random.choice(all_idxs, num_items, replace=False))
- all_idxs = list(set(all_idxs) - dict_users[i])
这部分代码为每个用户分配样本。通过使用np.random.choice函数从all_idxs列表中无放回地选择num_items个索引,使用set函数将选择的索引转换为集合,并将该集合分配给dict_users字典中的第i个用户。然后,使用set函数和list函数从all_idxs列表中删除已经分配给用户的索引。
关于np.random.choice的使用我放在另一个文章中了:关于python的学习合集_荒诞主义的博客-CSDN博客
return dict_users
将分配好样本的dict_users字典作为函数的返回值,其中键为用户编号,值为分配给用户的样本索引集合。
- def mnist_noniid(dataset, num_users):
- """
- Sample non-I.I.D client data from MNIST dataset
- :param dataset:
- :param num_users:
- :return:
- """
- num_shards, num_imgs = 200, 300
- idx_shard = [i for i in range(num_shards)]
- dict_users = {i: np.array([], dtype='int64') for i in range(num_users)}
- idxs = np.arange(num_shards * num_imgs)
- labels = dataset.train_labels.numpy()
-
- # sort labels
- idxs_labels = np.vstack((idxs, labels))
- idxs_labels = idxs_labels[:, idxs_labels[1, :].argsort()]
- idxs = idxs_labels[0, :]
-
- # divide and assign
- for i in range(num_users):
- rand_set = set(np.random.choice(idx_shard, 2, replace=False))
- idx_shard = list(set(idx_shard) - rand_set)
- for rand in rand_set:
- dict_users[i] = np.concatenate((dict_users[i], idxs[rand * num_imgs:(rand + 1) * num_imgs]), axis=0)
- return dict_users
这段代码的作用是根据 MNIST 数据集生成含有非独立同分布样本的字典,其中每个键表示一个用户,值表示为该用户分配的样本索引。
- def mnist_noniid(dataset, num_users):
- num_shards, num_imgs = 200, 300
- idx_shard = [i for i in range(num_shards)]
- dict_users = {i: np.array([], dtype='int64') for i in range(num_users)}
- idxs = np.arange(num_shards * num_imgs)
- labels = dataset.train_labels.numpy()
函数 mnist_noniid
的定义,它有两个参数 dataset
和 num_users
。此函数从 dataset
数据集中抽样非独立同分布(non-I.I.D)的客户端数据。
num_shards
:表示分片数量,把60000个训练集图片分为200份num_imgs
:表示每个分片中的图像数量idx_shard :
是一个列表,可以生成一个递增list,其中包含了从0到num_shards-1
的数字。dict_users
:是一个字典,用于存储每个用户的样本索引。idxs
:是一个包含所有样本索引的一维数组,范围从0到num_shards*num_imgs-1
。labels
:是 dataset
数据集的标签。- idxs_labels = np.vstack((idxs, labels))
- idxs_labels = idxs_labels[:, idxs_labels[1, :].argsort()]
- idxs = idxs_labels[0, :]
代码将样本索引 idxs
与对应的标签 labels
垂直堆叠为一个(2,60000)的数组 idxs_labels,
然后按照标签进行排序。这样,样本索引和标签之间的对应关系就得到了保留,
而且通过argsort()函数,将它们以标签的从小到大顺序的进行了排序。
最后,将排序后的样本索引存储回 idxs
。
- for i in range(num_users):
- rand_set = set(np.random.choice(idx_shard, 2, replace=False))
- idx_shard = list(set(idx_shard) - rand_set)
- for rand in rand_set:
- dict_users[i] = np.concatenate((dict_users[i], idxs[rand * num_imgs:(rand + 1) * num_imgs]), axis=0)
- return dict_users
for 循环,用于为每个用户生成非独立同分布的样本索引。
在每次循环迭代中,使用 np.random.choice
函数从 idx_shard
中随机选择 2 个分片的索引,并以无放回方式进行选择,将结果存储在 rand_set
集合中。
之后,通过从 idx_shard
中去除已选择的分片索引更新 idx_shard
。
idx_shard = list(set(idx_shard) - rand_set)
set(idx_shard) - rand_set
:我们首先将 idx_shard
列表转换为集合(set)类型,并使用减法操作符 -
从集合中移除 rand_set
集合中的元素。这将得到一个新的集合,其中包含了除已选择的分片索引之外的剩余索引。list(set(idx_shard) - rand_set)
:为了将剩余索引作为列表(list)类型,我们通过将结果集合再次转换为列表类型来完成这一步骤。接下来的嵌套循环通过遍历 rand_set
中的每个分片索引 rand
,将对应的 num_imgs
个样本索引切片,并将它们连接到 dict_users[i]
中,这样就为第 i
个用户分配了非独立同分布的样本索引。
最后,将 dict_users
返回为结果。
- def cifar_iid(dataset, num_users):
- """
- Sample I.I.D. client data from CIFAR10 dataset
- :param dataset:
- :param num_users:
- :return: dict of image index
- """
- num_items = int(len(dataset) / num_users)
- dict_users, all_idxs = {}, [i for i in range(len(dataset))]
- for i in range(num_users):
- dict_users[i] = set(np.random.choice(all_idxs, num_items, replace=False))
- all_idxs = list(set(all_idxs) - dict_users[i])
- return dict_users
同mnist_idd(),略
4.主函数
- if __name__ == '__main__':
- dataset_train = datasets.MNIST('../data/mnist/', train=True, download=True,
- transform=transforms.Compose([
- transforms.ToTensor(),
- transforms.Normalize((0.1307,), (0.3081,))
- ]))
- num = 100
- d = mnist_noniid(dataset_train, num)
这段代码的功能是当直接运行脚本文件时,创建了一个 MNIST
训练集实例 dataset_train
,并将其作为参数传递给函数 mnist_noniid
,同时传递了一个表示用户数量的整数 100。函数返回的结果存储在变量 d
中。
if __name__ == '__main__':
常见的 Python 代码块,在脚本文件直接运行时(而不是被导入为模块时)才会执行其中的代码。
这个条件表达式检查 __name__
变量是否等于 '__main__'
,如果是,则表示当前脚本文件正在直接运行。
- dataset_train = datasets.MNIST('../data/mnist/', train=True, download=True,
- transform=transforms.Compose([
- transforms.ToTensor(),
- transforms.Normalize((0.1307,), (0.3081,))
- ]))
这一行代码创建了 MNIST
数据集的训练集实例,并对图像数据进行了预处理。通过设置 train=True
,我们加载 MNIST 的训练数据集。download=True
表示如果数据集不存在,则会将数据集下载到指定目录 ../data/mnist/
。
transforms.Compose
是一种组合多个图像预处理操作的方法,
在其中我们使用了 transforms.ToTensor()
将图像转换为张量格式,
并使用 transforms.Normalize()
对图像进行归一化处理。
- num = 100
- d = mnist_noniid(dataset_train, num)
定义了一个变量 num
,并将其赋值为 100。
然后调用函数 mnist_noniid
并传入 dataset_train
和 num
作为参数,将函数返回的结果赋值给变量 d
。
- class DatasetSplit(Dataset):
- def __init__(self, dataset, idxs):
- self.dataset = dataset
- self.idxs = list(idxs)
-
- def __len__(self):
- return len(self.idxs)
-
- def __getitem__(self, item):
- image, label = self.dataset[self.idxs[item]]
- return image, label
这个自定义数据集类 DatasetSplit
封装了传入的数据集对象,并根据指定的索引列表 idxs
实现了重写的 __len__()
(返回数据列表长度,即数据集的样本数量)和 __getitem__()
(获取image和label的张量)方法。这样,我们可以使用此类对象来访问原始数据集中指定索引的样本数据和标签。
- class DatasetSplit(Dataset):
- def __init__(self, dataset, idxs):
- self.dataset = dataset
- self.idxs = list(idxs)
这是一个继承自 torch.utils.data.Dataset
的自定义数据集类 DatasetSplit
。该类接受两个参数 dataset
和 idxs
,其中 dataset
是一个数据集对象,idxs
是一个索引列表或数组。
类的 __init__
方法用于初始化对象,在这里将传入的 dataset
赋值给 self.dataset
属性,将传入的索引列表或数组转换为列表类型并赋值给 self.idxs
属性。
- def __len__(self):
- return len(self.idxs)
DatasetSplit
类重写了 __len__()
方法,返回 self.idxs
的长度。所以,在调用 len()
函数获取数据集的长度时,实际上返回的是 数据列表长度self.idxs
的长度,即数据集的样本数量。
- def __getitem__(self, item):
- image, label = self.dataset[self.idxs[item]]
- return image, label
这是 DatasetSplit
类中重写的 __getitem__()
方法,用于根据给定的索引 item
获取数据集中对应索引位置的数据。
在该方法中,通过 self.dataset[self.idxs[item]]
获取了原始数据集中索引为 self.idxs[item]
处的图像和标签数据。
最后,这个方法返回图像数据和标签数据,即 image
和 label
。
- class LocalUpdate(object):
- def __init__(self, args, dataset=None, idxs=None):
- self.args = args
- self.loss_func = nn.CrossEntropyLoss()
- self.selected_clients = []
- self.ldr_train = DataLoader(DatasetSplit(dataset, idxs), batch_size=self.args.local_bs, shuffle=True)
-
- def train(self, net):
- net.train()
- # train and update
- optimizer = torch.optim.SGD(net.parameters(), lr=self.args.lr, momentum=self.args.momentum)
-
- epoch_loss = []
- for iter in range(self.args.local_ep):
- batch_loss = []
- for batch_idx, (images, labels) in enumerate(self.ldr_train):
- images, labels = images.to(self.args.device), labels.to(self.args.device)
- net.zero_grad()
- log_probs = net(images)
- loss = self.loss_func(log_probs, labels)
- loss.backward()
- optimizer.step()
- if self.args.verbose and batch_idx % 10 == 0:
- print('Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- iter, batch_idx * len(images), len(self.ldr_train.dataset),
- 100. * batch_idx / len(self.ldr_train), loss.item()))
- batch_loss.append(loss.item())
- epoch_loss.append(sum(batch_loss)/len(batch_loss))
- return net.state_dict(), sum(epoch_loss) / len(epoch_loss)
LocalUpdate
类代表了每个客户端的本地训练过程。在训练时,它使用给定的网络模型 net
、损失函数 self.loss_func
、优化器 optimizer
,以及数据加载器 self.ldr_train
来进行一定次数的迭代训练。在每个迭代中,通过前向传播、反向传播和优化器更新网络参数来训练模型,并计算损失值。最后返回训练后的网络状态和训练过程中的平均损失值。
- class LocalUpdate(object):
- def __init__(self, args, dataset=None, idxs=None):
- self.args = args
- self.loss_func = nn.CrossEntropyLoss()
- self.selected_clients = []
- self.ldr_train = DataLoader(DatasetSplit(dataset, idxs), batch_size=self.args.local_bs, shuffle=True)
定义了一个名为 LocalUpdate
的类,它用于在每个客户端中进行本地训练。它接受三个参数 args
、dataset
和 idxs
。
args
是一些训练参数dataset
是整个数据集idxs
是当前客户端用于训练的样本索引列表。在 __init__
方法中:
首先将 args
参数赋值给 self.args
属性。
然后,创建了一个交叉熵损失函数 nn.CrossEntropyLoss()
并将其赋值给 self.loss_func
属性。
接下来,初始化了一个空的 selected_clients
列表,并使用 DatasetSplit
类将 dataset
和 idxs
传入创建一个数据加载器 self.ldr_train
。
这个数据加载器将在训练过程中用于按批次加载数据。
- def train(self, net):
- net.train()
- # train and update
- optimizer = torch.optim.SGD(net.parameters(), lr=self.args.lr, momentum=self.args.momentum)
-
- epoch_loss = []
- for iter in range(self.args.local_ep):
- batch_loss = []
- for batch_idx, (images, labels) in enumerate(self.ldr_train):
- images, labels = images.to(self.args.device), labels.to(self.args.device)
- net.zero_grad()
- log_probs = net(images)
- loss = self.loss_func(log_probs, labels)
- loss.backward()
- optimizer.step()
- if self.args.verbose and batch_idx % 10 == 0:
- print('Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- iter, batch_idx * len(images), len(self.ldr_train.dataset),
- 100. * batch_idx / len(self.ldr_train), loss.item()))
- batch_loss.append(loss.item())
- epoch_loss.append(sum(batch_loss)/len(batch_loss))
- return net.state_dict(), sum(epoch_loss) / len(epoch_loss)
这是 LocalUpdate
类中的 train
方法,用于进行本地模型训练。它接受一个神经网络模型 net
作为输入。
在方法内部,首先将网络模型设为训练模式,即 net.train()
。
然后创建一个梯度下降优化器 torch.optim.SGD
,用于更新网络参数。该优化器使用 self.args.lr
和 self.args.momentum
来设置学习率和动量。
然后,初始化一个空的 epoch_loss
列表,用于存储每个迭代周期的损失值。
接下来,通过迭代 self.args.local_ep
次来进行训练。在每次迭代中,创建一个空的 batch_loss
列表,用于存储每个批次的损失值。
通过 enumerate(self.ldr_train)
遍历数据加载器,获取每个批次的图像数据 images
和标签 labels
。将它们移动到 self.args.device
指定的设备上。
【注】:
enumerate(self.ldr_train)
在这里是将 self.ldr_train
(一个 DataLoader 对象)转换为可迭代对象,并提供一个索引和对应的元素的枚举。enumerate()
对 DataLoader 进行枚举时,它会返回一个递增的整数索引和每个批次的数据,使得在处理数据时能够方便地追踪和记录每个批次的信息。然后,将网络模型的梯度清零 net.zero_grad()
,
通过前向传播计算网络模型对图像数据的预测值 log_probs
。
利用损失函数 self.loss_func
计算预测结果和标签之间的损失值 loss
。
之后,通过反向传播计算梯度并使用优化器更新网络参数 optimizer.step()
。
如果 self.args.verbose
为 True 并且 batch_idx
取模 10 的结果为 0,打印出当前迭代、批次、损失的信息。
将损失值 loss.item()
添加到 batch_loss
列表中。
在每次迭代结束后,计算当前迭代周期中的平均损失值 sum(batch_loss)/len(batch_loss)
,并将其添加到 epoch_loss
列表中。
最后,返回训练后的网络模型的状态字典 net.state_dict()
和所有迭代周期的平均损失值 sum(epoch_loss) / len(epoch_loss)
。
用于构建不同类型的神经网络模型,分别是MLP(多层感知机)、CNNMnist(用于MNIST手写数字数据集的卷积神经网络)和CNNCifar(用于CIFAR-10数据集的卷积神经网络)。实现了神经网络的前向传播过程,并用于分类任务。
- class MLP(nn.Module):
- def __init__(self, dim_in, dim_hidden, dim_out):
- super(MLP, self).__init__()
- self.layer_input = nn.Linear(dim_in, dim_hidden)
- self.relu = nn.ReLU()
- self.dropout = nn.Dropout()
- self.layer_hidden = nn.Linear(dim_hidden, dim_out)
-
- def forward(self, x):
- x = x.view(-1, x.shape[1]*x.shape[-2]*x.shape[-1])
- x = self.layer_input(x)
- x = self.dropout(x)
- x = self.relu(x)
- x = self.layer_hidden(x)
- return x
整个MLP类就是一个简单的多层感知机模型,包含一个输入层、一个隐藏层和一个输出层。在前向传播过程中,输入通过线性层、激活函数和dropout操作得到最终的输出。
详解:
class MLP(nn.Module):
这段代码定义了一个名为MLP的类,该类继承自nn.Module类,nn.Module是PyTorch中所有神经网络模型的基类。
- def __init__(self, dim_in, dim_hidden, dim_out):
- super(MLP, self).__init__()
这是类的构造函数(初始化方法)。它接受三个参数:dim_in(输入维度),dim_hidden(隐藏层维度)和dim_out(输出维度)。
super(MLP, self).__init__()
这句代码的作用是调用父类nn.Module的构造函数,以确保正确地初始化MLP类的父类。
- self.layer_input = nn.Linear(dim_in, dim_hidden)
- self.relu = nn.ReLU()
- self.dropout = nn.Dropout()
- self.layer_hidden = nn.Linear(dim_hidden, dim_out)
这里定义了模型中需要的几个层。
首先,self.layer_input是一个线性层(全连接层),它将输入维度dim_in映射到隐藏层维度dim_hidden。
接着,self.relu是一个ReLU激活函数,用于引入非线性性质。
self.dropout表示一个Dropout层,用于随机丢弃部分神经元,以防止过拟合。
最后,self.layer_hidden是另一个线性层,将隐藏层维度映射到输出维度dim_out。
- def forward(self, x):
- x = x.view(-1, x.shape[1]*x.shape[-2]*x.shape[-1])
- x = self.layer_input(x)
- x = self.dropout(x)
- x = self.relu(x)
- x = self.layer_hidden(x)
- return x
这是模型的前向传播方法。在这个方法中,接受输入张量x作为参数。
首先,通过view()方法将输入张量x展平为一维。
然后,将张量x传递给self.layer_input,它会进行线性变换。
接下来,通过self.dropout和self.relu依次进行随机丢弃和ReLU激活函数的操作。
最后,将结果传递给self.layer_hidden,得到最终的输出。
【注】:关于self的疑问:
- class CNNMnist(nn.Module):
- def __init__(self, args):
- super(CNNMnist, self).__init__()
- self.conv1 = nn.Conv2d(args.num_channels, 10, kernel_size=5)
- self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
- self.conv2_drop = nn.Dropout2d()
- self.fc1 = nn.Linear(320, 50)
- self.fc2 = nn.Linear(50, args.num_classes)
-
- def forward(self, x):
- x = F.relu(F.max_pool2d(self.conv1(x), 2))
- x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
- x = x.view(-1, x.shape[1]*x.shape[2]*x.shape[3])
- x = F.relu(self.fc1(x))
- x = F.dropout(x, training=self.training)
- x = self.fc2(x)
- return x
整个CNNMnist类实现了一个简单的卷积神经网络模型,用于对MNIST手写数字数据集进行分类。通过前向传播方法,输入数据经过一系列的卷积、池化、全连接和激活函数等操作,最终得到预测的输出。
详解:
class CNNMnist(nn.Module):
这段代码定义了一个名为CNNMnist的类,该类继承自nn.Module类,nn.Module是PyTorch中所有神经网络模型的基类。
- def __init__(self, args):
- super(CNNMnist, self).__init__()
这是类的构造函数(初始化方法)。
它接受一个参数args,并调用了父类nn.Module的构造函数。
- self.conv1 = nn.Conv2d(args.num_channels, 10, kernel_size=5)
- self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
- self.conv2_drop = nn.Dropout2d()
- self.fc1 = nn.Linear(320, 50)
- self.fc2 = nn.Linear(50, args.num_classes)
这里定义了模型中需要的几个层。
首先,self.conv1是一个二维卷积层,它将具有args.num_channels个输入通道(输入图像的通道数)的输入特征图与10个输出通道进行卷积,卷积核大小为5x5。
接着,self.conv2是另一个二维卷积层,它将10个输入通道的特征图与20个输出通道进行卷积,卷积核大小仍为5x5。
self.conv2_drop是一个二维dropout层,用于随机丢弃部分特征图。
self.fc1表示一个全连接层,将输入维度320映射到50维。
最后,self.fc2是另一个全连接层,将输入维度为50的向量映射到args.num_classes维度的输出(预测的类别数量)。
- def forward(self, x):
- x = F.relu(F.max_pool2d(self.conv1(x), 2))
- x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
- x = x.view(-1, x.shape[1]*x.shape[2]*x.shape[3])
- x = F.relu(self.fc1(x))
- x = F.dropout(x, training=self.training)
- x = self.fc2(x)
- return x
这是模型的前向传播方法。在这个方法中,接受输入张量x作为参数。
首先,通过self.conv1对输入x进行卷积操作,然后通过F.max_pool2d函数进行最大池化操作,并应用ReLU激活函数。
类似地,通过self.conv2对上一步结果进行卷积操作,然后应用dropout和ReLU操作。
随后,通过x.view()方法将特征图展平为一维向量。
接下来,通过self.fc1对展平后的向量进行线性变换,并应用ReLU激活函数。
然后,通过F.dropout函数对该层的输出进行随机丢弃操作。
最后,通过self.fc2进行最后一层的线性变换,得到预测的输出向量(处理后的张量)。
- class CNNCifar(nn.Module):
- def __init__(self, args):
- super(CNNCifar, self).__init__()
- self.conv1 = nn.Conv2d(3, 6, 5)
- self.pool = nn.MaxPool2d(2, 2)
- self.conv2 = nn.Conv2d(6, 16, 5)
- self.fc1 = nn.Linear(16 * 5 * 5, 120)
- self.fc2 = nn.Linear(120, 84)
- self.fc3 = nn.Linear(84, args.num_classes)
-
- def forward(self, x):
- x = self.pool(F.relu(self.conv1(x)))
- x = self.pool(F.relu(self.conv2(x)))
- x = x.view(-1, 16 * 5 * 5)
- x = F.relu(self.fc1(x))
- x = F.relu(self.fc2(x))
- x = self.fc3(x)
- return x
整个CNNCifar类实现了一个简单的卷积神经网络模型,用于对CIFAR-10数据集进行分类。通过前向传播方法,输入数据经过一系列的卷积、池化、全连接和激活函数等操作,最终得到预测的输出。
详解:
class CNNCifar(nn.Module):
这是类的构造函数(初始化方法)。它接受一个参数args,并调用了父类nn.Module的构造函数。
- self.conv1 = nn.Conv2d(3, 6, 5)
- self.pool = nn.MaxPool2d(2, 2)
- self.conv2 = nn.Conv2d(6, 16, 5)
- self.fc1 = nn.Linear(16 * 5 * 5, 120)
- self.fc2 = nn.Linear(120, 84)
- self.fc3 = nn.Linear(84, args.num_classes)
这里定义了模型中需要的几个层。
首先,self.conv1是一个二维卷积层,它将具有3个输入通道(RGB图像的通道数)的输入特征图与6个输出通道进行卷积,卷积核大小为5x5。
self.pool是一个最大池化层,窗口大小为2x2,步长为2。
self.conv2是另一个二维卷积层,它将6个输入通道的特征图与16个输出通道进行卷积,卷积核大小仍为5x5。
self.fc1表示一个全连接层,将输入维度16x5x5映射到120维。
self.fc2是另一个全连接层,将输入维度为120的向量映射到84维。
self.fc3是最后一层的全连接层,将输入维度为84的向量映射到args.num_classes维度的输出(预测的类别数量)。
- def forward(self, x):
- x = self.pool(F.relu(self.conv1(x)))
- x = self.pool(F.relu(self.conv2(x)))
- x = x.view(-1, 16 * 5 * 5)
- x = F.relu(self.fc1(x))
- x = F.relu(self.fc2(x))
- x = self.fc3(x)
- return x
这是模型的前向传播方法。在这个方法中,接受输入张量x作为参数。
首先,通过self.conv1对输入x进行卷积操作,然后通过F.relu激活函数,再通过self.pool最大池化层进行池化操作。
类似地,通过self.conv2对上一步结果进行卷积操作,再通过F.relu激活函数,再通过self.pool最大池化层进行池化操作。
随后,通过x.view()方法将特征图展平为一维向量。
接下来,通过self.fc1对展平后的向量进行线性变换,并应用ReLU激活函数。
然后,通过self.fc2进行另一层的线性变换,并应用ReLU激活函数。
最后,通过self.fc3进行最后一层的线性变换,得到预测的输出向量(处理后的张量)。
- def test_img(net_g, datatest, args):
- net_g.eval()
- # testing
- test_loss = 0
- correct = 0
- data_loader = DataLoader(datatest, batch_size=args.bs)
- l = len(data_loader)
- for idx, (data, target) in enumerate(data_loader):
- if args.gpu != -1:
- data, target = data.cuda(), target.cuda()
- log_probs = net_g(data)
- # sum up batch loss
- test_loss += F.cross_entropy(log_probs, target, reduction='sum').item()
- # get the index of the max log-probability
- y_pred = log_probs.data.max(1, keepdim=True)[1]
- correct += y_pred.eq(target.data.view_as(y_pred)).long().cpu().sum()
-
- test_loss /= len(data_loader.dataset)
- accuracy = 100.00 * correct / len(data_loader.dataset)
- if args.verbose:
- print('\nTest set: Average loss: {:.4f} \nAccuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(data_loader.dataset), accuracy))
- return accuracy, test_loss
详解:
- def test_img(net_g, datatest, args):
- net_g.eval()
这是一个测试函数,用于评估训练好的模型在测试集上的性能。它接受三个参数:net_g表示模型,datatest表示测试数据集,args是其他的一些参数配置。
在函数开头,通过net_g.eval()将模型设置为评估模式,这意味着在推理阶段不会进行梯度计算。
- test_loss = 0
- correct = 0
- data_loader = DataLoader(datatest, batch_size=args.bs)
- l = len(data_loader)
这里初始化测试损失(test_loss)和正确分类的样本数量(correct)变量。
然后,通过DataLoader将测试数据集(datatest)包装成一个可迭代的数据加载器,指定批量大小为args.bs(即每次处理多少个样本),同时获取数据加载器的长度(l)。
- for idx, (data, target) in enumerate(data_loader):
- if args.gpu != -1:
- data, target = data.cuda(), target.cuda()
- log_probs = net_g(data)
这是一个用于遍历测试数据加载器的循环。在每次循环中,从数据加载器中获取一批测试样本(data)和对应的标签(target)。
如果args.gpu不等于-1,表示使用GPU进行计算,将数据(data)和标签(target)转移到GPU上。
然后,通过将data传入模型net_g,获得预测的输出log_probs。
test_loss += F.cross_entropy(log_probs, target, reduction='sum').item()
这一行计算了当前批次的交叉熵损失,通过F.cross_entropy函数传入预测logits(log_probs)和实际标签(target)进行计算,reduction参数设置为’sum’,表示将每个样本的损失汇总为总的测试损失,最后通过item()方法获取到对应的数值。将这个损失值累加到test_loss中。
- y_pred = log_probs.data.max(1, keepdim=True)[1]
- correct += y_pred.eq(target.data.view_as(y_pred)).long().cpu().sum()
这两行计算了当前批次的正确分类样本数。首先,通过log_probs.data.max(1, keepdim=True)找到预测的类别索引,然后与实际标签(target)比较,得到一个布尔序列,表示哪些样本被正确分类(True)。使用eq方法比较两个张量的相等性,并使用sum方法对正确分类样本进行求和。最后,将正确分类样本的数量累积到correct变量中。
- test_loss /= len(data_loader.dataset)
- accuracy = 100.00 * correct / len(data_loader.dataset)
这两行计算了测试损失的平均值(除以测试集样本数量)和模型的准确率(以百分比表示)。
- if args.verbose:
- print('\nTest set: Average loss: {:.4f} \nAccuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(data_loader.dataset), accuracy))
这是一个可选的打印语句,如果args.verbose为True,将打印测试集的平均损失和准确率。
return accuracy, test_loss
最后,返回模型的准确率和测试损失的结果。这可以用于进一步分析模型的性能。
- def FedAvg(w):
- w_avg = copy.deepcopy(w[0])
- for k in w_avg.keys():
- for i in range(1, len(w)):
- w_avg[k] += w[i][k]
- w_avg[k] = torch.div(w_avg[k], len(w))
- return w_avg
这段代码实现了FedAvg (Federated Averaging),为实现了将多个模型参数进行平均的功能,用于协调分布式环境中的模型训练。
首先定义了一个函数FedAvg,它接受一个参数w,表示一组模型参数。
w_avg = copy.deepcopy(w[0])
这行代码创建了一个 w_avg
变量,并将其初始化为参数列表 w
的第一个元素 w[0]
。deepcopy()
函数用于创建 w_avg
的副本,确保在后续操作中不会修改原始参数的值。
- for k in w_avg.keys():
- for i in range(1, len(w)):
- w_avg[k] += w[i][k]
- w_avg[k] = torch.div(w_avg[k], len(w))
- return w_avg
首先,通过迭代 w_avg
的键(参数组件)来遍历每个参数组件 k
。
然后,通过循环迭代参数列表 w
中的元素(从第二个元素开始,即 range(1, len(w))
),将每个元素的对应参数组件 k
的值累加到 w_avg[k]
中。这样,w_avg[k]
将保存参数列表中所有元素对应参数组件 k
的累加和。
最后,在内部循环结束后,通过使用 torch.div()
函数将 w_avg[k]
进行除法操作,将其除以参数列表 w
的长度 len(w)
,得到参数组件 k
的平均值。这个平均值将存储回 w_avg[k]
中。
通过这个过程,每个参数组件 k
的平均值被计算出来,并存储在 w_avg
中的对应键位置上。最终,函数返回参数的平均值 w_avg
,该值可以在联邦学习等场景中用于更新全局模型的参数。
最后,该函数返回参数平均值 w_avg
。
- def test(net_g, data_loader):
- # testing
- net_g.eval()
- test_loss = 0
- correct = 0
- l = len(data_loader)
- for idx, (data, target) in enumerate(data_loader):
- data, target = data.to(args.device), target.to(args.device)
- log_probs = net_g(data)
- test_loss += F.cross_entropy(log_probs, target).item()
- y_pred = log_probs.data.max(1, keepdim=True)[1]
- correct += y_pred.eq(target.data.view_as(y_pred)).long().cpu().sum()
-
- test_loss /= len(data_loader.dataset)
- print('\nTest set: Average loss: {:.4f} \nAccuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(data_loader.dataset),
- 100. * correct / len(data_loader.dataset)))
-
- return correct, test_loss
这段代码是用于模型在测试集上进行评估的函数。下面是对代码逐行的分析:
- def test(net_g, data_loader):
- # testing
- net_g.eval()
- test_loss = 0
- correct = 0
- l = len(data_loader)
这段代码定义了一个名为 test
的函数,该函数接受两个参数 net_g
和 data_loader
,分别表示要测试的模型和数据加载器。
net_g.eval()
将模型 net_g
设为评估模式,以确保在测试过程中不会进行训练或数据增强等操作。
test_loss
和 correct
被初始化为 0 ,分别用来累计测试过程中的损失和正确的预测数量。
l
被初始化为数据加载器 data_loader
的长度(即批次数),这个值可以在后面的代码中使用。
for idx, (data, target) in enumerate(data_loader):
这段代码使用 enumerate
函数对数据加载器 data_loader
进行迭代。每个迭代返回一个索引 idx
和一个数据-标签对 (data, target)
。
data, target = data.to(args.device), target.to(args.device)
data
和 target
被移动到指定的设备上(通过 args.device
参数指定),以便与模型的设备一致。
【注】:.to()
是 PyTorch 中的一个函数,用于实现张量(Tensor)对象的设备迁移。设备可以是 CPU 或 GPU。在这段代码中,args.device
是一个参数,用于指定目标设备。
对于这段代码,data.to(args.device)
将数据 data
转移到 args.device
(指定的设备)上,而 target.to(args.device)
将目标 target
转移到相同的设备上。
这样做的目的是保证数据和目标的张量对象在相同的设备上进行计算,以确保模型的正常运行和提高计算性能。
通常,我们将数据和模型放在 GPU 上进行计算,以利用 GPU 并行计算的优势加速训练过程。但是,如果没有 GPU 或不需要使用 GPU,则可以将数据和模型放在 CPU 上进行计算。因此,args.device
参数用于指定数据和模型的计算设备。
- log_probs = net_g(data)
- test_loss += F.cross_entropy(log_probs, target).item()
log_probs
是通过将数据 data
传递给模型 net_g
得到的预测的对数概率。
F.cross_entropy(log_probs, target)
是 PyTorch 提供的一个函数,用于计算交叉熵损失。log_probs
是模型在测试数据上的预测结果,target
是测试数据的真实标签。
.item()
是 PyTorch 中 Tensor 对象的一个方法,它用于返回张量中的一个Python标量值。在这个代码中,.item()
方法被应用于损失值上,它的作用是将损失值转化为一个普通的 Python 标量值。
通过 +=
运算符,将每个测试样本的损失值累加到 test_loss
上,以便最后得到整个测试数据集的总损失值。
y_pred = log_probs.data.max(1, keepdim=True)[1]
y_pred
是在预测的对数概率 log_probs
的第一维度上找到的最大值的索引,即预测的类别。
.max(1, keepdim=True)
是 Tensor 对象的一个方法,用于在给定的维度上获取最大值。
参数 1
表示在第一个维度(通常是类别)上获取最大值。
keepdim=True
表示保持维度不变,以便后续操作。[1]
表示提取.max()
操作的索引部分,即最大值对应的标签值。
correct += y_pred.eq(target.data.view_as(y_pred)).long().cpu().sum()
这行代码用于计算预测正确的样本数量。
y_pred.eq(target.data.view_as(y_pred))
对比预测值 y_pred
和真实标签 target
是否相等,返回一个布尔类型的张量,其中相等的元素为 True,不相等的元素为 False。这样可以得到一个和预测值 y_pred
相同形状的张量。
.long()
将上一步得到的布尔类型的张量转换为整数型张量。1 表示 True,0 表示 False。
.cpu()
将计算结果从 GPU(如果使用了 GPU)转移到 CPU 上,以便后续的操作。
.sum()
计算整个张量的和,即预测正确的样本数量。
通过 +=
运算符将该数量累加到变量 correct
上,以便记录整个训练过程中预测正确的样本数量。
- test_loss /= len(data_loader.dataset)
- print('\nTest set: Average loss: {:.4f} \nAccuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(data_loader.dataset),
- 100. * correct / len(data_loader.dataset)))
- return correct, test_loss
test_loss
被除以整个数据集的大小,以计算平均测试损失。
print
语句打印测试数据集的平均损失和准确率信息,使用了格式化字符串来将这些值插入到输出的字符串中。
最后,函数返回正确的预测数量和测试损失。
- if __name__ == '__main__':
- # parse args
- args = args_parser()
- args.device = torch.device('cuda:{}'.format(args.gpu) if torch.cuda.is_available() and args.gpu != -1 else 'cpu')
-
- torch.manual_seed(args.seed)
-
- # load dataset and split users
- if args.dataset == 'mnist':
- dataset_train = datasets.MNIST('D:\\federated-learning-master\\data\\mnist/', train=True, download=True,
- transform=transforms.Compose([
- transforms.ToTensor(),
- transforms.Normalize((0.1307,), (0.3081,))
- ]))
- img_size = dataset_train[0][0].shape
- elif args.dataset == 'cifar':
- transform = transforms.Compose(
- [transforms.ToTensor(),
- transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
- dataset_train = datasets.CIFAR10('D:\\federated-learning-master\\data\\cifar', train=True, transform=transform, target_transform=None, download=True)
- img_size = dataset_train[0][0].shape
- else:
- exit('Error: unrecognized dataset')
-
- # build model
- if args.model == 'cnn' and args.dataset == 'cifar':
- net_glob = CNNCifar(args=args).to(args.device)
- elif args.model == 'cnn' and args.dataset == 'mnist':
- net_glob = CNNMnist(args=args).to(args.device)
- elif args.model == 'mlp':
- len_in = 1
- for x in img_size:
- len_in *= x
- net_glob = MLP(dim_in=len_in, dim_hidden=64, dim_out=args.num_classes).to(args.device)
- else:
- exit('Error: unrecognized model')
- print(net_glob)
-
- # training
- optimizer = optim.SGD(net_glob.parameters(), lr=args.lr, momentum=args.momentum)
- train_loader = DataLoader(dataset_train, batch_size=64, shuffle=True)
-
- list_loss = []
- net_glob.train()
- for epoch in range(args.epochs):
- batch_loss = []
- for batch_idx, (data, target) in enumerate(train_loader):
- data, target = data.to(args.device), target.to(args.device)
- optimizer.zero_grad()
- output = net_glob(data)
- loss = F.cross_entropy(output, target)
- loss.backward()
- optimizer.step()
- if batch_idx % 50 == 0:
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, batch_idx * len(data), len(train_loader.dataset),
- 100. * batch_idx / len(train_loader), loss.item()))
- batch_loss.append(loss.item())
- loss_avg = sum(batch_loss)/len(batch_loss)
- print('\nTrain loss:', loss_avg)
- list_loss.append(loss_avg)
-
- # plot loss
- plt.figure()
- plt.plot(range(len(list_loss)), list_loss)
- plt.xlabel('epochs')
- plt.ylabel('train loss')
- plt.savefig('./log/nn_{}_{}_{}.png'.format(args.dataset, args.model, args.epochs))
-
- # testing
- if args.dataset == 'mnist':
- dataset_test = datasets.MNIST('./data/mnist/', train=False, download=True,
- transform=transforms.Compose([
- transforms.ToTensor(),
- transforms.Normalize((0.1307,), (0.3081,))
- ]))
- test_loader = DataLoader(dataset_test, batch_size=1000, shuffle=False)
- elif args.dataset == 'cifar':
- transform = transforms.Compose(
- [transforms.ToTensor(),
- transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
- dataset_test = datasets.CIFAR10('./data/cifar', train=False, transform=transform, target_transform=None, download=True)
- test_loader = DataLoader(dataset_test, batch_size=1000, shuffle=False)
- else:
- exit('Error: unrecognized dataset')
-
- print('test on', len(dataset_test), 'samples')
- test_acc, test_loss = test(net_glob, test_loader)
逐行分析:
if __name__ == '__main__':
if __name__ == '__main__':
是 Python 中的惯用方式,用来判断当前模块是否被直接运行,而不是被导入为一个模块。- # parse args
- args = args_parser()
- args.device = torch.device('cuda:{}'.format(args.gpu) if torch.cuda.is_available() and args.gpu != -1 else 'cpu')
这段代码通过调用 args_parser()
函数来解析命令行参数,并将返回的参数存储在 args
变量中。
args.device
根据是否可用的 CUDA 设备和参数中指定的 GPU 索引来选择设备。如果 GPU 可用且索引不为 -1,则使用 CUDA 设备;否则使用 CPU 设备。
torch.manual_seed(args.seed)
【注】:torch.manual_seed(args.seed)
用于设置随机数种子,以确保在相同的种子下每次运行结果是一致的。
具体解释如下:
torch.manual_seed()
是 PyTorch 中的一个函数,用于设置随机数种子。args.seed
是一个变量,表示用户在运行代码时指定的随机数种子。torch.manual_seed(args.seed)
,可以将随机数种子设置为 args.seed
的值。这样,每次运行代码时,使用相同的种子将导致随机数生成器生成相同的随机数序列。- # load dataset and split users
- if args.dataset == 'mnist':
- dataset_train = datasets.MNIST('D:\\federated-learning-master\\data\\mnist/', train=True, download=True,
- transform=transforms.Compose([
- transforms.ToTensor(),
- transforms.Normalize((0.1307,), (0.3081,))
- ]))
- img_size = dataset_train[0][0].shape
- elif args.dataset == 'cifar':
- transform = transforms.Compose(
- [transforms.ToTensor(),
- transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
- dataset_train = datasets.CIFAR10('D:\\federated-learning-master\\data\\cifar', train=True, transform=transform, target_transform=None, download=True)
- img_size = dataset_train[0][0].shape
- else:
- exit('Error: unrecognized dataset')
根据指定的数据集名称加载训练数据集,并应用相应的转换操作。
如果数据集是 MNIST,则使用 datasets.MNIST
加载 MNIST 数据集,并进行以下转换操作:将图像转换为张量(transforms.ToTensor()
),并进行归一化处理。
如果数据集是 CIFAR,则使用 datasets.CIFAR10
加载 CIFAR-10 数据集,并进行以下转换操作:将图像转换为张量(transforms.ToTensor()
),并进行归一化处理。
将数据集的第一个样本的形状存储在 img_size
变量中。
如果数据集名称未被识别,则退出程序并打印错误消息。
- # build model
- if args.model == 'cnn' and args.dataset == 'cifar':
- net_glob = CNNCifar(args=args).to(args.device)
- elif args.model == 'cnn' and args.dataset == 'mnist':
- net_glob = CNNMnist(args=args).to(args.device)
- elif args.model == 'mlp':
- len_in = 1
- for x in img_size:
- len_in *= x
- net_glob = MLP(dim_in=len_in, dim_hidden=64, dim_out=args.num_classes).to(args.device)
- else:
- exit('Error: unrecognized model')
- print(net_glob)
根据指定的模型名称构建模型。
如果模型是 CNN 且数据集是 CIFAR,则构建一个名为 CNNCifar
的模型实例,并将其移动到指定设备上。
如果模型是 CNN 且数据集是 MNIST,则构建一个名为 CNNMnist
的模型实例,并将其移动到指定设备上。
【注】:args=args
是将一个参数对象 args
传递给 CNNCifar
类的构造函数。这可能是用于传递一些模型的初始化参数,以控制模型的结构或行为。
.to(args.device)
是将 net_glob
对象移动到指定的设备上,以确保模型在特定的设备(如 CPU 或 GPU)上进行计算。args.device
在这里应该是一个表示设备的字符串,例如 'cuda'
表示 GPU,'cpu'
表示 CPU。
如果模型是 MLP,则计算输入维度 len_in
,并基于其构建一个名为 MLP
的模型实例,并将其移动到指定设备上。dim_in
是输入数据的扁平化尺寸,取决于 img_size
的元素乘积,dim_hidden
是隐藏层的维度,dim_out
是输出类别数。
如果模型名称未被识别,则退出程序并打印错误消息。
打印模型结构。
- # training
- optimizer = optim.SGD(net_glob.parameters(), lr=args.lr, momentum=args.momentum)
- train_loader = DataLoader(dataset_train, batch_size=64, shuffle=True)
-
- list_loss = []
- net_glob.train()
定义优化器 optimizer
,使用随机梯度下降(SGD)算法,将模型 net_glob
的参数传递给优化器。
使用数据加载器 train_loader
加载训练数据集,每个批次的大小为 64 并进行随机洗牌。
初始化空列表 list_loss
,用于存储每个 epoch 的训练损失。
将模型设置为训练模式,通过调用 net_glob.train()
。
- for epoch in range(args.epochs):
- batch_loss = []
for epoch in range(args.epochs):
是一个循环,它会遍历从 0 到 args.epochs-1
的每个 epoch。
batch_loss = []
创建一个空列表用于存储每个 batch 的损失值。
- for batch_idx, (data, target) in enumerate(train_loader):
- data, target = data.to(args.device), target.to(args.device)
- optimizer.zero_grad()
- output = net_glob(data)
- loss = F.cross_entropy(output, target)
- loss.backward()
- optimizer.step()
- if batch_idx % 50 == 0:
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, batch_idx * len(data), len(train_loader.dataset),
- 100. * batch_idx / len(train_loader), loss.item()))
- batch_loss.append(loss.item())
for batch_idx, (data, target) in enumerate(train_loader):
是一个内部循环,遍历训练数据集的批次。batch_idx
是当前批次的索引,data
是输入数据的批次,target
是标签的批次。
data, target = data.to(args.device), target.to(args.device)
将输入数据和标签移动到指定的设备上。
optimizer.zero_grad()
将优化器的梯度置零,以准备计算梯度。
output = net_glob(data)
使用输入数据 data
通过模型 net_glob
进行前向传播,得到模型的输出。
loss = F.cross_entropy(output, target)
计算模型输出 output
与实际标签 target
之间的交叉熵损失。
loss.backward()
执行反向传播,计算梯度。
optimizer.step()
根据计算的梯度更新模型的参数。
if batch_idx % 50 == 0:
如果当前批次的索引能被 50 整除,则打印训练进度信息。
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(...))
打印训练的 epoch 数、当前批次的索引、总批次数和损失值等信息。
batch_loss.append(loss.item())
将当前批次的损失值添加到 batch_loss
列表中。
- loss_avg = sum(batch_loss)/len(batch_loss)
- print('\nTrain loss:', loss_avg)
- list_loss.append(loss_avg)
loss_avg = sum(batch_loss)/len(batch_loss)
计算当前 epoch 的平均损失值。
print('\nTrain loss:', loss_avg)
打印当前 epoch 的平均损失值。
list_loss.append(loss_avg)
将当前 epoch 的平均损失值添加到 list_loss
列表中,以用于后续的统计或绘图。
- # plot loss
- plt.figure()
- plt.plot(range(len(list_loss)), list_loss)
- plt.xlabel('epochs')
- plt.ylabel('train loss')
- plt.savefig('./log/nn_{}_{}_{}.png'.format(args.dataset, args.model, args.epochs))
- # testing
- if args.dataset == 'mnist':
- dataset_test = datasets.MNIST('./data/mnist/', train=False, download=True,
- transform=transforms.Compose([
- transforms.ToTensor(),
- transforms.Normalize((0.1307,), (0.3081,))
- ]))
- test_loader = DataLoader(dataset_test, batch_size=1000, shuffle=False)
- elif args.dataset == 'cifar':
- transform = transforms.Compose(
- [transforms.ToTensor(),
- transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
- dataset_test = datasets.CIFAR10('./data/cifar', train=False, transform=transform, target_transform=None, download=True)
- test_loader = DataLoader(dataset_test, batch_size=1000, shuffle=False)
- else:
- exit('Error: unrecognized dataset')
根据指定的数据集名称加载测试数据集,并应用相应的转换操作。
如果数据集是 MNIST,则使用 datasets.MNIST
加载 MNIST 数据集,并进行与训练数据集相同的转换操作。
如果数据集是 CIFAR,则使用 datasets.CIFAR
加载 CIFAR-10 数据集,并进行与训练数据集相同的转换操作。
如果数据集名称未被识别,则退出程序并打印错误消息。
- print('test on', len(dataset_test), 'samples')
- test_acc, test_loss = test(net_glob, test_loader)
打印测试数据集中样本的数量。
调用 test
函数来进行模型的测试,并将返回的测试准确率和测试损失存储在变量 test_acc
和 test_loss
中。
- if __name__ == '__main__':
- # parse args
- args = args_parser()
- args.device = torch.device('cuda:{}'.format(args.gpu) if torch.cuda.is_available() and args.gpu != -1 else 'cpu')
-
- # load dataset and split users
- if args.dataset == 'mnist':
- trans_mnist = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
- dataset_train = datasets.MNIST('D:\\federated-learning-master\\data\\mnist', train=True, download=True, transform=trans_mnist)
- dataset_test = datasets.MNIST('D:\\federated-learning-master\\data\\mnist', train=False, download=True, transform=trans_mnist)
- # sample users
- if args.iid:
- dict_users = mnist_iid(dataset_train, args.num_users)
- else:
- dict_users = mnist_noniid(dataset_train, args.num_users)
- elif args.dataset == 'cifar':
- trans_cifar = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
- dataset_train = datasets.CIFAR10('D:\\federated-learning-master\\data\\cifar', train=True, download=True, transform=trans_cifar)
- dataset_test = datasets.CIFAR10('D:\\federated-learning-master\\data\\cifar', train=False, download=True, transform=trans_cifar)
- if args.iid:
- dict_users = cifar_iid(dataset_train, args.num_users)
- else:
- exit('Error: only consider IID setting in CIFAR10')
- else:
- exit('Error: unrecognized dataset')
- img_size = dataset_train[0][0].shape
-
- # build model
- if args.model == 'cnn' and args.dataset == 'cifar':
- net_glob = CNNCifar(args=args).to(args.device)
- elif args.model == 'cnn' and args.dataset == 'mnist':
- net_glob = CNNMnist(args=args).to(args.device)
- elif args.model == 'mlp':
- len_in = 1
- for x in img_size:
- len_in *= x
- net_glob = MLP(dim_in=len_in, dim_hidden=200, dim_out=args.num_classes).to(args.device)
- else:
- exit('Error: unrecognized model')
- print(net_glob)
- net_glob.train()
-
- # copy weights
- w_glob = net_glob.state_dict()
-
- # training
- loss_train = []
- cv_loss, cv_acc = [], []
- val_loss_pre, counter = 0, 0
- net_best = None
- best_loss = None
- val_acc_list, net_list = [], []
-
- if args.all_clients:
- print("Aggregation over all clients")
- w_locals = [w_glob for i in range(args.num_users)]
- for iter in range(args.epochs):
- loss_locals = []
- if not args.all_clients:
- w_locals = []
- m = max(int(args.frac * args.num_users), 1)
- idxs_users = np.random.choice(range(args.num_users), m, replace=False)
- for idx in idxs_users:
- local = LocalUpdate(args=args, dataset=dataset_train, idxs=dict_users[idx])
- w, loss = local.train(net=copy.deepcopy(net_glob).to(args.device))
- if args.all_clients:
- w_locals[idx] = copy.deepcopy(w)
- else:
- w_locals.append(copy.deepcopy(w))
- loss_locals.append(copy.deepcopy(loss))
- # update global weights
- w_glob = FedAvg(w_locals)
-
- # copy weight to net_glob
- net_glob.load_state_dict(w_glob)
-
- # print loss
- loss_avg = sum(loss_locals) / len(loss_locals)
- print('Round {:3d}, Average loss {:.3f}'.format(iter, loss_avg))
- loss_train.append(loss_avg)
-
- # plot loss curve
- plt.figure()
- plt.plot(range(len(loss_train)), loss_train)
- plt.ylabel('train_loss')
- plt.savefig('./save/fed_{}_{}_{}_C{}_iid{}.png'.format(args.dataset, args.model, args.epochs, args.frac, args.iid))
-
- # testing
- net_glob.eval()
- acc_train, loss_train = test_img(net_glob, dataset_train, args)
- acc_test, loss_test = test_img(net_glob, dataset_test, args)
- print("Training accuracy: {:.2f}".format(acc_train))
- print("Testing accuracy: {:.2f}".format(acc_test))
逐行分析:
- if __name__ == '__main__':
- # parse args
- args = args_parser()
- args.device = torch.device('cuda:{}'.format(args.gpu) if torch.cuda.is_available() and args.gpu != -1 else 'cpu')
解析命令行参数,并根据系统可用的 CUDA 设备(GPU)以及用户指定的 GPU 编号,确定模型所使用的设备(GPU 或 CPU)。
而后分为三种情况考虑数据集:
- # load dataset and split users
- if args.dataset == 'mnist':
- trans_mnist = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
- dataset_train = datasets.MNIST('D:\\federated-learning-master\\data\\mnist', train=True, download=True, transform=trans_mnist)
- dataset_test = datasets.MNIST('D:\\federated-learning-master\\data\\mnist', train=False, download=True, transform=trans_mnist)
- # sample users
- if args.iid:
- dict_users = mnist_iid(dataset_train, args.num_users)
- else:
- dict_users = mnist_noniid(dataset_train, args.num_users)
transforms.ToTensor()
)和对图像进行归一化(transforms.Normalize()
)。datasets.MNIST
类加载 MNIST 数据集,指定数据集的路径、训练集标志为 True,下载数据集并应用预处理操作。(加载训练集)使用 datasets.MNIST
类加载 MNIST 数据集,指定数据集的路径、训练集标志为 False,下载数据集并应用预处理操作。(加载测试集)
根据命令行参数 args.iid
的值,调用 mnist_iid
函数或 mnist_noniid
函数对训练数据集进行分割(依据:是否是i.i.d的),并将分割后的用户数据保存在 dict_users
变量中。
- elif args.dataset == 'cifar':
- trans_cifar = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
- dataset_train = datasets.CIFAR10('D:\\federated-learning-master\\data\\cifar', train=True, download=True, transform=trans_cifar)
- dataset_test = datasets.CIFAR10('D:\\federated-learning-master\\data\\cifar', train=False, download=True, transform=trans_cifar)
- if args.iid:
- dict_users = cifar_iid(dataset_train, args.num_users)
- else:
- exit('Error: only consider IID setting in CIFAR10')
当数据集为cifar时后续步骤同mnist
- else:
- exit('Error: unrecognized dataset')
- img_size = dataset_train[0][0].shape
如果以上两个判断都不满足,则输出错误信息并终止程序。
最后,将训练数据集的第一个样本的图像部分的形状保存在 img_size
变量中。
- # build model
- if args.model == 'cnn' and args.dataset == 'cifar':
- net_glob = CNNCifar(args=args).to(args.device)
- elif args.model == 'cnn' and args.dataset == 'mnist':
- net_glob = CNNMnist(args=args).to(args.device)
- elif args.model == 'mlp':
- len_in = 1
- for x in img_size:
- len_in *= x
- net_glob = MLP(dim_in=len_in, dim_hidden=200, dim_out=args.num_classes).to(args.device)
- else:
- exit('Error: unrecognized model')
- print(net_glob)
- net_glob.train()
CNNCifar
类,创建一个名为 net_glob
的模型对象,参数 args
用于设置模型的相关参数。模型被移动到 args.device
表示的设备上。CNNMnist
类,创建一个名为 net_glob
的模型对象,参数 args
用于设置模型的相关参数。模型被移动到 args.device
表示的设备上。len_in
,初始值为 1,然后通过循环将 img_size
中每个元素乘起来,最终得到输入特征数。调用 MLP
类,创建一个名为 net_glob
的多层感知器模型对象,参数包括输入特征数 dim_in
、隐藏层维度 dim_hidden
和输出类别数 dim_out
。模型被移动到 args.device
表示的设备上。最后打印模型对象 net_glob
。
并将模型设置为训练模式,即启用模型的训练状态。
- # copy weights
- w_glob = net_glob.state_dict()
net_glob.state_dict()
返回了当前模型 net_glob
的所有权重参数,以字典的形式表示。
w_glob
变量将复制了当前模型的权重字典,即将当前模型的权重参数保存在了 w_glob
中。
这样做的目的是为了保存当前模型的权重,以便后续在联邦学习中使用。通过复制权重,可以在进行模型训练时,将训练好的模型参数传递给其他设备或服务端,进行模型的聚合或更新操作。
- # training
- loss_train = []
- cv_loss, cv_acc = [], []
- val_loss_pre, counter = 0, 0
- net_best = None
- best_loss = None
- val_acc_list, net_list = [], []
-
- if args.all_clients:
- print("Aggregation over all clients")
- w_locals = [w_glob for i in range(args.num_users)]
- for iter in range(args.epochs):
- loss_locals = []
- if not args.all_clients:
- w_locals = []
- m = max(int(args.frac * args.num_users), 1)
- idxs_users = np.random.choice(range(args.num_users), m, replace=False)
- for idx in idxs_users:
- local = LocalUpdate(args=args, dataset=dataset_train, idxs=dict_users[idx])
- w, loss = local.train(net=copy.deepcopy(net_glob).to(args.device))
- if args.all_clients:
- w_locals[idx] = copy.deepcopy(w)
- else:
- w_locals.append(copy.deepcopy(w))
- loss_locals.append(copy.deepcopy(loss))
- # update global weights
- w_glob = FedAvg(w_locals)
-
- # copy weight to net_glob
- net_glob.load_state_dict(w_glob)
-
- # print loss
- loss_avg = sum(loss_locals) / len(loss_locals)
- print('Round {:3d}, Average loss {:.3f}'.format(iter, loss_avg))
- loss_train.append(loss_avg)
这段代码的主要功能是使用联邦学习的方法训练模型。它首先根据参数进行选择性的全局聚合或者选择一部分用户进行本地训练,然后将本地模型的权重进行聚合得到全局模型的权重,并迭代地进行模型的训练和参数更新。同时,记录每轮训练的平均损失,并保存在 loss_train
列表中。
逐行分析:
- # training
- loss_train = []
- cv_loss, cv_acc = [], []
- val_loss_pre, counter = 0, 0
- net_best = None
- best_loss = None
- val_acc_list, net_list = [], []
创建了几个空列表变量 loss_train
、cv_loss
、cv_acc
、val_acc_list
和 net_list
用于保存训练过程中的损失、验证集损失、验证集准确率、验证准确率列表和模型列表。
初始化变量 val_loss_pre
、counter
、net_best
和 best_loss
。
- if args.all_clients:
- print("Aggregation over all clients")
- w_locals = [w_glob for i in range(args.num_users)]
如果 args.all_clients
为 True
,则进行全局聚合。
具体解释如下:
如果 args.all_clients
参数为 True
,表示要对所有客户端进行全局聚合。
在这种情况下,将全局模型权重 w_glob
复制 args.num_users
次,创建一个包含 args.num_users
个元素的列表 w_locals
。
列表 w_locals
中的每个元素都是 w_glob
的副本,用于给每个客户端提供相同的全局模型权重,以确保所有客户端在更新本地模型时使用同一份权重。
这样做是为了简化算法的实现,每个客户端使用相同的全局权重进行本地模型的训练,然后将更新后的本地模型权重集合进行全局聚合。
打印信息提示进行全局聚合操作。
- for iter in range(args.epochs):
- loss_locals = []
- if not args.all_clients:
- w_locals = []
- m = max(int(args.frac * args.num_users), 1)
- idxs_users = np.random.choice(range(args.num_users), m, replace=False)
- for idx in idxs_users:
- local = LocalUpdate(args=args, dataset=dataset_train, idxs=dict_users[idx])
- w, loss = local.train(net=copy.deepcopy(net_glob).to(args.device))
- if args.all_clients:
- w_locals[idx] = copy.deepcopy(w)
- else:
- w_locals.append(copy.deepcopy(w))
- loss_locals.append(copy.deepcopy(loss))
在每个 epoch 的循环中,执行以下操作:
创建一个空列表 loss_locals
用于保存每个客户端训练的损失值。
如果 args.all_clients
不为 True
,即没有进行全局聚合,则创建一个空列表 w_locals
。
根据参数 args.frac
和 args.num_users
计算每轮要选择的用户数量 m
。(args.frac
表示选择用户的比例,args.num_users
表示总用户数量。如果 args.frac * args.num_users
小于 1,则 m
取值为 1。)
通过随机选择 m
个不重复的用户索引,生成一个列表 idxs_users
。
遍历 idxs_users
列表,对每个用户执行以下操作:
调用 LocalUpdate
类,传入参数 args
(模型相关参数)、dataset_train
(训练数据集)和 dict_users[idx]
(用户数据索引),创建一个名为 local
的本地更新对象。
调用 local.train
方法,传入参数 net=copy.deepcopy(net_glob).to(args.device)
,进行本地模型的训练。copy.deepcopy(net_glob).to(args.device)
创建了一个 net_glob
的深拷贝,然后将其移动到指定的设备。该方法返回更新后的本地模型权重 w
和损失值 loss
。
如果 args.all_clients
为 True
,将更新后的本地模型权重 w
复制给 w_locals
的对应索引位置 idx
。
如果 args.all_clients
不为 True
,将更新后的本地模型权重 w
添加到 w_locals
列表中。
将损失值 loss
复制并添加到 loss_locals
列表中。
这段代码的主要目的是遍历每个用户(或者从所有用户中随机选择一部分用户)执行本地模型的训练,并记录每个用户的损失值。通过每个用户的本地训练,可以得到多个本地更新后的模型权重,用于后续的全局模型聚合。同时,记录每个用户的损失值,以便后续计算平均损失值。
而后的代码的作用是更新全局模型、计算并打印当前轮次的平均损失值,并将其保存到 loss_train
列表中。通过迭代多个轮次,可以观察在训练过程中损失值的变化情况。
- # update global weights
- w_glob = FedAvg(w_locals)
通过调用 FedAvg
函数对本地模型权重 w_locals
进行全局模型聚合,得到更新后的全局模型权重 w_glob
。FedAvg
函数的作用是计算平均权重,通常采用加权平均的方式,将所有本地模型的权重按照一定权重进行聚合。
- # copy weight to net_glob
- net_glob.load_state_dict(w_glob)
将更新后的全局模型权重 w_glob
加载到 net_glob
中,通过 net_glob.load_state_dict(w_glob)
实现。
- # print loss
- loss_avg = sum(loss_locals) / len(loss_locals)
- print('Round {:3d}, Average loss {:.3f}'.format(iter, loss_avg))
- loss_train.append(loss_avg)
计算损失值的平均值 loss_avg
,通过将所有客户端的损失值 loss_locals
相加并除以客户端数量得到。最后,打印当前轮次的平均损失值,并将其添加到 loss_train
列表中保存。
- # plot loss curve
- plt.figure()
- plt.plot(range(len(loss_train)), loss_train)
- plt.ylabel('train_loss')
- plt.savefig('./save/fed_{}_{}_{}_C{}_iid{}.png'.format(args.dataset, args.model, args.epochs, args.frac, args.iid))
-
这段代码用于绘制损失曲线图。
plt.figure()
实现。plt.plot()
函数绘制损失曲线,其中 range(len(loss_train))
表示 x 轴的取值范围,即迭代次数,loss_train
是存储了每一轮的平均损失值的列表。plt.ylabel('train_loss')
设置 y 轴的标签为 “train_loss”。plt.savefig()
将绘制好的图形保存为一个图片文件,文件路径和名称根据参数 args.dataset
、args.model
、args.epochs
、args.frac
和 args.iid
动态生成。整段代码的目的是绘制损失曲线图,并保存为一个图片文件。这可以帮助我们可视化训练过程中损失值的变化,从而更好地分析和理解模型的训练效果。
- # testing
- net_glob.eval()
- acc_train, loss_train = test_img(net_glob, dataset_train, args)
- acc_test, loss_test = test_img(net_glob, dataset_test, args)
- print("Training accuracy: {:.2f}".format(acc_train))
- print("Testing accuracy: {:.2f}".format(acc_test))
-
这段代码用于进行模型的测试。
最终的打印结果是训练数据集的准确率和测试数据集的准确率,通过这些结果可以评估模型在训练数据集和测试数据集上的性能。
大致就这样,欢迎各位大佬纠错或指导!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。