当前位置:   article > 正文

n个线程并发去完成m个任务的C++11实现_n个线程执行m个任务

n个线程执行m个任务

有一个需求,有m个计算任务,每个计算任务都有结果,仅有n个线程,让这n个线程去完成这m个任务,并将计算结果返回。

其中n<m,计算任务用vector<function<T> >来表示,返回结果假设统一为int,用一个vector<int>来记录各个计算任务的结果。

每个任务的计算量不同,有的很长,有的很短,要求尽可能快的完成所有计算任务。

 

思路

大体的思路都知道,多线程并发执行多个任务。难点在于如何进行调度,如何进行任务分配。

错误的想法:

1、做完1个任务就等待分配。

2、用队列。做完一个任务出队列。

每个线程执行完一个任务就去调用join之类的方法然后去给其分配下一个新任务。这是一个死胡同。

这种想法是想人为去设计一个算法然后在不同时刻给不同线程分配新的任务。不合适。

行得通的想法:

1、做完1个任务看还有没有可做的任务,有就拿过来继续做。

2、用数组,多个线程共享,当前未完成任务索引,多线程共享,各个线程区别在于取不同下标的任务完成得到计算结果后放到对应下标的结果数组中。

n个线程均去任务队列中取任务,然后执行,执行完当前任务后,将任务结果放到结果数组中,再执行下一个待执行的任务。

也即任一个线程在执行完一个任务后并不直接退出join之类,而是判断是否还有任务。如果还有任务,就执行,没有就退出。

这样就避免了被动分配任务(站在线程的角度,一个线程被分配一个任务)的问题,而让线程主动去申请任务,直到所有任务均完成。

这样下来,多个任务的执行函数逻辑都是一样的。共同从数组中取任务,将结果写到共同的结果数组中。由于取的是数组中第几个任务可以记录下来,那将其放到结果数组对应位置即可。

代码

头文件

  1. #ifndef CONCURRENCY_H
  2. #define CONCURRENCY_H
  3. #include <bits/stdc++.h>
  4. using namespace std;
  5. template<class T>
  6. class Concurrency
  7. {
  8. vector<function<T>> &m_tasks__; ///<reference to tasks vector
  9. vector<int> &m_results__; ///<reference to result vector
  10. atomic<int> m_aiCurJobIdx__;
  11. int work();
  12. static int payload();
  13. public:
  14. Concurrency(vector<function<T> > &vt,vector<int> &vr,int num);
  15. static void test();
  16. };
  17. #endif // CONCURRENCY_H

源文件

  1. #include "Concurrency.h"
  2. #include "common.h"
  3. namespace _concurrency {
  4. static ostringstream logs;
  5. static mutex mut;
  6. }
  7. template<class T>
  8. int Concurrency<T>::work()
  9. {
  10. static atomic<int> id(0);
  11. int myid=id++;
  12. int idx;
  13. atomic<int> count(0); //<if not atomic ++ op may be optimized after oss<<
  14. ostringstream oss;
  15. while((idx=m_aiCurJobIdx__++)<m_tasks__.size()){
  16. ++count;
  17. m_results__[idx]=m_tasks__[idx]();
  18. // oss<<"thread id:"<<myid<<" done job "<<idx<<endl;
  19. }
  20. oss<<"-------"<<myid<<" done "<<count.load(memory_order_acquire)<<" jobs"<<endl<<endl;
  21. lock_guard<mutex> lock(_concurrency::mut); ///<RAII
  22. _concurrency::logs<<oss.str();
  23. return myid;
  24. }
  25. template<class T>
  26. int Concurrency<T>::payload()
  27. {
  28. static atomic<int> sai(0);
  29. int ret=sai++;
  30. int sleepTime=random()%1000;
  31. cout<<"sleep "<<sleepTime<<"ms,return val:"<<ret<<endl;
  32. this_thread::sleep_for(chrono::milliseconds(sleepTime));
  33. return ret;
  34. }
  35. template<class T>
  36. Concurrency<T>::Concurrency(vector<function<T> > &vt, vector<int> &vr, int num):m_tasks__(vt),m_results__(vr),m_aiCurJobIdx__(0)
  37. {
  38. vector<thread> vth;
  39. for(int i=0;i<num;++i){
  40. vth.emplace_back(thread(&Concurrency::work,this));
  41. }
  42. for(auto &t:vth){
  43. t.join();
  44. }
  45. }
  46. template<class T>
  47. void Concurrency<T>::test()
  48. {
  49. int nJobNum=100,nThreadNum=5;
  50. vector<function<T> > jobs(nJobNum,Concurrency::payload);
  51. vector<int> results(nJobNum,-1);
  52. Concurrency(jobs,results,nThreadNum);
  53. print_arr_raw(results,results.size());
  54. cout<<endl;
  55. cout<<_concurrency::logs.str().c_str()<<endl;
  56. }

直接在外部调用test()方法即可看到结果。

注意:

1、由于模板方法实现放在了cpp文件中,所以在外面使用的时候直接include源文件。简单粗暴的方法实现模板类头源文件分离。也有别的繁琐一些的方法,比如在源文件中预先声明几个特化模板实例,确保编译器能生成对应特化的模板实例类。

2、多线程中原子操作。线程共享变量所依赖的局部变量,也需要加上保护,否则会出现共享变量会取到一个旧值的现象。比如

  1. int count=0;
  2. ++count;
  3. oss<<"-------"<<myid<<" done "<<count<<" jobs"<<endl<<endl;

count是局部变量,oss是线程间共享变量,count不加保护,oss取到的仍有可能是旧值0。

3、优先用锁,而不用mutex。因为mutex比较原始,不能自己释放,必须显式调用unlock才行,当unlock之前抛出了异常,或者没有unlock,那这个资源就锁死了。用lock_guard,这是一种RAII(Resource Acquisition Is Initialization)的实体实例。在构造函数中加锁,在析构函数中解锁。

4、由于直接#inlcude源文件。所以源文件中的全局变量,或者加static表示仅在文件内作用域,保险的方法还是加上命名空间,防止重名或者出现变量重复定义的错误。

 

工程与代码参见 https://github.com/junbujianwpl/LeetcodePro/blob/master/Concurrency.cpp

 

 

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号