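Real-time fall detection / motion analysis / abnormal-behavior (e.g. fighting) recognition / control-gesture recognition: an all-in-one action recognition toolkit. Principles + code + data + models, all open source!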




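Combining pose estimation with object detection works quite well. However, two-stage approaches like this tend to be slow (although many do run in real time), and they still cannot solve problems that depend on temporal context.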

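For example, fall detection: a fall should only be reported after we have observed the process of falling. A person who is lying down has not necessarily fallen (this is the weakness of pure object detection).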
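To make the point about temporal context concrete, here is a minimal sketch, not the method used by the network in this post. It assumes some upstream model already produces one label per frame; the label names "standing", "falling" and "lying" and the function `detect_fall` are hypothetical.

```python
from collections import deque


def detect_fall(states, window=5):
    """Return the frame index at which a fall is confirmed, or None.

    `states` is a sequence of per-frame labels (hypothetical names:
    "standing", "falling", "lying"). A fall is only reported when a
    "lying" frame follows a "falling" frame seen within the previous
    `window` frames, so a person who was lying all along is ignored.
    """
    recent = deque(maxlen=window)  # the last `window` labels
    for index, state in enumerate(states):
        if state == "lying" and "falling" in recent:
            return index
        recent.append(state)
    return None
```

A single-frame detector would flag every "lying" frame, whereas this check only fires when the falling motion was actually observed shortly before.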

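Motion analysis: take pull-ups, high-knee counting, and ball sports. What goes wrong if we use object detection for these? For pull-ups, it cannot tell whether the movement is performed correctly (you could post-process to check whether the chin passes above the box, but that is hardly "AI"). For high knees, object detection simply cannot count repetitions. For ball sports, object detection produces many false positives: the first option, detecting the ball, gives many false detections; the second option, detecting the ball-playing hand posture, fails whenever the person occludes the ball, and it also needs a large amount of annotated data.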
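Counting becomes straightforward once every frame carries a temporal label. The sketch below is only an illustration under that assumption: the label names `position_1` and `position_2` and the function `count_repetitions` are hypothetical, and the per-frame labels are assumed to come from a sequence classifier.

```python
def count_repetitions(frame_labels, first="position_1", second="position_2"):
    """Count full repetitions in a sequence of per-frame labels.

    One repetition is counted each time `first` is followed by `second`,
    possibly with background frames in between.
    """
    count = 0
    seen_first = False
    for label in frame_labels:
        if label == first:
            seen_first = True
        elif label == second and seen_first:
            count += 1
            seen_first = False  # wait for the next `first` before counting again
    return count
```

For a high-knee exercise, `first` could stand for "knee down" and `second` for "knee up"; consecutive frames with the same label do not inflate the count.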



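The neural network used here is realtimenet, a real-time action sequence classification network that was open-sourced within the last two months.

My GitHub repository will collect video data for all of the action sequences mentioned above and train models that are usable for practical detection tasks. So far, gesture-control detection (among others) has been implemented. You are welcome to follow my official account; more updates will follow.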




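I did not convert my own results to GIFs, so you can look at other demo images instead. They are almost identical to mine; only the training data differs.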


I. Basic process and idea


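(1) For each video (training and test), extract frames (JPEGs) at a fixed FPS and save them as the training set and the test set. The classification performance on the images is taken as the classification performance of the corresponding video.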


(4) After training, load the model and evaluate it on all video frames in the test set, then report the top-1 and top-5 accuracy over the whole test set.
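Steps (1) and (4) above can be sketched in a few lines of plain Python. This is an illustration only, not the code from the repository: the helper names `sample_frame_indices` and `top_k_accuracy` are made up, and scores are assumed to be one list of per-class values per sample.

```python
def sample_frame_indices(num_frames, source_fps, target_fps):
    """Indices of the frames to keep when resampling a video to `target_fps`."""
    if source_fps <= 0 or target_fps <= 0:
        raise ValueError("fps values must be positive")
    step = source_fps / target_fps  # source frames per kept frame
    indices = []
    position = 0.0
    while int(position) < num_frames:
        indices.append(int(position))
        position += step
    return indices


def top_k_accuracy(scores, labels, k=1):
    """Fraction of samples whose true label is among the k highest scores."""
    hits = 0
    for row, label in zip(scores, labels):
        ranked = sorted(range(len(row)), key=lambda i: row[i], reverse=True)
        hits += label in ranked[:k]
    return hits / len(labels)
```

Calling `top_k_accuracy(scores, labels, k=1)` and `top_k_accuracy(scores, labels, k=5)` gives the two numbers mentioned in step (4).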


II. Other good frameworks for video understanding

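The first one is my GitHub repository. It is convenient to use, but I would not rank it near the top, because it does not integrate much.

Then there is MMAction, a full video understanding framework. As everyone knows, that team's work is excellent.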





Try some of the official models (I have already placed the model files in the repository)

pip install -r requirements.txt


    resources
    ├── backbone
    │   ├── strided_inflated_efficientnet.ckpt
    │   └── strided_inflated_mobilenet.ckpt
    ├── fitness_activity_recognition
    │   └── ...
    ├── gesture_recognition
    │   └── ...
    └── ...

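First, try the provided demos. In the sense/examples directory you will find three Python scripts: run_gesture_recognition.py (gesture recognition), run_fitness_tracker.py (fitness tracker), and run_calorie_estimation.py (calorie estimation). Launching each demo is as simple as running the script in a terminal, as described below.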


cd examples/

python run_gesture_recognition.py


python examples/run_fitness_tracker.py --weight=65 --age=30 --height=170 --gender=female
    --camera_id=CAMERA_ID    ID of the camera to stream from
    --path_in=FILENAME       Video file to stream from. This assumes that the video was encoded at 16 fps.


python examples/run_calorie_estimation.py --weight=65 --age=30 --height=170 --gender=female


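First, clone my GitHub repository, or the original author's.

Then record a few videos yourself. For example, I have one class called capture and recorded several videos for it; the files can be .mp4 or .avi. Then add another class and record some more videos. Each folder is named after its class.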


python tools/sense_studio/sense_studio.py






Click "Start New Project".




Then click "Create Project" and you can start building the dataset.





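Under the sense_studio folder, create a new folder; I call mine cvdemo1.

Inside it, create two folders: videos_train and videos_valid. Each contains one subfolder per class, named after the class. In videos_train, the capture folder holds the training videos for capture and the click folder holds the training videos for click. In the same way, videos_valid holds the validation set.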
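If you prefer not to create the folders by hand, the layout described above can be generated with a short script. This is a convenience sketch, not part of the repository; the function name `create_dataset_layout` is made up, and the class names are just the two used in this post.

```python
from pathlib import Path


def create_dataset_layout(project_dir, class_names):
    """Create videos_train/<class> and videos_valid/<class> under project_dir."""
    project_dir = Path(project_dir)
    created = []
    for split in ("videos_train", "videos_valid"):
        for class_name in class_names:
            folder = project_dir / split / class_name
            folder.mkdir(parents=True, exist_ok=True)  # safe to re-run
            created.append(folder)
    return created


if __name__ == "__main__":
    for folder in create_dataset_layout("sense_studio/cvdemo1", ["capture", "click"]):
        print(folder)
```

After running it, copy the recorded videos into the matching class folders.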

In the cvdemo1 folder, create a file named project_config.json. What goes in it? You can copy my version below:

    {
      "name": "cvdemo1",
      "date_created": "2021-02-03",
      "classes": {
        "capture": [
          "capture",
          "capture"
        ],
        "click": [
          "click",
          "click"
        ]
      }
    }

Just change name to the name of your own folder.
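The same file can also be generated from the folder structure. The sketch below simply mirrors the example above (each class maps to a list holding its own name twice); whether the tool expects other values in those lists is not something this post states, so treat that part as an assumption. The function names are hypothetical.

```python
import json
from datetime import date
from pathlib import Path


def build_project_config(project_dir):
    """Build a config dict from the class folders found under videos_train."""
    project_dir = Path(project_dir)
    train_dir = project_dir / "videos_train"
    class_names = sorted(p.name for p in train_dir.iterdir() if p.is_dir())
    return {
        "name": project_dir.name,
        "date_created": date.today().isoformat(),
        # Assumption: mirror the example, one two-entry list per class.
        "classes": {name: [name, name] for name in class_names},
    }


def write_project_config(project_dir):
    """Write project_config.json into project_dir and return its path."""
    config = build_project_config(project_dir)
    path = Path(project_dir) / "project_config.json"
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return path
```

Because the name field is taken from the folder name, it always matches the project folder.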


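Run python train_classifier.py. Before that, you can edit the main section of the script.

Change path_in to the location of our training data. Not much else needs to change; just follow my version: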

    # Parse arguments
    # args = docopt(__doc__)
    path_in = './sense_studio/cvdemo1/'
    path_out = path_in
    os.makedirs(path_out, exist_ok=True)
    use_gpu = True
    path_annotations_train = None
    path_annotations_valid = None
    num_layers_to_finetune = 9
    temporal_training = False

    # Load feature extractor
    feature_extractor = feature_extractors.StridedInflatedEfficientNet()
    checkpoint = torch.load('../resources/backbone/strided_inflated_efficientnet.ckpt')
    feature_extractor.load_state_dict(checkpoint)
    feature_extractor.eval()

    # Get the required temporal dimension of feature tensors in order to
    # finetune the provided number of layers.
    if num_layers_to_finetune > 0:
        num_timesteps = feature_extractor.num_required_frames_per_layer.get(-num_layers_to_finetune)
        if not num_timesteps:
            # Remove 1 because we added 0 to temporal_dependencies
            num_layers = len(feature_extractor.num_required_frames_per_layer) - 1
            raise IndexError(f'Num of layers to finetune not compatible. '
                             f'Must be an integer between 0 and {num_layers}')
    else:
        num_timesteps = 1



    # Parse arguments
    # args = docopt(__doc__)
    camera_id = 0
    path_in = None
    path_out = None
    custom_classifier = './sense_studio/cvdemo1/'
    title = None
    use_gpu = True

    # Load original feature extractor
    feature_extractor = feature_extractors.StridedInflatedEfficientNet()
    feature_extractor.load_weights_from_resources('../resources/backbone/strided_inflated_efficientnet.ckpt')
    # feature_extractor = feature_extractors.StridedInflatedMobileNetV2()
    # feature_extractor.load_weights_from_resources(r'../resources\backbone\strided_inflated_mobilenet.ckpt')
    checkpoint = feature_extractor.state_dict()

    # Load custom classifier
    checkpoint_classifier = torch.load(os.path.join(custom_classifier, 'classifier.checkpoint'))

    # Update original weights in case some intermediate layers have been finetuned
    name_finetuned_layers = set(checkpoint.keys()).intersection(checkpoint_classifier.keys())
    for key in name_finetuned_layers:
        checkpoint[key] = checkpoint_classifier.pop(key)
    feature_extractor.load_state_dict(checkpoint)
    feature_extractor.eval()
    print('[debug] net:', feature_extractor)

    with open(os.path.join(custom_classifier, 'label2int.json')) as file:
        class2int = json.load(file)
    INT2LAB = {value: key for key, value in class2int.items()}

    gesture_classifier = LogisticRegression(num_in=feature_extractor.feature_dim,
                                            num_out=len(INT2LAB))
    gesture_classifier.load_state_dict(checkpoint_classifier)
    gesture_classifier.eval()
    print(gesture_classifier)
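The weight-merging step in the script above is easy to misread, so here is the same logic isolated on plain dictionaries. It is a sketch for illustration: `merge_finetuned_weights` is a made-up helper, and the parameter names in the usage example are invented. In the real script both arguments are PyTorch state dicts.

```python
def merge_finetuned_weights(backbone_state, classifier_state):
    """Split a classifier checkpoint into backbone overrides and head weights.

    Keys present in both dicts are fine-tuned backbone layers: their values
    are taken from the classifier checkpoint. Whatever is left belongs to
    the classification head. The inputs are not modified.
    """
    merged = dict(backbone_state)
    remaining = dict(classifier_state)
    for key in set(merged) & set(remaining):
        merged[key] = remaining.pop(key)
    return merged, remaining


if __name__ == "__main__":
    backbone = {"cnn.0.weight": "pretrained", "cnn.1.weight": "pretrained"}
    checkpoint = {"cnn.1.weight": "finetuned", "linear.weight": "head"}
    print(merge_finetuned_weights(backbone, checkpoint))
```

This is why the script pops the shared keys before calling gesture_classifier.load_state_dict: the classifier head must only receive its own parameters.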




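Here we use EfficientNet as the feature extractor. You can also switch to MobileNet (example code is included): use MobileNet for training and use it again for inference. Only a few lines of code need to change.

The EfficientNet feature extraction code: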

    class StridedInflatedEfficientNet(StridedInflatedMobileNetV2):

        def __init__(self):
            super().__init__()

            self.cnn = nn.Sequential(
                ConvReLU(3, 32, 3, stride=2),
                InvertedResidual(32, 24, 3, spatial_stride=1),
                InvertedResidual(24, 32, 3, spatial_stride=2, expand_ratio=6),
                InvertedResidual(32, 32, 3, spatial_stride=1, expand_ratio=6, temporal_shift=True),
                InvertedResidual(32, 32, 3, spatial_stride=1, expand_ratio=6),
                InvertedResidual(32, 32, 3, spatial_stride=1, expand_ratio=6),
                InvertedResidual(32, 56, 5, spatial_stride=2, expand_ratio=6),
                InvertedResidual(56, 56, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True, temporal_stride=True),
                InvertedResidual(56, 56, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(56, 56, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(56, 112, 3, spatial_stride=2, expand_ratio=6),
                InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6, temporal_shift=True),
                InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6),
                InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6),
                InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6, temporal_shift=True, temporal_stride=True),
                InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6),
                InvertedResidual(112, 160, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
                InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
                InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(160, 272, 5, spatial_stride=2, expand_ratio=6),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
                InvertedResidual(272, 448, 3, spatial_stride=1, expand_ratio=6),
                ConvReLU(448, 1280, 1)
            )

The InvertedResidual block is defined here:

    class InvertedResidual(nn.Module):  # noqa: D101

        def __init__(self, in_planes, out_planes, spatial_kernel_size=3, spatial_stride=1, expand_ratio=1,
                     temporal_shift=False, temporal_stride=False, sparse_temporal_conv=False):
            super().__init__()
            assert spatial_stride in [1, 2]
            hidden_dim = round(in_planes * expand_ratio)
            self.use_residual = spatial_stride == 1 and in_planes == out_planes
            self.temporal_shift = temporal_shift
            self.temporal_stride = temporal_stride

            layers = []
            if expand_ratio != 1:
                # Point-wise expansion
                stride = 1 if not temporal_stride else (2, 1, 1)
                if temporal_shift and sparse_temporal_conv:
                    convlayer = SteppableSparseConv3dAs2d
                    kernel_size = 1
                elif temporal_shift:
                    convlayer = SteppableConv3dAs2d
                    kernel_size = (3, 1, 1)
                else:
                    convlayer = nn.Conv2d
                    kernel_size = 1
                layers.append(ConvReLU(in_planes, hidden_dim, kernel_size=kernel_size, stride=stride,
                                       padding=0, convlayer=convlayer))

            layers.extend([
                # Depth-wise convolution
                ConvReLU(hidden_dim, hidden_dim, kernel_size=spatial_kernel_size, stride=spatial_stride,
                         groups=hidden_dim),
                # Point-wise mapping
                nn.Conv2d(hidden_dim, out_planes, 1, 1, 0),
                # nn.BatchNorm2d(out_planes)
            ])
            self.conv = nn.Sequential(*layers)

        def forward(self, input_):  # noqa: D102
            output_ = self.conv(input_)
            residual = self.realign(input_, output_)
            if self.use_residual:
                output_ += residual
            return output_

        def realign(self, input_, output_):  # noqa: D102
            n_out = output_.shape[0]
            if self.temporal_stride:
                indices = [-1 - 2 * idx for idx in range(n_out)]
                return input_[indices[::-1]]
            else:
                return input_[-n_out:]
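The realign method decides which input frames are paired with each output frame for the residual connection. The sketch below reproduces just that index arithmetic on a plain list so the selection can be inspected without PyTorch; the standalone function and the frame counts in the usage example are illustrative assumptions, not code from the repository.

```python
def realign(input_frames, n_out, temporal_stride=False):
    """Select the input frames that line up with `n_out` output frames.

    With a temporal stride of 2, every second frame is kept, counting
    backwards from the newest frame. Otherwise the newest `n_out` frames
    are kept unchanged.
    """
    if temporal_stride:
        indices = [-1 - 2 * idx for idx in range(n_out)]
        return [input_frames[i] for i in indices[::-1]]
    return input_frames[-n_out:]


if __name__ == "__main__":
    frames = list(range(8))                          # frame 7 is the newest
    print(realign(frames, 4, temporal_stride=True))  # every second frame
    print(realign(frames, 6))                        # the newest six frames
```

In both cases the alignment is anchored at the most recent frame, which is what lets the network run on a live stream.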


    def extract_features(path_in, net, num_layers_finetune, use_gpu, num_timesteps=1):
        # Create inference engine
        inference_engine = engine.InferenceEngine(net, use_gpu=use_gpu)

        # Extract features
        for dataset in ["train", "valid"]:
            videos_dir = os.path.join(path_in, f"videos_{dataset}")
            features_dir = os.path.join(path_in, f"features_{dataset}_num_layers_to_finetune={num_layers_finetune}")
            # Collect both .mp4 and .avi recordings (one sub-folder per class)
            video_files = sorted(glob.glob(os.path.join(videos_dir, "*", "*.mp4"))
                                 + glob.glob(os.path.join(videos_dir, "*", "*.avi")))

            print(f"\nFound {len(video_files)} videos to process in the {dataset}set")

            for video_index, video_path in enumerate(video_files):
                print(f"\rExtract features from video {video_index + 1} / {len(video_files)}",
                      end="")
                # Swap the video extension for .npy, whatever the extension is.
                # (The earlier .replace(".mp4", ".npy") left .avi paths unchanged.)
                path_out = os.path.splitext(video_path.replace(videos_dir, features_dir))[0] + ".npy"

                if os.path.isfile(path_out):
                    print("\n\tSkipped - feature was already precomputed.")
                else:
                    # Read all frames
                    compute_features(video_path, path_out, inference_engine,
                                     num_timesteps=num_timesteps, path_frames=None, batch_size=16)

            print('\n')


    def generate_data_loader(dataset_dir, features_dir, tags_dir, label_names, label2int,
                             label2int_temporal_annotation, num_timesteps=5, batch_size=16, shuffle=True,
                             stride=4, path_annotations=None, temporal_annotation_only=False,
                             full_network_minimum_frames=MODEL_TEMPORAL_DEPENDENCY):
        # Find pre-computed features and derive corresponding labels
        tags_dir = os.path.join(dataset_dir, tags_dir)
        features_dir = os.path.join(dataset_dir, features_dir)
        labels_string = []
        temporal_annotation = []
        if not path_annotations:
            # Use all pre-computed features
            features = []
            labels = []
            for label in label_names:
                feature_temp = glob.glob(f'{features_dir}/{label}/*.npy')
                features += feature_temp
                labels += [label2int[label]] * len(feature_temp)
                labels_string += [label] * len(feature_temp)
        else:
            with open(path_annotations, 'r') as f:
                annotations = json.load(f)
            features = ['{}/{}/{}.npy'.format(features_dir, entry['label'],
                                              os.path.splitext(os.path.basename(entry['file']))[0])
                        for entry in annotations]
            labels = [label2int[entry['label']] for entry in annotations]
            labels_string = [entry['label'] for entry in annotations]

        # Check if a temporal annotation exists for each video
        for label, feature in zip(labels_string, features):
            classe_mapping = {0: "counting_background",
                              1: f'{label}_position_1',
                              2: f'{label}_position_2'}
            temporal_annotation_file = feature.replace(features_dir, tags_dir).replace(".npy", ".json")
            if os.path.isfile(temporal_annotation_file):
                annotation = json.load(open(temporal_annotation_file))["time_annotation"]
                annotation = np.array([label2int_temporal_annotation[classe_mapping[y]] for y in annotation])
                temporal_annotation.append(annotation)
            else:
                temporal_annotation.append(None)

        if temporal_annotation_only:
            features = [x for x, y in zip(features, temporal_annotation) if y is not None]
            labels = [x for x, y in zip(labels, temporal_annotation) if y is not None]
            temporal_annotation = [x for x in temporal_annotation if x is not None]

        # Build dataloader
        dataset = FeaturesDataset(features, labels, temporal_annotation,
                                  num_timesteps=num_timesteps, stride=stride,
                                  full_network_minimum_frames=full_network_minimum_frames)
        data_loader = torch.utils.data.DataLoader(dataset, shuffle=shuffle, batch_size=batch_size)
        return data_loader


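The approach mainly works by combining the frames within a time window into a sequence and feeding that sequence to the network for classification. The related parameters can be found in many places, for example in display.py: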

    class DisplayClassnameOverlay(BaseDisplay):
        """
        Display recognized class name as a large video overlay. Once the probability for a class passes the threshold,
        the name is shown and stays visible for a certain duration.
        """

        def __init__(
                self,
                thresholds: Dict[str, float],
                duration: float = 2.,
                font_scale: float = 3.,
                thickness: int = 2,
                border_size: int = 50,
                **kwargs
        ):
            """
            :param thresholds:
                Dictionary of thresholds for all classes.
            :param duration:
                Duration in seconds how long the class name should be displayed after it has been recognized.
            :param font_scale:
                Font scale factor for modifying the font size.
            :param thickness:
                Thickness of the lines used to draw the text.
            :param border_size:
                Height of the border on top of the video display. Used for correctly centering the displayed class name
                on the video.
            """
            super().__init__(**kwargs)
            self.thresholds = thresholds
            self.duration = duration
            self.font_scale = font_scale
            self.thickness = thickness
            self.border_size = border_size

            self._current_class_name = None
            self._start_time = None

        def _get_center_coordinates(self, img: np.ndarray, text: str):
            textsize = cv2.getTextSize(text, FONT, self.font_scale, self.thickness)[0]

            height, width, _ = img.shape
            height -= self.border_size

            x = int((width - textsize[0]) / 2)
            y = int((height + textsize[1]) / 2) + self.border_size

            return x, y

        def _display_class_name(self, img: np.ndarray, class_name: str):
            pos = self._get_center_coordinates(img, class_name)
            put_text(img, class_name, position=pos, font_scale=self.font_scale, thickness=self.thickness)

        def display(self, img: np.ndarray, display_data: dict):
            now = time.perf_counter()

            if self._current_class_name and now - self._start_time < self.duration:
                # Keep displaying the same class name
                self._display_class_name(img, self._current_class_name)
            else:
                self._current_class_name = None
                for class_name, proba in display_data['sorted_predictions']:
                    if class_name in self.thresholds and proba > self.thresholds[class_name]:
                        # Display new class name
                        self._display_class_name(img, class_name)
                        self._current_class_name = class_name
                        self._start_time = now
                        break
            return img
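The idea of grouping frames from a time window into one sequence can be sketched with a small buffer. This is only an illustration of the windowing concept, not how the repository's inference engine is implemented; the class `FrameWindow` and its `size` and `step` parameters are hypothetical.

```python
from collections import deque


class FrameWindow:
    """Keep the most recent `size` frames and emit a clip every `step` frames."""

    def __init__(self, size, step):
        self.frames = deque(maxlen=size)  # old frames drop out automatically
        self.step = step
        self._since_last_clip = 0

    def push(self, frame):
        """Add a frame; return a full clip (as a list) when one is due, else None."""
        self.frames.append(frame)
        self._since_last_clip += 1
        window_full = len(self.frames) == self.frames.maxlen
        if window_full and self._since_last_clip >= self.step:
            self._since_last_clip = 0
            return list(self.frames)
        return None


if __name__ == "__main__":
    window = FrameWindow(size=4, step=2)
    for frame_id in range(8):
        clip = window.push(frame_id)
        if clip is not None:
            print(clip)  # each clip would be passed to the classifier
```

A larger `size` gives the classifier more temporal context, while a smaller `step` makes predictions more frequent at the cost of more computation.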


