当前位置:   article > 正文

OriginBot家庭助理计划之宝宝看护助手

OriginBot家庭助理计划之宝宝看护助手

这篇博客主要讲述了如何通过OriginBot来看护宝宝,当宝宝的脸不在摄像头的范围之内时,发送消息到钉钉群组,通知家人及时查看。


前言

我在上个月有了宝宝,为了方便照看宝宝,就买了一个带有宝宝看护功能的摄像头,但是产品做的不怎么样,最重要的脸部遮挡功能用不了,后来就退货了。退货后就萌生了自己用OriginBot做一个类似功能的想法,于是就有了这篇博客~


功能流程图(架构图)

具体的流程或者说架构如下:

其实整体也不复杂,OriginBot上有一个MIPI摄像头,然后利用地平线TogetheROS.Bot的人体检测和跟踪来实时检测摄像头中有没有脸部,如果没有脸部,就向后端发送一条数据,然后后端会向钉钉群组发消息告诉家人。钉钉群组里面需要事先创建一个webhook。

下面会分为人体检测、判断有无脸部、后端操作三个部分来记录。

人体检测

这一部分用的是地平线TogetheROS.Bot现成的功能,启动OriginBot之后,在命令行中运行如下命令:

# 配置tros.b环境source /opt/tros/setup.bash# 从tros.b的安装路径中拷贝出运行示例需要的配置文件。cp -r /opt/tros/lib/mono2d_body_detection/config/ .# 配置MIPI摄像头export CAM_TYPE=mipi# 启动launch文件ros2 launch mono2d_body_detection mono2d_body_detection.launch.py

 此时可以通过http://IP:8000查看检测效果,这个模块检测了人体、人头、人脸、人手检测框、检测框类型和目标跟踪ID以及人体关键点等,我只取其中的人脸部分,当然了,以后也可以增加如人体检测等。

上面的命令运行之后,在OriginBot上执行ros2 topic list,应该会有一个topic叫做hobot_mono2d_body_detection, 这个就是我们需要的,我们会订阅这个话题,然后分析其中发送数据来判断有没有脸部

判断摄像头中有没有脸部

按照TogetheROS.Bot的文档说明,hobot_mono2d_body_detection的消息类型是ai_msgs.msg.PerceptionTargets, 具体如下:

# 感知结果# 消息头std_msgs/Header header# 感知结果的处理帧率# fps val is invalid if fps is less than 0int16 fps# 性能统计信息,比如记录每个模型推理的耗时Perf[] perfs# 感知目标集合Target[] targets# 消失目标集合Target[] disappeared_targets

其中的disappeared_targets是我们关注的重点,如果face出现在disappeared_targets中,就说明之前是有脸部的,但是现在没有了,这个时候就要向后端发出数据进一步处理。

我为了判断有无脸部,写了一个Node,代码如下:

import rclpyfrom rclpy.node import Nodefrom ai_msgs.msg import PerceptionTargetsfrom cv_bridge import CvBridgeimport timefrom api_connection import APIConnectionBabyMonitorMapping = {    # 这里的k-v要根据后端Django中的值来确定    "face": "看不到脸",    "body": "不在摄像头范围内",}class FaceDetectionListener(Node):    """    检测宝宝的脸是不是在摄像头中    """    def __init__(self):        super().__init__("face_detection")        self.bridge = CvBridge()        self.subscription = self.create_subscription(            PerceptionTargets, "hobot_mono2d_body_detection", self.listener_callback, 10        )        self.conn = APIConnection()        self.timer = time.time()        self.counter = 0    def listener_callback(self, msg):        targets = msg.targets        disappeared_targets = msg.disappeared_targets        targets_list = []        disappeared_targets_list = []        if disappeared_targets:            for item in disappeared_targets:                disappeared_targets_list.append(item.rois[0].type)        if targets:            for item in targets:                targets_list.append(item.rois[0].type)        print(f"检测到的对象如下:{targets_list}")        print(f"消失的对象如下:{disappeared_targets_list}")        if disappeared_targets_list:            self.sending_notification(disappeared_targets_list)    def sending_notification(self, disappeared_targets_list):        for item in disappeared_targets_list:            if BabyMonitorMapping.get(item):                event = BabyMonitorMapping.get(item)                if self.counter == 0:                    # 这里baby的ID是模拟的,应该去数据库中查                    data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}                     self.conn.post_data(item=data, api="api/monitor/face-detection/")                    self.counter += 1                    self.timer = time.time()                else:                    if time.time() - self.timer >= 60.0:                        # 60秒不重复发消钉钉消息                        data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}                        self.conn.post_data(item=data, api="api/monitor/face-detection/")                        self.timer = time.time()                        self.counter += 1def main(args=None):    rclpy.init(args=args)    try:        face_detection_listener = FaceDetectionListener()        rclpy.spin(face_detection_listener)    except KeyboardInterrupt:        print("终止运行")    finally:        face_detection_listener.destroy_node()        rclpy.shutdown()if __name__ == "__main__":    main()

代码整体不难,但还是做几点必要的说明:
1. BabyMonitorMapping
这个字典的作用是把TogetheROS.Bot里面的字段跟后端的字段做一个映射关系,方便后面使用

2. API调用
代码中有两行如下:

data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"} self.conn.post_data(item=data, api="api/monitor/face-detection/")

这里的uri和data的格式由后端决定,不必细究,想了解细节的可以看后端代码。

3.APIConnection
APIConnection是用于请求的API的一个封装类,代码如下:

"""API CONNECTION FOR IMPORTING WRAPPER"""import jsonimport loggingimport requestsimport envslogging.basicConfig(    format="%(asctime)s %(levelname)-8s %(message)s",    level=logging.INFO,    datefmt="%Y-%m-%d %H:%M:%S",)class APIConnection:    """    Api Connection    """    def __init__(self):        self.api_url = envs.API_URL        self.token = None        self.headers = {            "Content-Type": "application/json",            "Cache-Control": "no-cache",        }        self.request_jwt()    def request_jwt(self):        """        Request JWT token.        """        logging.info("Requesting JWT..")        api_url = f"{self.api_url}api/token/"        data = {            "username": envs.SCRIPT_USER,            "password": envs.SCRIPT_PASSWORD,        }        res = requests.post(api_url, data=json.dumps(data), headers=self.headers)        if res.status_code == 200:            data = res.json()            self.token = data["access"]            self.headers["Authorization"] = f"Bearer {self.token}"        else:            logging.error(                f"Failed to obtain JWT. Status code: {res.status_code}, Message: {res.text}"            )    def upload_video(self, api, file):        """        post data        :param item: items to be posted in json format        :param api: path of endpoint        """        api_url = f"{self.api_url}{api}"        try:            res = requests.post(api_url, files=file, headers=self.headers, timeout=1)            if res.status_code == 401:                self.request_jwt()                self.post_data(api, file)            elif res.status_code in [200, 201]:                logging.info(f"{res.status_code} - video uploaded successfully")                return res.status_code            else:                logging.error(                    f"{res.status_code} - {res.json()}- unable to upload video"                )                return res.status_code        except Exception as err:            logging.error(err)            return 500    def post_data(self, item, api):        """        新建一条数据        """        api_url = f"{self.api_url}{api}"        try:            response = requests.post(                api_url, data=json.dumps(item), headers=self.headers, timeout=1            )            if response.status_code == 403:                self.request_jwt()                self.post_data(item, api)            elif response.status_code not in [200, 201]:                logging.error(                    f"post data to backend failed, \                      status code is {response.status_code}, \                      error message is:\n \                      {response.text}"                )        except Exception as err:            logging.error(                f"post data to backend failed, \                  error message is:\n \                  {err}"            )
点击OriginBot家庭助理计划之宝宝看护助手 - 古月居可查看全文
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/641634
推荐阅读
相关标签
  

闽ICP备14008679号