Activation Clustering in Practice
We use PCA for dimensionality reduction and then use k-means to split the samples of each class into two clusters.
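To make the idea concrete, here is a minimal sketch of the per-class clustering step using scikit-learn. This is not ART's internal code: `activations` is assumed to be the matrix of penultimate-layer activations for the training samples labeled with one class, and the "smaller cluster is suspicious" rule is just one possible heuristic for deciding which cluster is poisoned.

import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

def cluster_one_class(activations, nb_dims=10, nb_clusters=2):
    # Project the high-dimensional activations down to nb_dims components
    reduced = PCA(n_components=nb_dims).fit_transform(activations)
    # Split this class's samples into nb_clusters groups
    clusters = KMeans(n_clusters=nb_clusters).fit_predict(reduced)
    # Heuristic: the markedly smaller cluster is the suspicious (poisoned) one
    sizes = np.bincount(clusters, minlength=nb_clusters)
    suspicious_cluster = int(np.argmin(sizes))
    return clusters, reduced, suspicious_cluster

The detect_poison implementation shown below wraps this whole pipeline: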
def detect_poison(self, **kwargs) -> Tuple[Dict[str, Any], List[int]]:
    old_nb_clusters = self.nb_clusters
    self.set_params(**kwargs)
    if self.nb_clusters != old_nb_clusters:
        self.clusterer = MiniBatchKMeans(n_clusters=self.nb_clusters)

    # Case 1: data comes from a generator
    if self.generator is not None:
        self.clusters_by_class, self.red_activations_by_class = self.cluster_activations()
        report, self.assigned_clean_by_class = self.analyze_clusters()

        batch_size = self.generator.batch_size
        num_samples = self.generator.size
        self.is_clean_lst = []

        for _ in range(num_samples // batch_size):
            _, y_batch = self.generator.get_batch()
            indices_by_class = self._segment_by_class(np.arange(batch_size), y_batch)
            is_clean_lst = [0] * batch_size
            for class_idx, idxs in enumerate(indices_by_class):
                for idx_in_class, idx in enumerate(idxs):
                    is_clean_lst[idx] = self.assigned_clean_by_class[class_idx][idx_in_class]
            self.is_clean_lst += is_clean_lst
        return report, self.is_clean_lst

    # Case 2: data is held in memory
    if not self.activations_by_class:
        activations = self._get_activations()
        self.activations_by_class = self._segment_by_class(activations, self.y_train)
    (
        self.clusters_by_class,
        self.red_activations_by_class,
    ) = self.cluster_activations()
    report, self.assigned_clean_by_class = self.analyze_clusters()

    n_train = len(self.x_train)
    indices_by_class = self._segment_by_class(np.arange(n_train), self.y_train)
    self.is_clean_lst = [0] * n_train

    for assigned_clean, indices_dp in zip(self.assigned_clean_by_class, indices_by_class):
        for assignment, index_dp in zip(assigned_clean, indices_dp):
            if assignment == 1:
                self.is_clean_lst[index_dp] = 1
    return report, self.is_clean_lst
report, is_clean_lst = defence.detect_poison(nb_clusters=2, nb_dims=10, reduce="PCA")
print("Analysis completed. Report:")
import pprint
pp = pprint.PrettyPrinter(indent=10)
pp.pprint(report)
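If the ground-truth poison labels from the earlier poisoning step are still available, the verdicts can be checked directly. This is an optional sanity check; `is_poison_train` is a hypothetical name for the 0/1 array built when the training set was poisoned (1 = poisoned) and is not defined in this snippet.

is_clean_pred = np.array(is_clean_lst)
true_clean = 1 - np.array(is_poison_train)   # ground truth: 1 = actually clean
accuracy = np.mean(is_clean_pred == true_clean)
print("Detection accuracy against ground truth: {:.2f}%".format(accuracy * 100))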
To make this more intuitive, we can visualize the clustering result; here we visualize the two clusters obtained for class "1".
c = 1
red_activations = red_activations_by_class[c]
clusters = clusters_by_class[c]
fig = plt.figure()
ax = fig.add_subplot(111, projection="3d")
colors = ["#0000FF", "#00FF00"]
for i, act in enumerate(red_activations):
    ax.scatter3D(act[0], act[1], act[2], color=colors[clusters[i]])
We can see that the blue cluster contains some green points; these green points are the outliers, and in our experiment they are precisely the poisoned samples.
We can go a step further and visualize the samples that the model classifies as class "1".
# Visualize the samples classified as class "1"
def plot_class_clusters(n_class, n_clusters):
    for q in range(n_clusters):
        plt.figure(1, figsize=(25, 25))
        plt.tight_layout()
        plt.subplot(1, n_clusters, q + 1)
        plt.title("Class " + str(n_class) + ", Cluster " + str(q), fontsize=40)
        sprite = sprites_by_class[n_class][q]
        plt.imshow(sprite, interpolation="none")

sprites_by_class = defence.visualize_clusters(x_train, save=False)
print("Clusters for class 1.")
print("Note that one of the clusters contains the poisonous data for this class.")
print("Also, the number of legitimate data points is smaller (see relative size of digits).")
plot_class_clusters(1, 2)
The result is shown below: the samples are grouped into two clusters.
One cluster is, naturally, the benign samples that originally belonged to class "1".
The other cluster consists of the poisoned samples (exactly as we did when poisoning earlier: we took samples that were originally "0", overlaid the trigger, and relabeled them as "1"; after training on these, the model naturally classifies the corresponding test samples as "1" as well).
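For reference, that poisoning step can be sketched roughly as follows. This is a hypothetical reconstruction, not the exact code used earlier in the article: it stamps a small white square into the corner of images that are really "0"s and relabels them as "1".

import numpy as np

def poison_zeros_as_ones(x_zeros):
    x_poison = x_zeros.copy()
    x_poison[:, -4:, -4:, :] = 1.0        # stamp a 4x4 white trigger in the bottom-right corner
    y_poison = np.zeros((len(x_poison), 10))
    y_poison[:, 1] = 1.0                  # relabel the poisoned samples as class "1"
    return x_poison, y_poison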
Neural Cleanse in Practice
As explained in the earlier section on the underlying principle, this method can reverse-engineer the trigger; of course, the recovered trigger will not be exactly identical to the one the attacker used.
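The core search can be sketched independently of ART. The following is a minimal, hypothetical sketch of the trigger-reversal objective for a single target class, assuming a Keras `model` on 28x28x1 inputs and a batch of clean images `x_val`; it is not ART's implementation, which follows below with its dynamic cost adjustment and early stopping.

import tensorflow as tf

def reverse_trigger(model, x_val, target_class, steps=100, cost=0.01, lr=0.1):
    mask = tf.Variable(tf.zeros((28, 28, 1)))       # how much of each pixel the trigger overwrites
    pattern = tf.Variable(tf.zeros((28, 28, 1)))    # the trigger pattern itself
    opt = tf.keras.optimizers.Adam(lr)
    y_target = tf.one_hot([target_class] * len(x_val), model.output_shape[-1])
    x_val = tf.constant(x_val, dtype=tf.float32)
    for _ in range(steps):
        with tf.GradientTape() as tape:
            m = tf.sigmoid(mask)                    # keep the mask in [0, 1]
            x_stamped = (1.0 - m) * x_val + m * tf.sigmoid(pattern)
            preds = model(x_stamped, training=False)
            ce = tf.keras.losses.categorical_crossentropy(y_target, preds)
            loss = tf.reduce_mean(ce) + cost * tf.reduce_sum(m)   # L1 penalty keeps the mask small
        grads = tape.gradient(loss, [mask, pattern])
        opt.apply_gradients(zip(grads, [mask, pattern]))
    return tf.sigmoid(pattern).numpy(), tf.sigmoid(mask).numpy()

The actual implementation used here, with its dynamic cost adjustment and early stopping, is reproduced below: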
def generate_backdoor(
    self, x_val: np.ndarray, y_val: np.ndarray, y_target: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    import keras.backend as K
    from keras_preprocessing.image import ImageDataGenerator

    self.reset()
    datagen = ImageDataGenerator()
    gen = datagen.flow(x_val, y_val, batch_size=self.batch_size)
    mask_best = None
    pattern_best = None
    reg_best = float("inf")
    cost_set_counter = 0
    cost_up_counter = 0
    cost_down_counter = 0
    cost_up_flag = False
    cost_down_flag = False
    early_stop_counter = 0
    early_stop_reg_best = reg_best
    mini_batch_size = len(x_val)

    for _ in tqdm(range(self.steps), desc="Generating backdoor for class {}".format(np.argmax(y_target))):
        loss_reg_list = []
        loss_acc_list = []

        for _ in range(mini_batch_size):
            x_batch, _ = gen.next()
            y_batch = [y_target] * x_batch.shape[0]
            _, batch_loss_reg, _, batch_loss_acc = self.train([x_batch, y_batch])
            loss_reg_list.extend(list(batch_loss_reg.flatten()))
            loss_acc_list.extend(list(batch_loss_acc.flatten()))

        avg_loss_reg = np.mean(loss_reg_list)
        avg_loss_acc = np.mean(loss_acc_list)

        # Keep the best (smallest-regularization) trigger found so far
        if avg_loss_acc >= self.attack_success_threshold and avg_loss_reg < reg_best:
            mask_best = K.eval(self.mask_tensor)
            pattern_best = K.eval(self.pattern_tensor)
            reg_best = avg_loss_reg

        # Check for early stopping
        if self.early_stop:
            if reg_best < float("inf"):
                if reg_best >= self.early_stop_threshold * early_stop_reg_best:
                    early_stop_counter += 1
                else:
                    early_stop_counter = 0
            early_stop_reg_best = min(reg_best, early_stop_reg_best)
            if cost_down_flag and cost_up_flag and early_stop_counter >= self.early_stop_patience:
                logger.info("Early stop")
                break

        # Adjust the cost (weight of the mask-size regularizer)
        if avg_loss_acc >= self.attack_success_threshold:
            cost_set_counter += 1
            if cost_set_counter >= self.patience:
                self.cost = self.init_cost
                K.set_value(self.cost_tensor, self.cost)
                cost_up_counter = 0
                cost_down_counter = 0
                cost_up_flag = False
                cost_down_flag = False
        else:
            cost_set_counter = 0

        if avg_loss_acc >= self.attack_success_threshold:
            cost_up_counter += 1
            cost_down_counter = 0
        else:
            cost_up_counter = 0
            cost_down_counter += 1

        if cost_up_counter >= self.patience:
            cost_up_counter = 0
            self.cost *= self.cost_multiplier_up
            K.set_value(self.cost_tensor, self.cost)
            cost_up_flag = True
        elif cost_down_counter >= self.patience:
            cost_down_counter = 0
            self.cost /= self.cost_multiplier_down
            K.set_value(self.cost_tensor, self.cost)
            cost_down_flag = True

    # Return the best recovered trigger (pattern) and mask
    if mask_best is None or pattern_best is None:
        mask_best = K.eval(self.mask_tensor)
        pattern_best = K.eval(self.pattern_tensor)
    return pattern_best, mask_best
This function can be used to recover the trigger.
pattern, mask = defence_cleanse.generate_backdoor(x_test, y_test, np.array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0]))
plt.imshow(np.squeeze(mask * pattern))
We can see that the recovered trigger is fairly close to the trigger we set.
The fact that a trigger can be recovered means a backdoor attack is present. The defenses that can then be applied include the following.
1. Filtering
Rank the neurons by how strongly they are associated with the trigger. When an input arrives, if the neurons highly associated with the trigger fire above their normal activation levels, the classifier refuses to predict on it (it outputs all zeros), since that input is likely a poisoned sample.
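Conceptually, the predict-time check can be sketched as below. This is a hypothetical illustration, not ART's code: `top_indices` and `activation_threshold` are assumed to have been computed from clean data as in the mitigate() implementation shown at the end of this section, and `get_penultimate_activations` stands in for whatever function returns the penultimate-layer activations.

import numpy as np

def filtered_predict(classifier, x, top_indices, activation_threshold, get_penultimate_activations):
    preds = classifier.predict(x)
    acts = get_penultimate_activations(x)[:, top_indices]
    # An input is suspicious if any trigger-associated neuron fires above its clean-data threshold
    suspicious = np.any(acts > activation_threshold, axis=1)
    preds[suspicious] = 0    # suppress the prediction: output all zeros
    return preds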
Applying it as a defense gives the following results.
defence_cleanse = cleanse(classifier, steps=10, learning_rate=0.1)
defence_cleanse.mitigate(clean_x_test, clean_y_test, mitigation_types=["filtering"])
poison_pred = defence_cleanse.predict(poison_x_test)
num_filtered = np.sum(np.all(poison_pred == np.zeros(10), axis=1))
num_poison = len(poison_pred)
effectiveness = float(num_filtered) / num_poison * 100
print("Filtered {}/{} poison samples ({:.2f}% effective)".format(num_filtered, num_poison, effectiveness))
We can see that the filtering is about 89% effective.
2. Unlearning
Unlearning refers to relabeling the poisoned samples with their correct labels and then retraining the model for one epoch. The "unlearning" here is with respect to the poisoned samples: the model learns the correctly labeled samples instead of the mislabeled ones.
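In code, the whole mitigation amounts to one short retraining step. A minimal sketch, assuming `backdoor_x` are the detected poisoned inputs and `correct_y` their true one-hot labels (both hypothetical names), using the same ART classifier as above:

# One epoch of training on the correctly labeled versions of the poisoned samples
classifier.fit(backdoor_x, correct_y, batch_size=1, nb_epochs=1)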
The results of applying unlearning are as follows.
defence_cleanse = cleanse(classifier, steps=10, learning_rate=0.1)
defence_cleanse.mitigate(clean_x_test, clean_y_test, mitigation_types=["unlearning"])
poison_preds = np.argmax(classifier.predict(poison_x_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(poison_y_test, axis=1))
poison_total = poison_y_test.shape[0]
new_poison_acc = poison_correct / poison_total
print("\n Effectiveness of poison after unlearning: %.2f%% (previously %.2f%%)" % (new_poison_acc * 100, poison_acc * 100))
clean_preds = np.argmax(classifier.predict(clean_x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(clean_y_test, axis=1))
clean_total = clean_y_test.shape[0]
new_clean_acc = clean_correct / clean_total
print("\n Clean test set accuracy: %.2f%% (previously %.2f%%)" % (new_clean_acc * 100, clean_acc * 100))
We can see that the effectiveness of the backdoor attack drops to 5.19%.
3. Pruning
Pruning means zeroing out the activations of the neurons most closely associated with the trigger. Once that is done, a poisoned sample fed to the model no longer produces strong activations, and the backdoor attack therefore fails.
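For a Keras model whose last layer is a Dense softmax, pruning a single penultimate-layer neuron can be sketched as follows: zeroing the neuron's outgoing weights is equivalent to forcing its activation to have no effect on the logits. This is a hypothetical illustration of the idea, not ART's internal _prune_neurons_at_index.

def prune_neuron(model, neuron_index):
    final_dense = model.layers[-1]            # assumes the last layer is a Dense layer
    weights, bias = final_dense.get_weights()
    weights[neuron_index, :] = 0.0            # cut every connection leaving this neuron
    final_dense.set_weights([weights, bias])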
# 3. Pruning
defence_cleanse = cleanse(classifier, steps=10, learning_rate=0.1)
defence_cleanse.mitigate(clean_x_test, clean_y_test, mitigation_types=["pruning"])
poison_preds = np.argmax(classifier.predict(poison_x_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(poison_y_test, axis=1))
poison_total = poison_y_test.shape[0]
new_poison_acc = poison_correct / poison_total
print("\n Effectiveness of poison after pruning: %.2f%% (previously %.2f%%)" % (new_poison_acc * 100, poison_acc * 100))
clean_preds = np.argmax(classifier.predict(clean_x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(clean_y_test, axis=1))
clean_total = clean_y_test.shape[0]
new_clean_acc = clean_correct / clean_total
print("\n Clean test set accuracy: %.2f%% (previously %.2f%%)" % (new_clean_acc * 100, clean_acc * 100))
From the results we can see that after applying pruning, the backdoor attack fails completely.
The code for these three mitigation methods is shown below.
def mitigate(self, x_val: np.ndarray, y_val: np.ndarray, mitigation_types: List[str]) -> None:
    clean_data, backdoor_data, backdoor_labels = self.backdoor_examples(x_val, y_val)

    if len(backdoor_data) == 0:
        logger.info("No backdoor labels were detected")
        return

    if "pruning" in mitigation_types or "filtering" in mitigation_types:
        # Get the penultimate-layer activations
        clean_activations = self._get_penultimate_layer_activations(clean_data)
        backdoor_activations = self._get_penultimate_layer_activations(backdoor_data)

        # Rank the neurons in descending order of activation difference
        ranked_indices = np.argsort(np.sum(clean_activations - backdoor_activations, axis=0))

    for mitigation_type in mitigation_types:
        if mitigation_type == "unlearning":
            # Train for one epoch on the correctly labeled data
            self._fit_classifier(backdoor_data, backdoor_labels, batch_size=1, nb_epochs=1)

        elif mitigation_type == "pruning":
            # Zero out the activations of the highest-ranked neurons
            backdoor_effective = self.check_backdoor_effective(backdoor_data, backdoor_labels)
            num_neurons_pruned = 0
            total_neurons = clean_activations.shape[1]
            logger.info("Pruning model...")
            while (
                backdoor_effective
                and num_neurons_pruned < 0.3 * total_neurons
                and num_neurons_pruned < len(ranked_indices)
            ):
                self._prune_neurons_at_index(ranked_indices[num_neurons_pruned])
                num_neurons_pruned += 1
                backdoor_effective = self.check_backdoor_effective(backdoor_data, backdoor_labels)
            logger.info("Pruning complete. Pruned %d neurons", num_neurons_pruned)

        elif mitigation_type == "filtering":
            # Exploit the difference between benign and poisoned activations
            # on the top 1% of neurons
            # Get the indices of the top 1% of neurons
            num_top = int(np.ceil(len(ranked_indices) * 0.01))
            self.top_indices = ranked_indices[:num_top]

            # Compute the average activation on clean data
            avg_clean_activation = np.average(clean_activations[:, self.top_indices], axis=0)
            std_clean_activation = np.std(clean_activations[:, self.top_indices], axis=0)

            # If the activation of the selected neurons exceeds this threshold,
            # flag the input and filter it out
            self.activation_threshold = avg_clean_activation + 1 * std_clean_activation

        else:
            raise TypeError("Mitigation type: " + mitigation_type + " not supported")
This article was originally written by whoami.
Source: https://www.anquanke.com/post/id/255550
Note: This article is published with authorization from Anquanke (安全客); for reprinting, please contact the Anquanke platform.