赞
踩
1)车辆检测及ROI提取
这里基于yolov5实现车辆目标检测,具体yolov5实现模型训和部署部分可参见另一文章,在此不做赘述。这里主要基于训练好的模型,实现车辆目标检测,从而进行车辆区域提取,输入到后续的任务流中。
加载模型
# AidLite初始化:调用AidLite进行AI模型的加载与推理,需导入aidlite
aidlite = aidlite_gpu.aidlite()
# Aidlite模型路径
# model_path = '/home/Lesson5_code/yolov5_code/models/yolov5_car_best-fp16.tflite'
model_path = '/home/Lesson5_code/yolov5_code/aidlux/yolov5_car_best-fp16.tflite'
# 定义输入输出shape
in_shape = [1 * 640 * 640 * 3 * 4]
out_shape = [1 * 25200 * 6 * 4]
# 加载Aidlite检测模型:支持tflite, tnn, mnn, ms, nb格式的模型加载
aidlite.ANNModel(model_path, in_shape, out_shape, 4, 0)
读取数据:
# 读取图片进行推理 # 设置测试集路径 source = "/home/Lesson5_code/yolov5_code/data/images/tests" images_list = os.listdir(source) print(images_list) frame_id = 0 # 读取数据集 for image_name in images_list: frame_id += 1 print("frame_id:", frame_id) image_path = os.path.join(source, image_name) frame = cvs.imread(image_path) # 预处理 img = preprocess_img(frame, target_shape=(640, 640), div_num=255, means=None, stds=None) # 数据转换:因为setTensor_Fp32()需要的是float32类型的数据,所以送入的input的数据需为float32,大多数的开发者都会忘记将图像的数据类型转换为float32 aidlite.setInput_Float32(img, 640, 640) # 模型推理API aidlite.invoke() # 读取返回的结果 pred = aidlite.getOutput_Float32(0) # 数据维度转换 pred = pred.reshape(1, 25200, 6)[0] # 模型推理后处理 pred = detect_postprocess(pred, frame.shape, [640, 640, 3], conf_thres=0.25, iou_thres=0.45) # 绘制推理结果 res_img = draw_detect_res(frame, pred) # cvs.imshow(res_img) # 测试结果展示停顿 #time.sleep(5) # 图片裁剪,提取车辆目标区域 extract_detect_res(frame, pred, image_name)
车辆区域提取
def extract_detect_res(img, all_boxes, image_name): ''' 检测结果提取 ''' img = img.astype(np.uint8) color_step = int(255/len(all_boxes)) for bi in range(len(all_boxes)): if len(all_boxes[bi]) == 0: continue count = 0 for box in all_boxes[bi]: x, y, w, h = [int(t) for t in box[:4]] #cv2.putText(img, f'{coco_class[bi]}', (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) #cv2.rectangle(img, (x,y), (x+w, y+h),(0, bi*color_step, 255-bi*color_step),thickness = 2) cut_img = img[y:(y+h), x:(x + w)] cv2.imwrite("/home/Lesson5_code/yolov5_code/aidlux/extract_results/" + image_name + "_" + str(count) + ".jpg",cut_img) count += 1 # cvs.imshow(cut_img)
### 读取图片 def get_image(): img_path = os.path.join("/home/Lesson5_code/adv_code/orig_images", "vid_5_31040.jpg_3.jpg") img_url = "https://farm1.static.flickr.com/230/524562325_fb0a11d1e1.jpg" def _load_image(): from skimage.io import imread return imread(img_path) / 255. if os.path.exists(img_path): return _load_image() else: import urllib urllib.request.urlretrieve(img_url, img_path) return _load_image() def tensor2npimg(tensor): return bchw2bhwc(tensor[0].cpu().numpy()) normalize = NormalizeByChannelMeanStd( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ### 常规模型加载 model = mobilenet_v2(pretrained=True) model.eval() model = nn.Sequential(normalize, model) model = model.to(device) ### 替身模型加载 model_su = resnet18(pretrained=True) model_su.eval() model_su = nn.Sequential(normalize, model_su) model_su = model_su.to(device) ### 数据预处理 np_img = get_image() img = torch.tensor(bhwc2bchw(np_img))[None, :, :, :].float().to(device) imagenet_label2classname = ImageNetClassNameLookup() ### 测试模型输出结果 pred = imagenet_label2classname(predict_from_logits(model(img))) print("test output:", pred) ### 输出原label pred_label = predict_from_logits(model_su(img)) ### 对抗攻击:L1PGD攻击算法 (eps = 100, 400, 1600) adversary = L1PGDAttack( model_su, eps=1600, eps_iter=2/255, nb_iter=80, rand_init=True, targeted=False) ### 完成攻击,输出对抗样本 advimg = adversary.perturb(img, pred_label) ### 展示源图片,对抗扰动,对抗样本以及模型的输出结果 show_images(model, img, advimg)
对抗攻击测试及效果展示:
### 攻击测试及效果展示 def show_images(model, img, advimg, enhance=127): np_advimg = tensor2npimg(advimg) np_perturb = tensor2npimg(advimg - img) #原图预测 pred = imagenet_label2classname(predict_from_logits(model(img))) #攻击样本预测 advpred = imagenet_label2classname(predict_from_logits(model(advimg))) import matplotlib.pyplot as plt plt.figure(figsize=(10, 5)) plt.subplot(1, 3, 1) plt.imshow(np_img) plt.axis("off") plt.title("original image\n prediction: {}".format(pred)) plt.subplot(1, 3, 2) # plt.imshow(np_perturb * enhance + 0.5) plt.axis("off") plt.title("the perturbation,\n enhanced {} times".format(enhance)) plt.subplot(1, 3, 3) # plt.imshow(np_advimg) # plt.imshow(np_advimg.astype(np.uint8)) plt.axis("off") plt.title("perturbed image\n prediction: {}".format(advpred)) plt.show()
3)对抗防御——监测及报警
### 对抗攻击监测模型 class Detect_Model(nn.Module): def __init__(self, num_classes=2): super(Detect_Model, self).__init__() self.num_classes = num_classes #model = create_model('mobilenetv3_large_075', pretrained=False, num_classes=num_classes) model = create_model('resnet50', pretrained=False, num_classes=num_classes) # self.multi_PreProcess = multi_PreProcess() pth_path = os.path.join("/home/Lesson5_code/model", 'track2_resnet50_ANT_best_albation1_64_checkpoint.pth') state_dict = torch.load(pth_path, map_location='cpu') is_strict = False if 'model' in state_dict.keys(): model.load_state_dict(state_dict['model'], strict=is_strict) else: model.load_state_dict(state_dict, strict=is_strict) normalize = NormalizeByChannelMeanStd( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # self.model = nn.Sequential(normalize, self.multi_PreProcess, model) self.model = nn.Sequential(normalize, model) def load_params(self): pass def forward(self, x): # x = x[:,:,32:193,32:193] # x = F.interpolate(x, size=(224,224), mode="bilinear", align_corners=True) # x = self.multi_PreProcess.forward(x) out = self.model(x) if self.num_classes == 2: out = out.softmax(1) #return out[:,1:] return out[:,1:]
抗攻击监测及预警
一般在实际场景的AI项目中,当对抗攻击监测模型,监测到对抗样本或者对抗攻击后,会出现一个告警弹窗,并且会告知安全人员及时进行安全排查。这里通过微信“喵提醒”的方式模拟现实场景(具体“喵提醒”使用在此不做赘述)。实现代码如下:
import os import torch import requests import time import torch.nn as nn from torchvision.models import mobilenet_v2,resnet18 from advertorch.utils import predict_from_logits from advertorch.utils import NormalizeByChannelMeanStd from robust_layer import GradientConcealment, ResizedPaddingLayer from timm.models import create_model from advertorch.attacks import LinfPGDAttack from advertorch_examples.utils import ImageNetClassNameLookup from advertorch_examples.utils import bhwc2bchw from advertorch_examples.utils import bchw2bhwc device = "cuda" if torch.cuda.is_available() else "cpu" def tensor2npimg(tensor): return bchw2bhwc(tensor[0].cpu().numpy()) normalize = NormalizeByChannelMeanStd( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) imagenet_label2classname = ImageNetClassNameLookup() ### 常规模型加载 class Model(nn.Module): def __init__(self, l=290): super(Model, self).__init__() self.l = l self.gcm = GradientConcealment() #model = resnet18(pretrained=True) model = mobilenet_v2(pretrained=True) # pth_path = "/Users/rocky/Desktop/训练营/model/mobilenet_v2-b0353104.pth" # print(f'Loading pth from {pth_path}') # state_dict = torch.load(pth_path, map_location='cpu') # is_strict = False # if 'model' in state_dict.keys(): # model.load_state_dict(state_dict['model'], strict=is_strict) # else: # model.load_state_dict(state_dict, strict=is_strict) normalize = NormalizeByChannelMeanStd( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) self.model = nn.Sequential(normalize, model) def load_params(self): pass def forward(self, x): #x = self.gcm(x) #x = ResizedPaddingLayer(self.l)(x) out = self.model(x) return out ### 对抗攻击监测模型 class Detect_Model(nn.Module): def __init__(self, num_classes=2): super(Detect_Model, self).__init__() self.num_classes = num_classes #model = create_model('mobilenetv3_large_075', pretrained=False, num_classes=num_classes) model = create_model('resnet50', pretrained=False, num_classes=num_classes) # self.multi_PreProcess = multi_PreProcess() pth_path = os.path.join("/home/Lesson5_code/model", 'track2_resnet50_ANT_best_albation1_64_checkpoint.pth') #pth_path = os.path.join("/Users/rocky/Desktop/训练营/Lesson5_code/model/", "track2_tf_mobilenetv3_large_075_64_checkpoint.pth") state_dict = torch.load(pth_path, map_location='cpu') is_strict = False if 'model' in state_dict.keys(): model.load_state_dict(state_dict['model'], strict=is_strict) else: model.load_state_dict(state_dict, strict=is_strict) normalize = NormalizeByChannelMeanStd( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # self.model = nn.Sequential(normalize, self.multi_PreProcess, model) self.model = nn.Sequential(normalize, model) def load_params(self): pass def forward(self, x): # x = x[:,:,32:193,32:193] # x = F.interpolate(x, size=(224,224), mode="bilinear", align_corners=True) # x = self.multi_PreProcess.forward(x) out = self.model(x) if self.num_classes == 2: out = out.softmax(1) #return out[:,1:] return out[:,1:] model = Model().eval().to(device) detect_model = Detect_Model().eval().to(device) ### 读取图片 def get_image(): img_path = os.path.join("/home/Lesson5_code/adv_code/orig_images", "vid_5_31040.jpg_3.jpg") # img_path = os.path.join("/home/Lesson5_code/adv_code/adv_results", "adv_image.png") img_url = "https://farm1.static.flickr.com/230/524562325_fb0a11d1e1.jpg" if os.path.exists(img_path): return _load_image(img_path) else: import urllib urllib.request.urlretrieve(img_url, img_path) return _load_image(img_path) def _load_image(img_path): from skimage.io import imread return imread(img_path) / 255. model = Model().eval().to(device) detect_model = Detect_Model().eval().to(device) if __name__ == "__main__": ## 读取图片 np_img = get_image() img = torch.tensor(bhwc2bchw(np_img))[None, :, :, :].float().to(device) ### 对抗攻击监测 detect_pred = detect_model(img) x = detect_pred.tolist()[0][0] ### 对抗攻击监测结果判断,如果风险,则报警,否则进一步进行后续业务(常规模型对样本进行分类) if detect_pred > 0.5: id = 't50SOmT' # 填写喵提醒中,发送的消息,这里放上前面提到的图片外链 text = "出现对抗攻击风险!!" print(text) print(image_name) # print("结果概率:") # print("%.2f" % x) print("\n") ts = str(time.time()) # 时间戳 type = 'json' # 返回内容格式 request_url = "http://miaotixing.com/trigger?" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.67 Safari/537.36 Edg/87.0.664.47'} result = requests.post(request_url + "id=" + id + "&text=" + text + "&ts=" + ts + "&type=" + type, headers=headers) else: print("正常样本") print(image_name) # print("结果概率:") # print("%.2f" % x) ### 正常样本分类 pred = imagenet_label2classname(predict_from_logits(model(img))) print("预测结果:") print(pred)
将车辆检测+检测框提取+使用对抗样本+AI安全监测与告警功能串联,运行app_main.py ,根据输入判断是否进行攻击,并进行后续操作,当发现对抗样本风险存在时,通过“喵提醒”进行报警。
主程序入口代码如下:
import os import torch import requests import time import torch.nn as nn # aidlux相关 from cvs import * import time import torch import requests import aidlite_gpu import torch.nn as nn import torchvision.utils import copy from torchvision.models import mobilenet_v2, resnet18 from advertorch.utils import predict_from_logits from advertorch.utils import NormalizeByChannelMeanStd from advertorch_examples.utils import ImageNetClassNameLookup from advertorch_examples.utils import bhwc2bchw from advertorch_examples.utils import bchw2bhwc from detect_adv_code import Model,Detect_Model from advertorch.attacks import FGSM, LinfPGDAttack from extractUtil import detect_postprocess, preprocess_img device = "cuda" if torch.cuda.is_available() else "cpu" normalize = NormalizeByChannelMeanStd( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) imagenet_label2classname = ImageNetClassNameLookup() # 模型加载 ### 对抗攻击常规模型加载 model = mobilenet_v2(pretrained=True) model.eval() model = nn.Sequential(normalize, model) model = model.to(device) ### 对抗攻击替身模型加载 model_su = resnet18(pretrained=True) model_su.eval() model_su = nn.Sequential(normalize, model_su) model_su = model_su.to(device) ### 常规模型加载 model_normal = Model().eval().to(device) ### 对抗攻击监测模型加载 model_attack = Detect_Model().eval().to(device) """ model-常规模型 model_su-替身模型 img_np - 原始图片 return: advimg - 增加对抗攻击后的图片 """ def BlackAttack(model, model_su, img_np): np_img = img_np[:,:,::-1] / 255.0 img = torch.tensor(bhwc2bchw(np_img))[None, :, :, :].float().to(device) ### 测试模型输出结果 pred = imagenet_label2classname(predict_from_logits(model(img))) print("test output:", pred) ### 输出原label pred_label = predict_from_logits(model_su(img)) ### 对抗攻击:PGD攻击算法 # adversary = LinfPGDAttack( # model_su, eps=8/255, eps_iter=2/255, nb_iter=80, # rand_init=True, targeted=False) adversary = LinfPGDAttack( model, loss_fn=nn.CrossEntropyLoss(reduction="sum"), eps=0.3, nb_iter=40, eps_iter=0.01, rand_init=True, clip_min=0.0, clip_max=1.0, targeted=False) ### 对抗攻击:L2PGD攻击算法 (eps = 0.5, 2, 8) # adversary3 = L2PGDAttack( # model_su, eps=0.5, eps_iter=2/255, nb_iter=80, # rand_init=True, targeted=False) ### 完成攻击,输出对抗样本 advimg = adversary.perturb(img, pred_label) # advimg = np.transpose(advimg.squeeze().numpy(), (1, 2, 0)) return advimg def tensor2npimg(tensor): return bchw2bhwc(tensor[0].cpu().numpy()) ### 读取图片 def get_image(img_path): img_url = "https://farm1.static.flickr.com/230/524562325_fb0a11d1e1.jpg" if os.path.exists(img_path): return _load_image(img_path) else: import urllib urllib.request.urlretrieve(img_url, img_path) return _load_image(img_path) def _load_image(img_path): from skimage.io import imread return imread(img_path) / 255. # AidLite初始化:调用AidLite进行AI模型的加载与推理,需导入aidlite aidlite = aidlite_gpu.aidlite() # Aidlite模型路径 model_path = '/home/Lesson5_code/model/yolov5_car_best-fp16.tflite' # 定义输入输出shape in_shape = [1 * 640 * 640 * 3 * 4] out_shape = [1 * 25200 * 6 * 4] # 加载Aidlite检测模型:支持tflite, tnn, mnn, ms, nb格式的模型加载 aidlite.ANNModel(model_path, in_shape, out_shape, 4, 0) # 读取图片进行推理 # 设置测试集路径 source = "/home/Lesson5_code/adv_code/test_images" images_list = os.listdir(source) print(images_list) if __name__ == '__main__': print("是否进行攻击?") isAttack = input() # 读取图片进行推理 # 设置测试集路径 print(images_list) frame_id = 0 # 读取数据集 for image_name in images_list: frame_id += 1 print("frame_id:", frame_id) image_path = os.path.join(source, image_name) frame = cvs.imread(image_path) # 1、ROI提取 # 预处理 img = preprocess_img(frame, target_shape=(640, 640), div_num=255, means=None, stds=None) # 数据转换:因为setTensor_Fp32()需要的是float32类型的数据,所以送入的input的数据需为float32,大多数的开发者都会忘记将图像的数据类型转换为float32 aidlite.setInput_Float32(img, 640, 640) # 模型推理API aidlite.invoke() # 读取返回的结果 pred = aidlite.getOutput_Float32(0) # 数据维度转换 pred = pred.reshape(1, 25200, 6)[0] # 模型推理后处理 pred = detect_postprocess(pred, frame.shape, [640, 640, 3], conf_thres=0.25, iou_thres=0.45) all_boxes = pred[0] frame = frame.astype(np.uint8) if len(all_boxes) > 0: for box in all_boxes: x, y, w, h = [int(t) for t in box[:4]] cut_img = frame[y:(y+h), x:(x + w)] # print("---", cut_img.shape) # 2、根据输入判断是否进行攻击 if isAttack == 1: advimg = BlackAttack(model, model_su, cut_img) print("+++++", type(advimg), advimg.shape) else: cut_img = copy.deepcopy(cut_img[:,:,::-1] / 255) advimg = torch.tensor(bhwc2bchw(cut_img))[None, :, :, :].float().to(device) ### 无对抗攻击监测模型 # detect_pred = model_det(advimg) ### 3、对抗攻击监测 detect_pred = model_attack(advimg) # print(detect_pred) x = detect_pred.tolist()[0][0] ### 4、对抗攻击监测结果判断,如果风险,则报警,否则进一步进行后续业务(常规模型对样本进行分类) if detect_pred > 0.5: id = 't50SOmT' # 填写喵提醒中,发送的消息,这里放上前面提到的图片外链 text = "出现对抗攻击风险!!" print(text) print(image_name) # print("结果概率:") # print("%.2f" % x) print("\n") ts = str(time.time()) # 时间戳 type = 'json' # 返回内容格式 request_url = "http://miaotixing.com/trigger?" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.67 Safari/537.36 Edg/87.0.664.47'} result = requests.post(request_url + "id=" + id + "&text=" + text + "&ts=" + ts + "&type=" + type, headers=headers) else: print("正常样本") print(image_name) # print("结果概率:") # print("%.2f" % x) ### 正常样本分类 pred = imagenet_label2classname(predict_from_logits(model_normal(advimg))) print("预测结果:") print(pred)
效果展示:
本次项目实战来源于AidLux智慧交通AI安全实战训练营,通过本次训练营学习对智慧交通中AI算法应用的安全及风险防御知识有了更多的认识,同时通过代码实践和在Aidlux端的部署实践,实现了车辆检测和安全风险防御预警。
另外。本次实践主要是基于Aidlux&机器学习算法的实践应用,代码部署于Aidlux端,Aidlux端部署具体操作在此不做赘述,可以参见另一文章。
最后,感谢此次训练营的培训Rocky老师的用心指导。
本项目代码地址:https://github.com/Chenshunli/aiAdvCode.git
参考链接:
AidLux智慧交通AI安全实战训练营学习参考
对抗攻击参考1
对抗攻击参考2
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。