#include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> using namespace std; mutex mtx; condition_variable cv; bool ready = false; void thread_A(int id) { unique_lock<mutex> lck(mtx); while (!ready) cv.wait(lck); // 等待条件变量 cout << "Thread " << id << " is running\n"; } void thread_B() { cout << "Initializing...\n"; this_thread::sleep_for(chrono::milliseconds(2000)); // 模拟初始化过程 { lock_guard<mutex> lck(mtx); // 自动加锁互斥量 ready = true; // 设置条件变量 cout << "Initialization complete\n"; } cv.notify_all(); // 唤醒所有等待的线程 } int main() { thread t1(thread_A, 1); thread t2(thread_A, 2); thread t3(thread_A, 3); thread t4(thread_B); t1.join(); t2.join(); t3.join(); t4.join(); return 0; }
#include <iostream> #include <thread> // 定义一个无参无返回值的函数 void func() { std::cout << "Hello, C++11 thread!\n"; } int main() { // 创建线程并执行函数 std::thread t(func); // 等待线程结束 t.join(); return 0; }
#include <iostream> #include <thread> #include <mutex> // 共享资源 int cnt = 0; // 定义互斥量 std::mutex mtx; // 定义一个函数,该函数对cnt进行自增操作 void func() { // 创建lock_guard对象,注意lock_guard的生命周期 std::lock_guard<std::mutex> lck(mtx); cnt++; } int main() { std::thread t1(func); std::thread t2(func); t1.join(); t2.join(); std::cout << "cnt: " << cnt << std::endl; return 0; }
除了使用std::atomic类之外,C++11线程库还提供了无锁数据结构(Lock-Free Data Structure)的支持,例如std::atomic_flag、std::atomic*等。这些数据结构可以让程序在不使用锁的情况下实现高度的并发性能。
#include <iostream> #include <thread> #include <atomic> // 定义原子变量 std::atomic<int> cnt(0); // 定义一个函数,该函数对cnt进行自增操作 void func() { cnt++; } int main() { std::thread t1(func); std::thread t2(func); t1.join(); t2.join(); std::cout << "cnt: " << cnt << std::endl; return 0; }
在当今互联网时代,多线程编程已经成为了很多应用程序的必需品。C++ 作为一门多范式的编程语言,自然也提供了丰富的多线程编程接口和库。然而多线程编程也具有一些特殊的问题和挑战,例如线程安全、性能瓶颈、异常处理等等。本文将针对这些问题,通过实例演示 C++ 多线程编程的常见问题和调试技巧、多线程程序的设计和实现方法、多线程程序中的优化技巧、异常处理和资源管理、性能调优方法等方面的基础知识和实践经验。
以下是使用 std::mutex 和 std::condition_variable 保护共享资源的例子,来防止 race condition 问题:
#include <iostream> #include <mutex> #include <thread> #include <vector> class Counter { public: Counter() : value_(0) {} void Increment() { std::lock_guard<std::mutex> lock(mutex_); ++value_; } void Decrement() { std::lock_guard<std::mutex> lock(mutex_); --value_; } int value() const { return value_; } private: int value_; mutable std::mutex mutex_; }; int main() { Counter counter; std::vector<std::thread> threads; for (int i = 0; i < 5; ++i) { threads.push_back(std::thread([&counter](){ for (int j = 0; j < 1000; ++j) { counter.Increment(); } })); } // 等待所有线程执行完毕 for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << std::endl; return 0; }
在调试过程中建议使用线程安全的日志输出函数,例如依赖于 ATOMIC_FLAG_INIT 宏的 std::atomic_flag 类,保证多线程环境下的线程安全输出:
#include <iostream> #include <thread> #include <atomic> #include <string> void Log(const std::string& msg) { static std::atomic_flag lock = ATOMIC_FLAG_INIT; while (lock.test_and_set(std::memory_order_acquire)) {} std::cout << msg << std::endl; lock.clear(std::memory_order_release); } int main() { std::thread t1([]{ Log("Hello from thread 1"); }); std::thread t2([]{ Log("Hello from thread 2"); }); t1.join(); t2.join(); return 0; }
以下是使用 std::queue 和 std::condition_variable 进行多线程数据传输的例子:
在生产者函数中,使用 std::unique_lock 获取互斥锁,向队列中压入数据,并通过 std::condition_variable 的 notify_one() 通知消费者线程进行消费。在消费者函数中,通过 wait() 等待生产者线程通知,并获取队列中的数据进行消费。
#include <iostream> #include <queue> #include <mutex> #include <thread> #include <condition_variable> std::mutex mtx; std::queue<int> q; std::condition_variable cv; void producer() { for (int i = 0; i < 10; i++) { std::unique_lock<std::mutex> lck(mtx); q.push(i); lck.unlock(); cv.notify_one(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } void consumer() { while (true) { std::unique_lock<std::mutex> lck(mtx); cv.wait(lck, []{ return !q.empty(); }); std::cout << q.front() << std::endl; q.pop(); lck.unlock(); } } int main() { std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; }
#include <iostream> #include <vector> #include <thread> #include <queue> #include <atomic> #include <condition_variable> #include <functional> class ThreadPool { public: ThreadPool(int numThreads) : stop_(false) { for (int i= 0; i < numThreads; ++i) { threads_.emplace_back(std::thread(std::bind(&ThreadPool::threadFunc, this))); } } ~ThreadPool() { { std::unique_lock<std::mutex> lock(mutex_); stop_ = true; } condition_.notify_all(); for (std::thread& t : threads_) { t.join(); } } template <typename F, typename... Args> void enqueue(F&& f, Args&&... args) { auto task = std::make_shared<std::function<void()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); { std::unique_lock<std::mutex> lock(mutex_); tasks_.push([task]{ (*task)(); }); } condition_.notify_one(); } private: void threadFunc() { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(mutex_); condition_.wait(lock, [this]{ return stop_ || !tasks_.empty(); }); if (stop_ && tasks_.empty()) { break; } task = std::move(tasks_.front()); tasks_.pop(); } task(); } } std::vector<std::thread> threads_; std::queue<std::function<void()>> tasks_; std::mutex mutex_; std::condition_variable condition_; std::atomic_bool stop_; }; void fun(int param) { std::cout << "Task #" << param << " running on thread " << std::this_thread::get_id() << std::endl; } int main() { ThreadPool pool(4); for (int i = 0; i < 8; ++i) { pool.enqueue(fun, i); } return 0; }
以下是一个使用 std::lock_guard 和 std::shared_ptr 管理资源的例子:
在示例中使用 std::shared_ptr 来管理一个资源,多个线程共享该资源的指针。使用 std::lock_guard 来保护共享资源,避免多个线程同时访问该资源导致数据竞争。在使用 shared_ptr 的情况下,无需手动释放资源,可以避免资源泄漏。同时程序具有更好的可读性和易于维护性。
#include <iostream> #include <memory> #include <mutex> class Resource { public: Resource() { std::cout << "Resource acquired." << std::endl; } ~Resource() { std::cout << "Resource released." << std::endl; } void DoSomething() { std::cout << "Doing something with the resource." << std::endl; } }; std::mutex mtx; void ThreadFunc(std::shared_ptr<Resource> ptr) { std::lock_guard<std::mutex> lock(mtx); ptr->DoSomething(); } int main() { std::shared_ptr<Resource> ptr(new Resource); std::thread t1(ThreadFunc, ptr); std::thread t2(ThreadFunc, ptr); t1.join(); t2.join(); return 0; }
在调度算法方面,例如使用 work stealing 任务窃取算法,避免某些线程的工作比其他线程更繁重,从而导致 CPU 核心的利用率不均衡。以下是一个使用 C++17 的 std::jthread 实现 work stealing 算法的例子:
在示例中使用 std::deque 存储任务队列,使用 std::mutex 保护共享数据。线程池支持将任务推入队列中,由线程池中的线程抢占式地从任务队列中获取任务并执行。如果某个线程的任务执行完毕,就会去其他线程队列中偷取任务执行,以避免某些线程工作量过大导致 CPU 资源利用率不均衡。
#include <iostream> #include <thread> #include <vector> #include <deque> #include <mutex> class work_stealing_thread_pool { public: using task_type = std::function<void()>; explicit work_stealing_thread_pool(std::size_t num_threads = std::thread::hardware_concurrency()); ~work_stealing_thread_pool(); template <typename F> void submit(F f) { tasks_.push_front(task_type(f)); } private: task_type get_task_from_local_queue(); task_type get_task_from_other_thread_queue(); task_type get_task_from_pool_queue(); std::vector<std::jthread> threads_; std::deque<task_type> tasks_; std::mutex mutex_; }; work_stealing_thread_pool::work_stealing_thread_pool(std::size_t num_threads) { for (std::size_t i = 0; i < num_threads; ++i) { threads_.emplace_back([this](){ while (true) { task_type task; if ((task = get_task_from_local_queue()) || (task = get_task_from_other_thread_queue()) || (task = get_task_from_pool_queue())) { task(); } else { break; } } }); } } work_stealing_thread_pool::~work_stealing_thread_pool() { for (std::jthread& t : threads_) { t.join(); } } work_stealing_thread_pool::task_type work_stealing_thread_pool::get_task_from_local_queue() { if (tasks_.empty()) { return task_type(); } task_type task = tasks_.front(); tasks_.pop_front(); return task; } work_stealing_thread_pool::task_type work_stealing_thread_pool::get_task_from_other_thread_queue() { for (std::size_t i = 0; i < threads_.size(); ++i) { std::size_t index = (i + 1 + threads_.size()) % threads_.size(); if (task_type task = threads_[index].get().tasks_.front()) { threads_[index].get().tasks_.pop_front(); return task; } } return task_type(); } work_stealing_thread_pool::task_type work_stealing_thread_pool::get_task_from_pool_queue() { std::lock_guard<std::mutex> lock(mutex_); if (tasks_.empty()) { return task_type(); } task_type task = tasks_.back(); tasks_.pop_back(); return task; } int main() { work_stealing_thread_pool pool; for (int i = 0; i < 10; ++i) { pool.submit([i](){ std::cout << "Task #" << i << " running on thread " << std::this_thread::get_id() << std::endl; }); } return 0; }
在现代计算机系统中,多核 CPU、GPU 和分布式计算成为了越来越主流的架构。并行计算和并行算法成为了程序员们必备的技能之一。
下面将会介绍并行计算和并行算法的基本概念和原理,以及如何在C++11中使用 std::execution 来调用并行算法。同时,我们还会深入探讨 OpenMP 编写并行程序,CUDA 编程和 GPU 加速,以及分布式计算和云计算技术。
C++11 引入了一组并行算法可以使用多个线程并发执行算法。这些算法与串行算法的接口相同,通过在线程池中调度任务来实现并行执行。
C++ 标准库中的并行算法按照使用方式可以分为以下三类:
#include <algorithm>
#include <vector>
std::vector<int> v{1, 2, 3, 4, 5};
std::for_each(v.begin(), v.end(), [](int& x){ x *= 2; });
并行算法指的是将算法并行执行的标准库函数。例如,在多处理器系统上并行执行 std::for_each 算法。
#include <algorithm>
#include <execution>
#include <vector>
std::vector<int> v { 1, 2, 3, 4, 5 };
std::for_each(std::execution::par, v.begin(), v.end(), [](int& x){ x *= 2; });
执行策略指的是指定算法在哪个线程上执行的方法,可以使用 std::execution::seq、std::execution::par 和 std::execution::par_unseq 来选择不同的执行策略。
OpenMP 是一种基于共享内存的并行编程模型,其目的是在性能和可移植性之间找到平衡。OpenMP 使用编译指示和运行时库来实现并行计算。
以下是使用 OpenMP 编写并行程序的示例:
在示例中使用 #pragma omp parallel 指令指示编译器并行运行代码块。通过 num_threads 参数可以设置并行运行的线程数。
#include <iostream>
#include <omp.h>
int main() {
#pragma omp parallel num_threads(4)
int tid = omp_get_thread_num();
std::cout << "Hello World from thread #" << tid << std::endl;
return 0;
CUDA 是 NVIDIA 公司推出的通用并行计算平台和编程模型,支持在 NVIDIA 的 GPU 上进行并行计算。CUDA 编程通过执行大量线程来实现并行计算。
以下是使用 CUDA 编写矩阵加法的示例:
在示例中使用 global 关键字声明一个 CUDA 内核函数 matrixAdd。在主函数中,通过调用 cudaMalloc 分配内存并将数据传输到 GPU 上处理,然后执行 kernel 函数,通过调用 cudaFree 释放内存。
__global__ void matrixAdd(float* A, float* B, float* C, int n) { int i = blockIdx.x * blockDim.x + threadIdx.x; int j = blockIdx.y * blockDim.y + threadIdx.y; if (i < n && j < n) { int idx = i * n + j; C[idx] = A[idx] + B[idx]; } } int main() { int n = 1024; float *A, *B, *C; cudaMalloc((void**)&A, n * n * sizeof(float)); cudaMalloc((void**)&B, n * n * sizeof(float)); cudaMalloc((void**)&C, n * n * sizeof(float)); dim3 threadsPerBlock(16, 16); dim3 numBlocks(n / threadsPerBlock.x, n / threadsPerBlock.y); matrixAdd<<<numBlocks, threadsPerBlock>>>(A, B, C, n); cudaFree(A); cudaFree(B); cudaFree(C); return 0; }
在现代软件开发中,多线程编程已成为必不可少的技能之一。多线程技术可以通过充分利用 CPU 和其他资源来提高程序的性能。
下面将会介绍 c++ 中使用多线程的实例,包括多线程下载器、并行映射/归约算法、任务调度器、生产者消费者模型和并行搜索算法等。
以下是使用 c++ 实现多线程下载器的示例:
在示例中使用 libcurl 库来处理 HTTP 请求和响应,同时使用 c++ 中的多线程技术来下载文件。我们将文件分成多个块,每个块启动一个线程来下载,最后通过合并块的方式合并文件。使用多线程下载器可以充分利用网络带宽,加快文件下载速度。
#include <iostream> #include <fstream> #include <thread> #include <vector> #include <curl/curl.h> struct Range { long long start; long long end; }; void download_range(const std::string& url, const Range& range, std::ofstream& out_file) { std::string range_str = std::to_string(range.start) + "-" + std::to_string(range.end); CURL* curl = curl_easy_init(); if (curl) { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_RANGE, range_str.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &out_file); curl_easy_perform(curl); curl_easy_cleanup(curl); } } void download_file(const std::string& url, const std::string& filename, int num_threads) { CURL* curl = curl_easy_init(); if (curl) { curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_NOBODY, 1L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 10L); long long file_size = 0; curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &file_size); curl_easy_cleanup(curl); std::vector<std::thread> threads(num_threads); std::vector<std::ofstream> out_files(num_threads); std::vector<Range> ranges(num_threads); long long chunk_size = file_size / num_threads; for (int i = 0; i < num_threads; ++i) { out_files[i].open(filename + "." + std::to_string(i), std::ios::out | std::ios::binary); ranges[i].start = i * chunk_size; ranges[i].end = (i == num_threads - 1) ? (file_size - 1) : (ranges[i].start + chunk_size - 1); threads[i] = std::thread(download_range, url, ranges[i], std::ref(out_files[i])); } for (int i = 0; i < num_threads; ++i) { threads[i].join(); out_files[i].close(); } std::ofstream out_file(filename, std::ios::out | std::ios::binary); for (int i = 0; i < num_threads; ++i) { std::ifstream in_file(filename + "." + std::to_string(i), std::ios::in | std::ios::binary); out_file << in_file.rdbuf(); in_file.close(); remove((filename + "." + std::to_string(i)).c_str()); } out_file.close(); } } int main() { std::string url = "https://www.example.com/example_file.zip"; std::string filename = "example_file.zip"; int num_threads = 4; download_file(url, filename, num_threads); return 0; }
以下是使用 c++ 实现并行映射/归约算法的示例:
在示例中使用 c++ 的标准库函数 std::accumulate 来实现归约操作,使用多线程技术实现映射操作。我们将数据集分成多个块,每个块启动一个线程来计算部分和,最后通过合并部分和得到总和。使用并行映射/归约算法可以大大提高数据处理的效率。
#include <iostream> #include <vector> #include <numeric> #include <thread> #include <algorithm> void map_reduce(const std::vector<int>& data, int& result) { int num_threads = std::thread::hardware_concurrency(); std::vector<int> partial_results(num_threads); int chunk_size = data.size() / num_threads; std::vector<std::thread> threads(num_threads); for (int i = 0; i < num_threads; ++i) { threads[i] = std::thread([&data, &partial_results, i, chunk_size]() { int start = i * chunk_size; int end = (i == num_threads - 1) ? data.size() : start + chunk_size; partial_results[i] = std::accumulate(data.begin() + start, data.begin() + end, 0); }); } for (int i = 0; i < num_threads; ++i) { threads[i].join(); } result = std::accumulate(partial_results.begin(), partial_results.end(), 0); } int main() { std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; int result = 0; map_reduce(data, result); std::cout << "The sum is: " << result << std::endl; return 0; }
以下是使用 c++ 实现任务调度器的示例:
在示例中使用了 c++ 的线程、互斥锁、条件变量和 lambda 表达式等技术来实现任务调度。我们定义了一个任务队列和一个线程池,并使用条件变量来实现线程的等待和唤醒。我们使用 submit() 函数向任务队列提交任务,可以异步执行任务。我们还使用 wait() 函数来等待所有任务完成。使用任务调度器可以有效地管理任务的执行,并提高任务处理的效率和灵活性。
#include <iostream> #include <thread> #include <vector> #include <queue> #include <mutex> #include <condition_variable> class task_queue { public: void push(std::function<void()> f) { { std::unique_lock<std::mutex> lock(q_mutex_); queue_.push(f); } q_condition_.notify_one(); } std::function<void()> pop() { std::unique_lock<std::mutex> lock(q_mutex_); q_condition_.wait(lock, [this] { return !queue_.empty(); }); auto f = queue_.front(); queue_.pop(); return f; } private: std::queue<std::function<void()>> queue_; std::mutex q_mutex_; std::condition_variable q_condition_; }; class task_scheduler { public: task_scheduler() { for (int i = 0; i < num_threads_; ++i) { threads_.push_back(std::thread([&] { worker_thread(); })); } } ~task_scheduler() { stop_ = true; cv_.notify_all(); for (auto& t: threads_) { t.join(); } } void submit(std::function<void()> f) { task_queue_.push(f); } void wait() { std::unique_lock<std::mutex> lock(cv_mutex_); cv_.wait(lock, [&] { return task_queue_.empty() && active_tasks_ == 0; }); } private: void worker_thread() { std::function<void()> f; while (true) { { std::unique_lock<std::mutex> lock(cv_mutex_); cv_.wait(lock, [&] { return !task_queue_.empty() || stop_; }); if (stop_ && task_queue_.empty()) { return; } ++active_tasks_; } if (!task_queue_.empty()) { f = task_queue_.pop(); f(); } { std::unique_lock<std::mutex> lock(cv_mutex_); --active_tasks_; cv_.notify_all(); } } } int num_threads_ = std::thread::hardware_concurrency(); std::vector<std::thread> threads_; task_queue task_queue_; std::atomic<bool> stop_ = false; std::condition_variable cv_; std::mutex cv_mutex_; std::atomic<int> active_tasks_{0}; }; int main() { task_scheduler ts; int sum1 = 0, sum2 = 0; ts.submit([&sum1] { sum1 = 0; for (int i = 0; i < 1000; ++i) sum1 += i; }); ts.submit([&sum2] { sum2 = 0; for (int i = 1000; i < 2000; ++i) sum2 += i; }); ts.wait(); std::cout << "Sum: " << sum1 + sum2 << std::endl; return 0; }
以下是使用 c++ 实现生产者消费者模型的示例:
在示例中使用了 c++ 的互斥锁、条件变量和队列等技术来实现生产者消费者模型。我们定义了一个缓冲区类,其中包含向缓冲区添加和获取元素的方法。我们使用条件变量来进行等待和唤醒,以确保生产者和消费者之间的同步。我们还定义了一个生产者类和一个消费者类,并在主函数中启动两个线程来运行它们。使用生产者消费者模型可以有效地解决生产者和消费者之间的同步问题,并提高程序的并发和性能。
#include <iostream> #include <thread> #include <queue> #include <mutex> #include <condition_variable> class buffer { public: void add(int value) { std::unique_lock<std::mutex> lock(mutex_); while (queue_.size() >= capacity_) { full_condition_.wait(lock); } queue_.push(value); empty_condition_.notify_one(); } int get() { std::unique_lock<std::mutex> lock(mutex_); while (queue_.empty()) { empty_condition_.wait(lock); } int value = queue_.front(); queue_.pop(); full_condition_.notify_one(); return value; } private: std::queue<int> queue_; int capacity_ = 10; std::mutex mutex_; std::condition_variable empty_condition_; std::condition_variable full_condition_; }; class producer { public: producer(buffer& buffer) : buffer_(buffer) {} void run() { for (int i = 1; i <= 100; ++i) { buffer_.add(i); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } private: buffer& buffer_; }; class consumer { public: consumer(buffer& buffer) : buffer_(buffer) {} void run() { for (int i = 1; i <= 100; ++i) { int value = buffer_.get(); std::cout << "Consumer: " << value << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } private: buffer& buffer_; }; int main() { buffer buffer; producer p(buffer); consumer c(buffer); std::thread producer_thread(&producer::run, &p); std::thread consumer_thread(&consumer::run, &c); producer_thread.join(); consumer_thread.join(); return 0; }
以下是使用 c++ 实现并行搜索算法的示例:
在示例中使用了 c++ 的原子变量、线程和递归等技术来实现并行搜索算法。我们定义了一个 is_solution() 函数来判断是否为解决方案,使用 search() 函数来进行搜索,并使用原子变量来确保在找到解决方案后立即停止。我们还定义了一个 parallel_search() 函数来启动多个线程来进行并行搜索,并将结果合并。使用并行搜索算法可以大大加快搜索的速度,减少计算时间。
#include <iostream> #include <thread> #include <vector> #include <atomic> bool is_solution(const std::vector<int>& data) { int n = data.size(); int sum = 0; for (int i = 0; i < n; ++i) { sum += data[i]; } return sum == n * (n + 1) / 2; } void search(std::vector<int> data, std::vector<bool>& used, int level, std::vector<std::vector<int>>& results, std::atomic<bool>& found) { if (found) return; if (level == data.size() && is_solution(data)) { found = true; results.push_back(data); return; } for (int i = 0; i < data.size(); ++i) { if (!used[i]) { data[level] = i + 1; used[i] = true; if (found) return; std::thread t(search, data, std::ref(used), level + 1, std::ref(results), std::ref(found)); t.join(); used[i] = false; } } } std::vector<std::vector<int>> parallel_search(int n, int num_threads) { std::atomic<bool> found = false; std::vector<std::vector<int>> results; std::vector<int> data(n); std::vector<bool> used(n, false); std::vector<std::thread> threads(num_threads); for (int i = 0; i < num_threads; ++i) { threads[i] = std::thread(search, data, std::ref(used), 0, std::ref(results), std::ref(found)); } for (int i = 0; i < num_threads; ++i) { threads[i].join(); } return results; } int main() { std::vector<std::vector<int>> results = parallel_search(6, 4); std::cout << "Results found: " << results.size() << std::endl; for (auto& r: results) { std::cout << "Solution: "; for (auto& e: r) { std::cout << e << " "; } std::cout << std::endl; } return 0; }
