当前位置:   article > 正文

百度自动驾驶apollo源码解读4:/cyber/task 模块_apollo 的task

apollo 的task

在这里就不贴源代码,太占空间了,源码连接:https://github.com/ApolloAuto/apollo/tree/master/cyber/task

cyber下面的task包是使用cyber协程的入口包,共有5个文件如下:
BUILD文件:构建文件
task_manager.h文件:任务管理头文件
task_manager.cc文件:任务管理主文件
task_test.cc文件:任务管理测试文件

task.h文件对task_manager封装使用的文件
构建文件和测试文件不再赘述,和其他包类似,比较容易理解,先来看看task.h文文件,该文件主要写了在apollo::cyber命名空间下的4个全局函数
Async函数:使用cyber协程的函数,参数要传入函数名字以及函数的实参,返回的是std::future类型。主体内容:判断当前是否是实时模式,如果是则将任务加入到taskmanager任务队列里面等待执行,如果否则使用std::async创建该任务(系统底层会新建线程)
Yield函数:判断是否存在协程,如果存在在放弃当前协程执行时间片,否则放弃当前线程执行时间片
SleepFor函数:执行休眠时间,参数类型std::chrono::duration,会判断是否存在协程,存在则协程休眠,否则线程休眠,实测的时候感觉休眠时间并不准确,原因待研究
USleep函数:同Sleep函数,参数类型useconds_t即微秒,执行体上看两者应该是完全一样的,只不过参数不同,执行效果亦不准确
直接使用linux自带的usleep函数和使用上述两个函数有什么差别呢?
使用linux自带的休眠函数,协程会一直固定在某个线程上面执行,并不会在cyber的线程池上切换。使用SleepFor或者USleep函数休眠之后唤醒可能在线程池里面的其他线程执行,那么如果部执行休眠呢?答案是也会固定在某个线程上面,这样来看,线程的切换执行协程原因是cyber检测到了协程的阻塞,也让整个协程休眠了,执行系统的休眠函数由于没有做hook之类的操作cyber检测不到休眠操作,在cyber看来该协程一直在运行,所以没有切换线程去执行协程。

task_manager.h文件和task_manager.cc文件组成的TaskManager类对协程任务进行添加和执行的封装体
该类使用了DECLARE_SINGLETON宏进行单例化,特点不用在头文件里面手写构造函数,禁用了拷贝构造函数和赋值构造函数,提供一个静态的Instance函数返回实例,提供一个静态的CleanUp对该类进行资源清理,需要创建一个Shutdown函数用以清理自定义的成员资源。观测一下TaskManager有哪些成员及含义
num_threads_:实际干活的线程个数,该值从scheduler包获取,用以创建和定义对应的同等数量的tasks_。
task_queue_size_:本类写死的值1000,含义是任务队列的数量,实际等待任务超过这个数量会怎么样?源代码没写这个逻辑,大概猜测是任务超过1000概率很小,甚至不可能,整体来看这个值没必要做成类成员
stop_:服务状态标志,正常工作判断是falsse状态,Shutdown函数会将该值置为true
tasks_:类型为std::vector<uint64_t>,vector里面存应该是协程id,数量等同于线程数量,代码上来看协程数量写死了等同于线程数量(其实感觉这个满难理解的,协程线程之间的关系不应该是M->N之间的关系吗?就像Go的GPM模型那样,协程数量可以成千上万个吗?,此处清晰的看到了协程数量等同于线程数量:根据默认配置文件为16
task_queue_:任务队列,是一个等待执行体的集合,说直白一点,一大堆等待执行的函数,添加任务的时候加入到这个队列里面,之后从队列里面拿出来并执行
构造函数逻辑:初始化task_queue_和task_queue_size_成员,num_threads_从scheduler包里取出来,创建RoutineFactory回调函数为不断的从task_queue_取出执行体进行执行,取不到了就挂起协程。创建等同线程数量的协程,并用tasks_记录所有的协程id,这些id会在Enqueue里面用到
析构函数逻辑:调用Shutdown函数
Shutdown函数:将stop_置为true,清除掉所有协程
Enqueue函数:用std::packaged_task将参里面的函数和参数封装,加入到task_queue_队列里面,然后遍历tasks_,scheduler对每一个task进行NotifyTask。有一说一这个函数理解起来难度挺大的,用packaged_task封装目的是什么?scheduler::Instance()->NotifyTask(task);的意义什么?目前尚不能完全理解,后期理解了再更新

附带一份测试代码main.cc

  1. #include <iostream>
  2. #include "cyber/cyber.h"
  3. int id = 1;
  4. void fun() {
  5. while (apollo::cyber::OK()) {
  6. //并发检测代码
  7. static std::set<decltype(std::this_thread::get_id())> thread_set;
  8. thread_set.insert(std::this_thread::get_id());
  9. std::cout << id++ << " "
  10. << "thread_id:" << std::this_thread::get_id()
  11. << " thread cout:" << thread_set.size();
  12. static std::set<apollo::cyber::croutine::CRoutine*> croutine_set;
  13. apollo::cyber::croutine::CRoutine* r =
  14. apollo::cyber::croutine::CRoutine::GetCurrentRoutine();
  15. croutine_set.insert(r);
  16. std::cout << " croutine_id:" << r->id() << " croutine name:" << r->name()
  17. << " processor_id:" << r->processor_id()
  18. << " croutine count: " << croutine_set.size() << std::endl;
  19. //如果使用系统usleep函数,那么只会有一个线程,使用下面两个cyber提供的睡眠,则是16线程执行任务
  20. //usleep(1000 * 100);
  21. //std::this_thread::sleep_for(std::chrono::microseconds{1000*100});
  22. // cyber这个sleep感觉睡眠时间不正确,参数小于1000*1000的时候大概率会睡眠0.333秒
  23. apollo::cyber::USleep(1000);
  24. // SleepFor也是不准确,大概率都会睡眠1秒
  25. //apollo::cyber::SleepFor(std::chrono::microseconds(1000));
  26. }
  27. }
  28. int main(int argc, char* argv[]) {
  29. apollo::cyber::Init(argv[0]);
  30. // #if 0
  31. // //方式一:使用CreateTask自定义协程名字(好像只有在cyber库里面使用,modules模块没人用)
  32. // //效果:16个线程随即线程执行task
  33. // apollo::cyber::scheduler::Instance()->CreateTask(fun, "test_cyber");
  34. // #else
  35. // //方式二:Async函数创建协程(modules模块大量使用)
  36. // //效果:16个线程随即线程执行task,协程名字:/internal/task数字
  37. // apollo::cyber::Async(fun);
  38. // #endif
  39. for (int a = 1; a <= 10; a++) {
  40. apollo::cyber::Async(fun);
  41. }
  42. apollo::cyber::WaitForShutdown();
  43. return 0;
  44. }

对应的BUILD文件:

  1. load("@rules_cc//cc:defs.bzl", "cc_binary")
  2. load("//tools/install:install.bzl", "install")
  3. package(default_visibility = ["//visibility:public"])
  4. cc_binary(
  5. name = "tt",
  6. includes = [
  7. ".",
  8. ],
  9. linkopts = [
  10. "-pthread",
  11. ],
  12. srcs = glob(
  13. ["*.cc"],
  14. ),
  15. deps = [
  16. "//cyber",
  17. ],
  18. )

编译和运行的命令:

bazel run  //modules/tools/tt:tt

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/879239
推荐阅读
相关标签
  

闽ICP备14008679号