MTCNN 中的“MT”指多任务学习(Multi-Task),即在同一个网络中同时学习“人脸分类”、“边框回归”和“人脸关键点定位”三个任务。
# Generate the 12x12 P-Net training crops (positive / part / negative) from the
# images and boxes listed in label.txt.
import sys
import numpy as np
import cv2
import os
import numpy.random as npr

stdsize = 12
anno_file = "label.txt"
# im_dir = "samples"
pos_save_dir = str(stdsize) + "/positive"
part_save_dir = str(stdsize) + "/part"
neg_save_dir = str(stdsize) + '/negative'
save_dir = "12"

# Number of purely random negative crops wanted per image.
_NEG_PER_IMAGE = 50
# Upper bound on random draws per image, so sampling always terminates even
# when nearly every crop overlaps a face.
_MAX_NEG_ATTEMPTS = 5000


def IoU(pr_box, boxes):
    """Compute the IoU between one candidate box and every ground-truth box.

    Coordinates are treated as inclusive pixel indices, hence the ``+ 1`` in
    every width/height computation.

    Parameters
    ----------
    pr_box : sequence or numpy array
        Candidate box; only entries 0..3 (x1, y1, x2, y2) are read.
    boxes : numpy array, shape (n, 4)
        Ground-truth boxes as x1, y1, x2, y2.

    Returns
    -------
    numpy array, shape (n,)
        IoU of ``pr_box`` with each row of ``boxes``.
    """
    boxes = np.asarray(boxes)
    box_area = (pr_box[2] - pr_box[0] + 1) * (pr_box[3] - pr_box[1] + 1)
    area = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
    # Corners of the intersection rectangle.
    xx1 = np.maximum(pr_box[0], boxes[:, 0])
    yy1 = np.maximum(pr_box[1], boxes[:, 1])
    xx2 = np.minimum(pr_box[2], boxes[:, 2])
    yy2 = np.minimum(pr_box[3], boxes[:, 3])
    # Clamp at zero so disjoint boxes contribute no intersection.
    w = np.maximum(0, xx2 - xx1 + 1)
    h = np.maximum(0, yy2 - yy1 + 1)
    inter = w * h
    return inter / (box_area + area - inter)


def mkr(dr):
    """Create directory ``dr`` (and missing parents) if it does not exist."""
    os.makedirs(dr, exist_ok=True)


def _max_iou(crop_box, boxes):
    """Return the largest IoU of ``crop_box`` against ``boxes`` (0.0 if none)."""
    if len(boxes) == 0:
        return 0.0
    return float(np.max(IoU(crop_box, boxes)))


def _write_crop(img, x, y, size, save_file):
    """Cut the ``size`` x ``size`` square at (x, y), resize it and save it."""
    cropped_im = img[y:y + size, x:x + size, :]
    resized_im = cv2.resize(cropped_im, (stdsize, stdsize),
                            interpolation=cv2.INTER_LINEAR)
    cv2.imwrite(save_file, resized_im)


def main():
    """Read ``anno_file`` and write the three sample sets plus their lists.

    Each annotation line is ``<image path> x y w h [x y w h ...]``. For every
    image this writes up to 50 random negatives, up to 5 face-overlapping
    negatives per box, and up to 20 positive/part crops per box. Thresholds:
    IoU < 0.3 negative, IoU >= 0.65 positive, 0.4 <= IoU < 0.65 part.
    """
    for directory in (save_dir, pos_save_dir, part_save_dir, neg_save_dir):
        mkr(directory)

    with open(anno_file, 'r') as f:
        # The first line is discarded; presumably a header row (confirm
        # against the actual label.txt).
        annotations = f.readlines()[1:]
    num = len(annotations)
    print("%d pics in total" % num)

    p_idx = 0  # positive
    n_idx = 0  # negative
    d_idx = 0  # part ("dont care")
    idx = 0

    pos_list = os.path.join(save_dir, 'pos_' + str(stdsize) + '.txt')
    neg_list = os.path.join(save_dir, 'neg_' + str(stdsize) + '.txt')
    part_list = os.path.join(save_dir, 'part_' + str(stdsize) + '.txt')
    with open(pos_list, 'w') as f1, open(neg_list, 'w') as f2, \
            open(part_list, 'w') as f3:
        for annotation in annotations:
            annotation = annotation.split()
            if not annotation:
                continue  # blank line
            im_path = annotation[0]
            bbox = list(map(float, annotation[1:]))
            boxes = np.array(bbox, dtype=np.float32).reshape(-1, 4)
            # Convert x, y, w, h into inclusive corners x1, y1, x2, y2.
            boxes[:, 2] += boxes[:, 0] - 1
            boxes[:, 3] += boxes[:, 1] - 1
            img = cv2.imread(im_path)
            idx += 1
            if img is None:
                print("skip %s: image could not be read" % im_path)
                continue
            if idx % 100 == 0:
                print(idx, "images done")
            height, width = img.shape[:2]
            print(img.shape)

            # Crop sizes are drawn from [stdsize, min(width, height) // 2).
            max_size = min(width, height) // 2
            if max_size <= stdsize:
                print("skip %s: too small for %d-pixel crops"
                      % (im_path, stdsize))
                continue

            # Random crops anywhere in the image that barely touch any face.
            neg_num = 0
            attempts = 0
            while neg_num < _NEG_PER_IMAGE and attempts < _MAX_NEG_ATTEMPTS:
                attempts += 1
                size = npr.randint(stdsize, max_size)
                nx = npr.randint(0, width - size)
                ny = npr.randint(0, height - size)
                crop_box = np.array([nx, ny, nx + size, ny + size])
                if _max_iou(crop_box, boxes) < 0.3:
                    save_file = os.path.join(neg_save_dir, "%s.jpg" % n_idx)
                    f2.write(str(stdsize) + "/negative/%s" % n_idx + ' 0\n')
                    _write_crop(img, nx, ny, size, save_file)
                    n_idx += 1
                    neg_num += 1

            for box in boxes:
                print(box)
                # box is (x_left, y_top, x_right, y_bottom)
                x1, y1, x2, y2 = box
                w = x2 - x1 + 1
                h = y2 - y1 + 1
                # Ignore small faces (annotations may be inaccurate) and
                # boxes that start outside the image.
                if max(w, h) < 20 or x1 < 0 or y1 < 0:
                    continue

                # Negatives that partially overlap this face.
                for _ in range(5):
                    size = npr.randint(stdsize, max_size)
                    # delta_x / delta_y are offsets of (x1, y1).
                    delta_x = npr.randint(int(max(-size, -x1)), max(int(w), 1))
                    delta_y = npr.randint(int(max(-size, -y1)), max(int(h), 1))
                    nx1 = int(max(0, x1 + delta_x))
                    ny1 = int(max(0, y1 + delta_y))
                    if nx1 + size > width or ny1 + size > height:
                        continue
                    crop_box = np.array([nx1, ny1, nx1 + size, ny1 + size])
                    if _max_iou(crop_box, boxes) < 0.3:
                        save_file = os.path.join(neg_save_dir,
                                                 "%s.jpg" % n_idx)
                        f2.write(str(stdsize) + "/negative/%s" % n_idx
                                 + ' 0\n')
                        _write_crop(img, nx1, ny1, size, save_file)
                        n_idx += 1

                # Positive and part-face crops jittered around the face.
                box_ = box.reshape(1, -1)
                # Clamp the bounds so very thin boxes cannot produce an empty
                # randint range or a zero-sized crop.
                size_low = max(int(min(w, h) * 0.8), 1)
                size_high = int(np.ceil(1.25 * max(w, h)))
                shift_x = int(w * 0.2)
                shift_y = int(h * 0.2)
                for _ in range(20):
                    size = npr.randint(size_low, size_high)
                    # delta here is the offset of the box centre.
                    delta_x = npr.randint(-shift_x, max(shift_x, 1))
                    delta_y = npr.randint(-shift_y, max(shift_y, 1))
                    nx1 = max(x1 + w / 2 + delta_x - size / 2, 0)
                    ny1 = max(y1 + h / 2 + delta_y - size / 2, 0)
                    nx2 = nx1 + size
                    ny2 = ny1 + size
                    if nx2 > width or ny2 > height:
                        continue
                    crop_box = np.array([nx1, ny1, nx2, ny2])
                    iou = IoU(crop_box, box_)[0]
                    if iou < 0.4:
                        continue
                    # Regression targets: ground-truth corners relative to
                    # the crop corners, in units of the crop size.
                    offset_x1 = (x1 - nx1) / float(size)
                    offset_y1 = (y1 - ny1) / float(size)
                    offset_x2 = (x2 - nx2) / float(size)
                    offset_y2 = (y2 - ny2) / float(size)
                    if iou >= 0.65:
                        save_file = os.path.join(pos_save_dir,
                                                 "%s.jpg" % p_idx)
                        f1.write(str(stdsize) + "/positive/%s" % p_idx
                                 + ' 1 %.2f %.2f %.2f %.2f\n'
                                 % (offset_x1, offset_y1,
                                    offset_x2, offset_y2))
                        _write_crop(img, int(nx1), int(ny1), size, save_file)
                        p_idx += 1
                    else:
                        save_file = os.path.join(part_save_dir,
                                                 "%s.jpg" % d_idx)
                        f3.write(str(stdsize) + "/part/%s" % d_idx
                                 + ' -1 %.2f %.2f %.2f %.2f\n'
                                 % (offset_x1, offset_y1,
                                    offset_x2, offset_y2))
                        _write_crop(img, int(nx1), int(ny1), size, save_file)
                        d_idx += 1
            print("%s images done, pos: %s part: %s neg: %s"
                  % (idx, p_idx, d_idx, n_idx))


if __name__ == '__main__':
    main()
执行完后,会生成名为“12”的文件夹(见下图),其中包含 positive、part、negative 三个样本目录,以及 pos_12.txt、neg_12.txt、part_12.txt 三个标注文件。具体内容上一篇博客里已有说明,这里不再赘述。
# Merge the per-class sample lists (pos / neg / part) into the single
# training list that the TFRecord converter reads.
import sys
import os

save_dir = "12"


def add_jpg_suffix(line, extra=""):
    """Return ``line`` with ".jpg" appended to its first field.

    ``line`` is one record of the form ``<path> <label> [values...]``; a
    trailing newline is optional. ``extra`` is appended after the existing
    fields (used to pad negative samples with dummy box offsets). The result
    always ends with exactly one newline.
    """
    path, _, rest = line.rstrip("\r\n").partition(" ")
    return path + ".jpg " + rest + extra + "\n"


def merge_label_files(save_dir):
    """Write ``<save_dir>/label-train<save_dir>.txt`` from the three lists.

    Reads ``pos_<save_dir>.txt``, ``neg_<save_dir>.txt`` and
    ``part_<save_dir>.txt`` inside ``save_dir``. Every input is read before
    the output is opened, so a missing input never leaves a truncated output.
    Negative records get " -1 -1 -1 -1" appended so that all records carry
    four offsets. Blank lines are skipped.
    """
    os.makedirs(save_dir, exist_ok=True)
    sources = (
        ('pos_%s.txt' % (save_dir), ""),
        ('neg_%s.txt' % (save_dir), " -1 -1 -1 -1"),
        ('part_%s.txt' % (save_dir), ""),
    )
    merged = []
    for name, extra in sources:
        with open(os.path.join(save_dir, name), 'r') as src:
            merged.extend(add_jpg_suffix(line, extra)
                          for line in src if line.strip())
    with open(os.path.join(save_dir, 'label-train%s.txt' % (save_dir)),
              'w') as out:
        out.writelines(merged)


if __name__ == '__main__':
    merge_label_files(save_dir)
执行完后,会在“12”文件夹下生成 label-train12.txt 文件,这就是我们需要的训练集标注文件。
# Convert the merged label list and its image crops into a TFRecord file.
import os
import random
import sys
import tensorflow as tf
import cv2
from PIL import Image


def _int64_feature(value):
    """Wrap an int (or list of ints) in an int64 ``tf.train.Feature``."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _float_feature(value):
    """Wrap a float (or list of floats) in a float ``tf.train.Feature``."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _bytes_feature(value):
    """Wrap a bytes object (or list of them) in a bytes ``tf.train.Feature``."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def _process_image_withoutcoder(filename):
    """Read ``filename`` with cv2 and return its raw pixel bytes.

    Returns
    -------
    (image_data, height, width)
        ``image_data`` is the undecoded pixel buffer (``ndarray.tobytes()``);
        ``height`` and ``width`` are the image's original dimensions.

    Raises
    ------
    ValueError
        If the file cannot be read or is not a 3-channel image.
    """
    image = cv2.imread(filename)
    if image is None:
        raise ValueError('Could not read image: %s' % filename)
    if image.ndim != 3 or image.shape[2] != 3:
        raise ValueError('Expected a 3-channel image: %s' % filename)
    # tobytes() replaces the deprecated ndarray.tostring().
    image_data = image.tobytes()
    height = image.shape[0]
    width = image.shape[1]
    return image_data, height, width


def _convert_to_example_simple(image_example, image_buffer):
    """Build the ``tf.train.Example`` for one sample.

    Parameters
    ----------
    image_example : dict
        Must provide ``'label'`` (int) and ``'bbox'`` (dict with the keys
        ``xmin``, ``ymin``, ``xmax``, ``ymax``).
    image_buffer : bytes
        Raw pixel bytes as produced by ``_process_image_withoutcoder``.

    Returns
    -------
    tf.train.Example
        Features ``image/encoded``, ``image/label`` and ``image/roi``.
    """
    class_label = image_example['label']
    bbox = image_example['bbox']
    roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']]
    example = tf.train.Example(features=tf.train.Features(feature={
        'image/encoded': _bytes_feature(image_buffer),
        'image/label': _int64_feature(class_label),
        'image/roi': _float_feature(roi),
    }))
    return example


def _add_to_tfrecord(filename, image_example, tfrecord_writer):
    """Serialize the image at ``filename`` plus its labels into the writer.

    ``image_example`` is the per-sample dict built by ``get_dataset``;
    ``tfrecord_writer`` is an open ``tf.io.TFRecordWriter``.
    """
    image_data, height, width = _process_image_withoutcoder(filename)
    example = _convert_to_example_simple(image_example, image_data)
    tfrecord_writer.write(example.SerializeToString())


def _get_output_filename(output_dir, net):
    """Return ``<output_dir>/train_<net>_landmark.tfrecord``."""
    return '%s/train_%s_landmark.tfrecord' % (output_dir, net)


def run(dataset_dir, net, output_dir, shuffling=False):
    """Convert the label list in ``dataset_dir`` into one TFRecord file.

    Args:
        dataset_dir: directory holding ``label-train<dataset_dir>.txt``.
        net: network name used in the output file name (e.g. ``'PNet'``).
        output_dir: directory the TFRecord file is written to.
        shuffling: when true, samples are shuffled and ``'_shuffle'`` is
            appended to the output file name.

    Nothing is written when the output file already exists.
    """
    tf_filename = _get_output_filename(output_dir, net)
    # Settle the final name BEFORE the existence check, so the check looks at
    # the file that would actually be written.
    if shuffling:
        tf_filename = tf_filename + '_shuffle'
    if tf.io.gfile.exists(tf_filename):
        print('Dataset files already exist. Exiting without re-creating them.')
        return
    dataset = get_dataset(dataset_dir)
    if shuffling:
        random.shuffle(dataset)
    with tf.io.TFRecordWriter(tf_filename) as tfrecord_writer:
        for i, image_example in enumerate(dataset):
            sys.stdout.write('\r>> %d/%d images has been converted' % (
                i + 1, len(dataset)))
            sys.stdout.flush()
            filename = image_example['filename']
            _add_to_tfrecord(filename, image_example, tfrecord_writer)
    print('\nFinished converting the MTCNN dataset!')


def get_dataset(dir):
    """Parse ``<dir>/label-train<dir>.txt`` into a list of sample dicts.

    Each non-blank line is ``<image path> <label> [x1 y1 x2 y2]``. Every
    returned dict has the keys ``'filename'``, ``'label'`` (int) and
    ``'bbox'`` (dict with ``xmin``/``ymin``/``xmax``/``ymax``). The box values
    default to 0 and are filled in only when the line has exactly six fields.
    """
    item = 'label-train%s.txt' % (dir)
    dataset_dir = os.path.join(dir, item)
    dataset = []
    with open(dataset_dir, 'r') as imagelist:
        for line in imagelist:
            info = line.strip().split(' ')
            if not info[0]:
                continue  # blank line
            bbox = {'xmin': 0, 'ymin': 0, 'xmax': 0, 'ymax': 0}
            if len(info) == 6:
                bbox['xmin'] = float(info[2])
                bbox['ymin'] = float(info[3])
                bbox['xmax'] = float(info[4])
                bbox['ymax'] = float(info[5])
            dataset.append({
                'filename': info[0],
                'label': int(info[1]),
                'bbox': bbox,
            })
    return dataset


if __name__ == '__main__':
    dataset_directory = '12'
    net = 'PNet'
    output_directory = '12'
    run(dataset_directory, net, output_directory, shuffling=True)
图中红圈标出的就是我们生成的 TFRecord 格式文件(12/train_PNet_landmark.tfrecord_shuffle)。
import tensorflow as tf
import numpy as np


def image_color_distort(inputs):
    """Randomly jitter contrast, brightness, hue and saturation, in that order."""
    jittered = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
    jittered = tf.image.random_brightness(jittered, max_delta=0.2)
    jittered = tf.image.random_hue(jittered, max_delta=0.2)
    return tf.image.random_saturation(jittered, lower=0.5, upper=1.5)


def red_tf(imgs, net_size):
    """Load every sample of a TFRecord file into Python lists.

    Parameters
    ----------
    imgs : path (or paths) handed to ``tf.data.TFRecordDataset``.
    net_size : side length each stored image is reshaped to
        (``net_size`` x ``net_size`` x 3).

    Returns
    -------
    (image_batch, label_batch, bbox_batch)
        Three lists of equal length holding, per sample, the normalised and
        colour-jittered image, the label as float32 and the 4-value roi.
    """
    feature_spec = {
        'image/encoded': tf.io.FixedLenFeature([], tf.string),
        'image/label': tf.io.FixedLenFeature([], tf.int64),
        'image/roi': tf.io.FixedLenFeature([4], tf.float32),
    }

    def _decode(serialized):
        # Turn one serialized tf.Example into a dict of tensors.
        return tf.io.parse_single_example(serialized, feature_spec)

    records = tf.data.TFRecordDataset(imgs).shuffle(1000).map(_decode)
    print(records)

    image_batch, label_batch, bbox_batch = [], [], []
    for record in records:
        pixels = tf.io.decode_raw(record['image/encoded'], tf.uint8)
        pixels = tf.reshape(pixels, [net_size, net_size, 3])
        # Map uint8 pixel values into roughly [-1, 1].
        scaled = (tf.cast(pixels, tf.float32) - 127.5) / 128
        # NOTE(review): the colour jitter runs on the already normalised
        # values; confirm this is the intended input range.
        image_batch.append(image_color_distort(scaled))
        label_batch.append(tf.cast(record['image/label'], tf.float32))
        bbox_batch.append(tf.cast(record['image/roi'], tf.float32))
    return image_batch, label_batch, bbox_batch
import tensorflow.keras as keras
import tensorflow as tf
import numpy as np
import cv2


# Network for 12x12 inputs.
def Pnet():
    """Build the fully convolutional P-Net.

    The input has no fixed spatial size. For a 12x12x3 input the feature map
    shrinks 12 -> 10 -> 5 (pool) -> 3 -> 1, so both heads emit 1x1 maps.

    Returns
    -------
    tf.keras.Model
        Outputs ``[classifier, bbox_regress]``: a softmax over 2 classes and
        4 box offsets, both still carrying the two spatial axes.
    """
    inputs = tf.keras.Input(shape=[None, None, 3])
    x = tf.keras.layers.Conv2D(10, (3, 3), name='conv1',
                               kernel_regularizer=keras.regularizers.l2(0.0005))(inputs)
    x = tf.keras.layers.PReLU(tf.constant_initializer(0.25),
                              shared_axes=[1, 2], name='PReLU1')(x)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)
    x = tf.keras.layers.Conv2D(16, (3, 3), name='conv2',
                               kernel_regularizer=keras.regularizers.l2(0.0005))(x)
    x = tf.keras.layers.PReLU(tf.constant_initializer(0.25),
                              shared_axes=[1, 2], name='PReLU2')(x)
    x = tf.keras.layers.Conv2D(32, (3, 3), name='conv3',
                               kernel_regularizer=keras.regularizers.l2(0.0005))(x)
    x = tf.keras.layers.PReLU(tf.constant_initializer(0.25),
                              shared_axes=[1, 2], name='PReLU3')(x)
    classifier = tf.keras.layers.Conv2D(2, (1, 1), activation='softmax',
                                        name='conv4-1')(x)
    bbox_regress = tf.keras.layers.Conv2D(4, (1, 1), name='conv4-2')(x)
    # Callers squeeze the spatial axes themselves; no squeeze is done here.
    model = tf.keras.models.Model([inputs], [classifier, bbox_regress])
    return model


# Network for 24x24 inputs.
def Rnet():
    """Build R-Net for 24x24x3 inputs.

    Returns
    -------
    tf.keras.Model
        Outputs ``[classifier, bbox_regress]`` (2-way softmax, 4 offsets).
    """
    inputs = tf.keras.Input(shape=[24, 24, 3])
    # 24,24,3 -> 22,22,28 -> 11,11,28
    x = tf.keras.layers.Conv2D(28, (3, 3), strides=1, padding='valid',
                               name='conv1')(inputs)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu1')(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=3, strides=2,
                                     padding='same')(x)
    # 11,11,28 -> 9,9,48 -> 4,4,48
    x = tf.keras.layers.Conv2D(48, (3, 3), strides=1, padding='valid',
                               name='conv2')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu2')(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=3, strides=2)(x)
    # 4,4,48 -> 3,3,64
    x = tf.keras.layers.Conv2D(64, (2, 2), strides=1, padding='valid',
                               name='conv3')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu3')(x)
    # 3,3,64 -> 64,3,3 -> 576 -> 128
    # NOTE(review): the axis swap before Flatten presumably matches the
    # weight layout of externally trained models; confirm.
    x = tf.keras.layers.Permute((3, 2, 1))(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(128, name='conv4')(x)
    x = tf.keras.layers.PReLU(name='prelu4')(x)
    classifier = tf.keras.layers.Dense(2, activation='softmax',
                                       name='conv5-1')(x)
    bbox_regress = tf.keras.layers.Dense(4, name='conv5-2')(x)
    model = tf.keras.models.Model([inputs], [classifier, bbox_regress])
    return model


# Network for 48x48 inputs.
def Onet():
    """Build O-Net for 48x48x3 inputs.

    Returns
    -------
    tf.keras.Model
        Outputs ``[classifier, bbox_regress, landmark_regress]`` (2-way
        softmax, 4 box offsets, 10 landmark coordinates).
    """
    inputs = tf.keras.layers.Input(shape=[48, 48, 3])
    # 48,48,3 -> 46,46,32 -> 23,23,32
    x = tf.keras.layers.Conv2D(32, (3, 3), strides=1, padding='valid',
                               name='conv1')(inputs)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu1')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)
    # 23,23,32 -> 21,21,64 -> 10,10,64
    x = tf.keras.layers.Conv2D(64, (3, 3), strides=1, padding='valid',
                               name='conv2')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu2')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=3, strides=2)(x)
    # 10,10,64 -> 8,8,64 -> 4,4,64
    x = tf.keras.layers.Conv2D(64, (3, 3), strides=1, padding='valid',
                               name='conv3')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu3')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=2)(x)
    # 4,4,64 -> 3,3,128
    x = tf.keras.layers.Conv2D(128, (2, 2), strides=1, padding='valid',
                               name='conv4')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2], name='prelu4')(x)
    # 3,3,128 -> 128,3,3 -> 1152 -> 256
    x = tf.keras.layers.Permute((3, 2, 1))(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(256, name='conv5')(x)
    x = tf.keras.layers.PReLU(name='prelu5')(x)
    # Heads: 256 -> 2, 256 -> 4, 256 -> 10
    classifier = tf.keras.layers.Dense(2, activation='softmax',
                                       name='conv6-1')(x)
    bbox_regress = tf.keras.layers.Dense(4, name='conv6-2')(x)
    landmark_regress = tf.keras.layers.Dense(10, name='conv6-3')(x)
    model = tf.keras.models.Model(
        [inputs], [classifier, bbox_regress, landmark_regress])
    return model


# Face classification loss.
def cls_ohem(cls_prob, label):
    """Cross-entropy over the hardest 70% of the valid samples.

    Parameters
    ----------
    cls_prob : tensor whose elements, flattened, are ``2 * batch`` softmax
        probabilities laid out as (not-face, face) per sample.
    label : tensor, shape (batch,); samples with ``label >= 0`` take part,
        negative labels are ignored.

    Returns
    -------
    Scalar tensor: mean loss of the kept samples, or 0 when none is valid.
    """
    label = tf.cast(label, tf.float32)
    # Negative labels are mapped to class 0 only so the gather index stays in
    # range; their loss is zeroed by the mask below.
    label_int = tf.cast(tf.maximum(label, 0.0), dtype=tf.int32)
    # Flatten so that element 2*i + c is P(class c) for sample i.
    cls_prob_flat = tf.reshape(cls_prob, [-1])
    num_row = tf.shape(cls_prob)[0]
    indices_ = tf.range(num_row) * 2 + label_int
    label_prob = tf.gather(cls_prob_flat, indices_)
    loss = -tf.math.log(label_prob + 1e-10)
    # 1 for samples that count towards the classification loss, else 0.
    valid_inds = tf.cast(label >= 0, tf.float32)
    num_valid = tf.reduce_sum(valid_inds)
    # Keep ratio 0.7: only the hardest 70% of the valid samples contribute.
    keep_num = tf.cast(num_valid * 0.7, dtype=tf.int32)
    # Mask per sample (not by the scalar count) so ignored samples cannot be
    # selected by top_k.
    loss = loss * valid_inds
    loss, _ = tf.math.top_k(loss, k=keep_num)
    # divide_no_nan returns 0 instead of NaN when nothing is kept.
    return tf.math.divide_no_nan(tf.reduce_sum(loss),
                                 tf.cast(keep_num, tf.float32))


# Bounding-box regression loss.
def bbox_ohem(bbox_pred, bbox_target, label):
    """Mean squared box error over the samples whose label is +1 or -1.

    Parameters
    ----------
    bbox_pred, bbox_target : tensors, shape (batch, 4).
    label : tensor, shape (batch,).

    Returns
    -------
    Scalar tensor: mean of the per-sample summed squared error over the valid
    samples, or 0 when there is none.
    """
    label = tf.cast(label, tf.float32)
    # 1 where |label| == 1, else 0.
    valid_inds = tf.cast(tf.math.equal(tf.math.abs(label), 1), tf.float32)
    square_error = tf.math.square(bbox_pred - bbox_target)
    square_error = tf.math.reduce_sum(square_error, axis=1)
    num_valid = tf.math.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # Zero the error of ignored samples before selecting the top entries.
    square_error = square_error * valid_inds
    square_error, _ = tf.math.top_k(square_error, k=keep_num)
    return tf.math.divide_no_nan(tf.reduce_sum(square_error), num_valid)


# Facial landmark loss.
def landmark_ohem(landmark_pred, landmark_target, label):
    """Mean squared landmark error over the samples whose label is -2.

    Parameters
    ----------
    landmark_pred, landmark_target : tensors, shape (batch, k).
    label : tensor, shape (batch,).

    Returns
    -------
    Scalar tensor: mean over the landmark samples, or 0 when there is none
    (instead of NaN).
    """
    label = tf.cast(label, tf.float32)
    valid_inds = tf.cast(tf.equal(label, -2), tf.float32)
    square_error = tf.square(landmark_pred - landmark_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    num_valid = tf.math.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    square_error = square_error * valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.math.divide_no_nan(tf.reduce_sum(square_error), num_valid)


# Accuracy helper.
def cal_accuracy(cls_prob, label):
    """Select labels and predictions of the samples with ``label >= 0``.

    Parameters
    ----------
    cls_prob : tensor, shape (batch, 2).
    label : tensor, shape (batch,).

    Returns
    -------
    (label_picked, pred_picked)
        Two 1-D int64 tensors of equal length, ready for an accuracy metric.
    """
    # Index of the most probable class per sample.
    pred = tf.argmax(cls_prob, axis=1)
    label_int = tf.cast(label, tf.int64)
    # tf.where yields shape (n, 1); flatten explicitly so that a single match
    # stays a length-1 vector instead of collapsing to a scalar.
    picked = tf.reshape(tf.where(tf.greater_equal(label_int, 0)), [-1])
    label_picked = tf.gather(label_int, picked)
    pred_picked = tf.gather(pred, picked)
    return label_picked, pred_picked
import functools
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import metrics
from red_tf import *
from MTCNN_ import Pnet,cls_ohem,bbox_ohem
from tqdm import tqdm
import os
import numpy as np

data_path = "12/train_PNet_landmark.tfrecord_shuffle"

# Batch size used for every split; also used for the progress message.
_BATCH_SIZE = 32


@functools.lru_cache(maxsize=1)
def _load_all_samples(path, size):
    """Read the whole TFRecord once and keep the result.

    red_tf shuffles on every read. Caching one read gives the train / val /
    test slices a single shared ordering, so they are disjoint.
    NOTE(review): this also fixes red_tf's colour jitter for the whole run
    instead of re-drawing it on every read.
    """
    return red_tf(path, size)


def load_pokemon(mode='train'):
    """Return one split of the P-Net data as a list of numpy batches.

    :param mode: 'train' (first 70%), 'val' (next 15%) or anything else for
        the test split (last 15%).
    :return: list of ``(images, labels, boxes)`` tuples, shuffled and batched
        with batch size 32.
    """
    size = 12
    images, labels, boxes = _load_all_samples(data_path, size)
    total = len(images)
    train_end = int(0.7 * total)
    val_end = int(0.85 * total)
    if mode == 'train':
        span = slice(0, train_end)
    elif mode == 'val':
        span = slice(train_end, val_end)
    else:
        span = slice(val_end, None)
    ima = tf.data.Dataset.from_tensor_slices(images[span])
    lab = tf.data.Dataset.from_tensor_slices(labels[span])
    roi = tf.data.Dataset.from_tensor_slices(boxes[span])
    train_data = tf.data.Dataset.zip((ima, lab, roi)).shuffle(1000).batch(_BATCH_SIZE)
    train_data = list(train_data.as_numpy_iterator())
    return train_data


def _batch_loss(model, img, lab, boxes):
    """Return classification loss + 0.5 * box loss for one batch."""
    cls_prob, bbox_pred = model(img)
    # P-Net keeps two 1x1 spatial axes; drop them so shapes match the labels.
    cls_prob = tf.squeeze(cls_prob, [1, 2])
    bbox_pred = tf.squeeze(bbox_pred, [1, 2])
    cls_loss = cls_ohem(cls_prob, lab)
    bbox_loss = bbox_ohem(bbox_pred, boxes, lab)
    return cls_loss + 0.5 * bbox_loss


def train(eopch):
    """Train P-Net for ``eopch`` epochs and save the weights.

    Each epoch runs one optimisation pass over the training split, then one
    evaluation-only pass over the validation split. Existing weights in
    ``pnet.h5`` are loaded when the file is present.
    """
    model = Pnet()
    if os.path.exists("pnet.h5"):
        model.load_weights("pnet.h5")
    else:
        print('pnet.h5 not found, training from scratch')
    optimizer = keras.optimizers.Adam(learning_rate=1e-3)
    for epoch in tqdm(range(eopch)):
        for i, (img, lab, boxes) in enumerate(load_pokemon("train")):
            # Record the forward pass so gradients can be computed.
            with tf.GradientTape() as tape:
                total_loss_value = _batch_loss(model, img, lab, boxes)
            grads = tape.gradient(total_loss_value, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if i % 200 == 0:
                print('Training loss (for one batch) at step %s: %s' % (i, float(total_loss_value)))
                print('Seen so far: %s samples' % ((i + 1) * _BATCH_SIZE))
        # Validation only measures the loss: no augmentation, no gradient step.
        for i, (v_img, v_lab1, boxes) in enumerate(load_pokemon("val")):
            total_loss_value = _batch_loss(model, v_img, v_lab1, boxes)
            if i % 200 == 0:
                print('val___ loss (for one batch) at step %s: %s' % (i, float(total_loss_value)))
                print('Seen so far: %s samples' % ((i + 1) * _BATCH_SIZE))
    os.makedirs('./Weights/pnet_wight', exist_ok=True)
    model.save_weights('./Weights/pnet_wight/pnet_30.ckpt')


if __name__ == '__main__':
    train(30)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。