It is very simple: gradient ascent. So, in essence, adversarial training performs two gradient updates within a single step; only the object being updated differs each time:
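For reference, one such ascent step is exactly the standard FGM perturbation, using the same quantities that appear in the code below (epsilon, and the gradient g of the loss with respect to the embedding):

r_adv = ε · g / ||g||_2,   g = ∇_emb L(x, y; θ)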
Note: the so-called "attack" works like this: compute the perturbation r_adv from the gradient on the embedding, then add r_adv onto the original embedding to obtain the adversarial sample x+r; run a new forward pass on x+r to get a new loss, and backward() yields the gradient for the adversarial sample. Since this is a second backward pass after step (1), the adversarial gradient is accumulated on top of the gradient of the original sample; finally the embedding is restored to its original state (i.e. the value before r_adv was added).
import torch

class FGM():
    def __init__(self, model):
        self.model = model
        self.backup = {}

    def attack(self, epsilon=1., emb_name='emb.'):
        # Replace emb_name with the name of the embedding parameter in your model
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                self.backup[name] = param.data.clone()
                norm = torch.norm(param.grad)
                if norm != 0 and not torch.isnan(norm):
                    r_at = epsilon * param.grad / norm
                    param.data.add_(r_at)

    def restore(self, emb_name='emb.'):
        # Replace emb_name with the name of the embedding parameter in your model
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                assert name in self.backup
                param.data = self.backup[name]
        self.backup = {}
# Initialization
fgm = FGM(model)
for batch_input, batch_label in data:
    # Normal training
    loss = model(batch_input, batch_label)
    loss.backward()  # backward pass: get the normal gradients
    # Adversarial training
    fgm.attack()  # add the adversarial perturbation on the embedding
    loss_adv = model(batch_input, batch_label)
    loss_adv.backward()  # backward pass: accumulate the adversarial gradients on top of the normal ones
    fgm.restore()  # restore the embedding parameters
    # Gradient descent: update the parameters
    optimizer.step()
    model.zero_grad()
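A small, hedged helper sketch (the substring 'embedding' and the BERT-style name below are assumptions about your model, not part of the original code) for finding the string to pass as emb_name:

# Hypothetical helper: list parameter names that look like the input embedding,
# so you know what to pass as emb_name (for BERT models the full name is usually
# 'bert.embeddings.word_embeddings.weight', so emb_name='word_embeddings.' works).
for name, _ in model.named_parameters():
    if 'embedding' in name:
        print(name)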
Prerequisite: set the number of perturbation-accumulation steps in PGD to K.

PGD accumulates the perturbation: on every step except the last, the loop calls model.zero_grad() before the next backward pass, so the new perturbation r_t computed in each round is based purely on the gradient at the current (already perturbed) point and has no accumulation relationship with the earlier rounds. This step-by-step iteration works just like ordinary model training: after K rounds of gradient ascent it finds the best δ. Without model.zero_grad(), every loss.backward() would keep accumulating onto param.grad (this holds for delta.grad as well as for every other model.params.grad), which would amount to updating with the sum of the gradients from all K steps.
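Written out, each attack() step in the code below is the standard projected gradient ascent update (alpha is the step size α, epsilon is the radius ε of the L2 ball, and project() implements the projection Π):

δ_{t+1} = Π_{||δ||_2 ≤ ε} ( δ_t + α · g_t / ||g_t||_2 ),   g_t = ∇_emb L(x + δ_t, y; θ)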
class PGD():
    def __init__(self, model, emb_name, epsilon=1., alpha=0.3):
        # Replace emb_name with the name of the embedding parameter in your model
        self.model = model
        self.emb_name = emb_name
        self.epsilon = epsilon
        self.alpha = alpha
        self.emb_backup = {}
        self.grad_backup = {}

    def attack(self, is_first_attack=False):
        for name, param in self.model.named_parameters():
            if param.requires_grad and self.emb_name in name:
                if is_first_attack:
                    self.emb_backup[name] = param.data.clone()
                norm = torch.norm(param.grad)
                if norm != 0:
                    r_at = self.alpha * param.grad / norm
                    param.data.add_(r_at)
                    param.data = self.project(name, param.data, self.epsilon)

    def restore(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad and self.emb_name in name:
                assert name in self.emb_backup
                param.data = self.emb_backup[name]
        self.emb_backup = {}

    def project(self, param_name, param_data, epsilon):
        r = param_data - self.emb_backup[param_name]
        if torch.norm(r) > epsilon:
            r = epsilon * r / torch.norm(r)
        return self.emb_backup[param_name] + r

    def backup_grad(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad and param.grad is not None:
                self.grad_backup[name] = param.grad.clone()

    def restore_grad(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad and param.grad is not None:
                param.grad = self.grad_backup[name]
pgd = PGD(model, emb_name='emb.')  # replace 'emb.' with the embedding parameter name in your model
K = 3
for batch_input, batch_label in data:
    # Normal training
    loss = model(batch_input, batch_label)
    loss.backward()  # backward pass: get the normal gradients
    pgd.backup_grad()
    # Adversarial training
    for t in range(K):
        pgd.attack(is_first_attack=(t == 0))  # add the adversarial perturbation on the embedding; back up param.data on the first attack
        if t != K - 1:
            model.zero_grad()
        else:
            pgd.restore_grad()
        loss_adv = model(batch_input, batch_label)
        loss_adv.backward()  # backward pass: accumulate the adversarial gradients on top of the normal ones
    pgd.restore()  # restore the embedding parameters
    # Gradient descent: update the parameters
    optimizer.step()
    model.zero_grad()
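A note on the design (as read from the code above): the intermediate model.zero_grad() calls wipe out the gradients from the clean forward pass, which is why backup_grad() saves them before the K-step loop and restore_grad() puts them back right before the final backward. The last loss_adv.backward() then accumulates the adversarial gradient on top of the clean one, so optimizer.step() updates with clean gradient plus adversarial gradient, the same recipe as FGM.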
FreeLB was effectively introduced above already, so here is the code directly.
class FreeLB(object):
    def __init__(self, adv_K, adv_lr, adv_init_mag, adv_max_norm=0., adv_norm_type='l2', base_model='bert'):
        self.adv_K = adv_K
        self.adv_lr = adv_lr
        self.adv_max_norm = adv_max_norm
        self.adv_init_mag = adv_init_mag  # magnitude used to initialize delta for adversarial training
        self.adv_norm_type = adv_norm_type
        self.base_model = base_model

    def attack(self, model, inputs, gradient_accumulation_steps=1):
        input_ids = inputs['input_ids']
        if isinstance(model, torch.nn.DataParallel):
            embeds_init = getattr(model.module, self.base_model).embeddings.word_embeddings(input_ids)
        else:
            embeds_init = getattr(model, self.base_model).embeddings.word_embeddings(input_ids)
        if self.adv_init_mag > 0:  # decides whether the first attack step sees the original gradient (delta=0) or an adversarial one (delta!=0)
            input_mask = inputs['attention_mask'].to(embeds_init)
            input_lengths = torch.sum(input_mask, 1)
            if self.adv_norm_type == "l2":
                delta = torch.zeros_like(embeds_init).uniform_(-1, 1) * input_mask.unsqueeze(2)
                dims = input_lengths * embeds_init.size(-1)
                mag = self.adv_init_mag / torch.sqrt(dims)
                delta = (delta * mag.view(-1, 1, 1)).detach()
            elif self.adv_norm_type == "linf":
                delta = torch.zeros_like(embeds_init).uniform_(-self.adv_init_mag, self.adv_init_mag)
                delta = delta * input_mask.unsqueeze(2)
        else:
            delta = torch.zeros_like(embeds_init)  # initialize the perturbation to zero
        loss, logits = None, None
        for astep in range(self.adv_K):
            delta.requires_grad_()
            inputs['inputs_embeds'] = delta + embeds_init  # apply the accumulated perturbation delta
            inputs['input_ids'] = None
            outputs = model(**inputs)
            loss, logits = outputs[:2]  # model outputs are always tuple in transformers (see doc)
            loss = loss.mean()  # mean() to average on multi-gpu parallel training
            loss = loss / gradient_accumulation_steps
            loss.backward()
            delta_grad = delta.grad.clone().detach()  # back up the gradient of the perturbation
            if self.adv_norm_type == "l2":
                denorm = torch.norm(delta_grad.view(delta_grad.size(0), -1), dim=1).view(-1, 1, 1)
                denorm = torch.clamp(denorm, min=1e-8)
                delta = (delta + self.adv_lr * delta_grad / denorm).detach()
                if self.adv_max_norm > 0:
                    delta_norm = torch.norm(delta.view(delta.size(0), -1).float(), p=2, dim=1).detach()
                    exceed_mask = (delta_norm > self.adv_max_norm).to(embeds_init)
                    reweights = (self.adv_max_norm / delta_norm * exceed_mask + (1 - exceed_mask)).view(-1, 1, 1)
                    delta = (delta * reweights).detach()
            elif self.adv_norm_type == "linf":
                denorm = torch.norm(delta_grad.view(delta_grad.size(0), -1), dim=1, p=float("inf")).view(-1, 1, 1)  # infinity norm: the largest absolute value
                denorm = torch.clamp(denorm, min=1e-8)  # like np.clip: clamp values into (min, max)
                delta = (delta + self.adv_lr * delta_grad / denorm).detach()  # compute this step's update and add it onto delta (gradient ascent)
                if self.adv_max_norm > 0:
                    delta = torch.clamp(delta, -self.adv_max_norm, self.adv_max_norm).detach()
            else:
                raise ValueError("Norm type {} not specified.".format(self.adv_norm_type))
            if isinstance(model, torch.nn.DataParallel):
                embeds_init = getattr(model.module, self.base_model).embeddings.word_embeddings(input_ids)
            else:
                embeds_init = getattr(model, self.base_model).embeddings.word_embeddings(input_ids)
        return loss, logits
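Two points worth noting about this implementation (as read from the code, not an official statement): unlike FGM/PGD, FreeLB does not modify the shared embedding weight matrix; it builds a per-sample perturbation delta and feeds delta + embeds_init to the model through inputs_embeds. And because loss.backward() runs inside every ascent step without zeroing the gradients, the parameter gradients of all adv_K steps accumulate, so the later optimizer.step() uses gradients gathered at all adv_K perturbed points rather than at a single one.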
if args.do_adv:
    inputs = {
        "input_ids": input_ids,
        "bbox": layout,
        "token_type_ids": segment_ids,
        "attention_mask": input_mask,
        "masked_lm_labels": lm_label_ids
    }
    loss, prediction_scores = freelb.attack(model, inputs)
    # loss.backward()  # not needed: attack() already calls backward() at every ascent step,
    #                  # so the gradients are accumulated; a second backward on the returned
    #                  # loss would fail because its graph has already been freed
optimizer.step()
scheduler.step()
model.zero_grad()
One more caveat, about dropout: if dropout stays active during the gradient ascent on r, the network structure you face in each of the K steps is different, which is equivalent to searching for the best perturbation r against K different loss functions; this can make the results unstable. Even so, with dropout left at its defaults (attention_probs_dropout_prob and hidden_dropout_prob are both 0.1), adversarial training still brings a sizeable improvement.
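If you do want to remove this source of noise, one hedged option (assuming a HuggingFace-style config; model_name below is a placeholder, not from the original article) is to zero the two dropout probabilities when building the model; as noted above, adversarial training helps even without doing so.

from transformers import AutoConfig, AutoModelForSequenceClassification

# Hypothetical setup: turn dropout off so all K ascent steps see the same network
config = AutoConfig.from_pretrained(model_name)
config.attention_probs_dropout_prob = 0.0
config.hidden_dropout_prob = 0.0
model = AutoModelForSequenceClassification.from_pretrained(model_name, config=config)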
Note: all of the code quoted in this article comes from https://github.com/lonePatient/TorchBlocks/blob/master/torchblocks/callback/adversarial.py