赞
踩
杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。
本次课程学习 tensorRT 高级-RAII 接口模式下的生产者消费者多 batch 实现
课程大纲可看下面的思维导图
这节课我们利用上节课学到的 RAII + 接口模式对我们的消费者生产者进行封装
我们来看代码
infer.hpp
#ifndef INFER_HPP
#define INFER_HPP
#include <memory>
#include <string>
#include <future>
class InferInterface{
public:
virtual std::shared_future<std::string> forward(std::string pic) = 0;
};
std::shared_ptr<InferInterface> create_infer(const std::string& file);
#endif // INFER_HPP
infer.cpp
#include "infer.hpp" #include <thread> #include <queue> #include <mutex> #include <future> using namespace std; struct Job{ shared_ptr<promise<string>> pro; string input; }; class InferImpl : public InferInterface{ public: virtual ~InferImpl(){ worker_running_ = false; cv_notify_one(); if(worker_thread_.joinable()) worker_thread_.join(); } bool load_model(const string& file){ // 尽量保证资源哪里分配哪里释放,哪里使用,这样使得程序足够简单,而不是太乱 // 线程内传回返回值的问题 promise<bool> pro; worker_running_ = true; worker_thread_ = thread(&InferImpl::worker, this, file, std::ref(pro)); return pro.get_future().get(); } virtual shared_future<string> forward(string pic) override{ // printf("使用 %s 进行推理\n", context_.c_str()); // 往队列抛任务 Job job; job.pro.reset(new promise<string>()); job.input = pic; lock_guard<mutex> l(job_lock_); qjobs_.push(job); // 被动通知,一旦有新的任务需要推理,通知我即可 // 发生通知的家伙 cv_.notify_one(); return job.pro->get_future(); } // 实际执行模型推理的部分 void worker(string file, promise<bool>& pro){ // worker内实现,模型的加载,使用,释放 string context = file; if(context.empty()){ pro.set_value(false); return; }else{ pro.set_value(true); } int max_batch_size = 5; vector<Job> jobs; int batch_id = 0; while(worker_running_){ // 等待接受的家伙 // 在队列取任务并执行的过程 unique_lock<mutex> l(job_lock_); cv_.wait(job_lock_, [&](){ // true 退出等待 // false 继续等待 return !qjobs_.empty() || !worker_running_; }); // 程序发送终止信号 if(!worker_running_) break; while(jobs.size() < max_batch_size && !qjobs_.empty()){ jobs.emplace_back(qjobs_.front()); qjobs.pop(); } // 可以在这里一次拿一批出来,最大拿 maxbatchsize 个 job 进行一次性处理 // jobs inference -> batch inference // 执行 batch 推理 for(int i = 0; i < jobs.size(); ++i){ auto& job = jobs[i]; char result[100]; sprintf(result, "%s : batch-> %d[%d]", job.input.c_str(), batch_id, jobs.size()); job.pro->set_value(result); } batch_id++; jobs.clear(); // 模拟推理耗时 this_thread::sleep_for(chrono::milliseconds(1000)); } // 释放模型 printf("释放: %s\n", context.c_str()); context.clear(); printf("Worker done.\n"); } private: atomic<bool> worker_running_{false}; thread worker_thread_; queue<Job> qjobs_; mutex job_lock_; condition_variable cv_; }; shared_ptr<InferInterface> create_infer(const string& file){ shared_ptr<InferImpl> instance(new Infer()); if(!instance->load_model(file)) instance.reset(); return instance; }
main.cpp
#include "infer.hpp" int main(){ auto infer = create_infer("a"); if(infer == nullptr){ printf("failed.\n"); return -1; } // 串行 // auto fa = infer->forward("A").get(); // auto fb = infer->forward("B").get(); // auto fc = infer->forward("C").get(); // printf("%s\n", fa.c_str()); // printf("%s\n", fb.c_str()); // printf("%s\n", fc.c_str()); // 并行 auto fa = infer->forward("A"); auto fb = infer->forward("B"); auto fc = infer->forward("C"); printf("%s\n", fa.get().c_str()); printf("%s\n", fb.get().c_str()); printf("%s\n", fc.get().c_str()); printf("Program done.\n"); return 0; }
上述示例代码相对复杂,结合了 RAII 和接口模式来实现模拟模型推理,具体是一个消费者-生产者模式的异步批处理机制,我们来简单解读下 infer.cpp 中具体干了些啥(form chatGPT)
1. 数据结构和类定义
2. InferImpl 类的方法和成员
3. 工厂函数
这个示例清晰地展示了如何使用 RAII 和接口模式来实现一个异步批处理机制,同时也展示了如何使用 C++11 的并发特性(如 thread、promise、condition_variable 等)来实现这种机制。
博主对多线程相关的知识不怎么了解,因此疯狂询问 chatGPT,故此做个记录方便下次查看,以下内容来自于博主和 chatGPT 之间的对话
问题1:work_running_ 为什么是 atomic<boll> 类型,为什么不直接使用 bool 类型?什么是 atomic<bool> 类型?
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/73342
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。