赞
踩
C++11 线程库:http://zh.cppreference.com/w/cpp/thread
互斥量和锁
std::unique_lock::lock
和 std::unique_lock::unlock
上锁操作,调用它所管理的 Mutex 对象的 lock 函数。如果在调用 Mutex 对象的 lock 函数时该 Mutex 对象已被另一线程锁住,则当前线程会被阻塞,直到它获得了锁。
条件变量 condition_variable 维基百科
void wait (unique_lock& lck); 无条件被阻塞。调用该函数前,当前线程应该已经对unique_lock lck完成了加锁。所有使用同一个条件变量的线程必须在wait函数中使用同一个unique_lock。该wait函数内部会自动调用lck.unlock()对互斥锁解锁,使得其他被阻塞在互斥锁上的线程恢复执行。使用本函数被阻塞的当前线程在获得通知(notified,通过别的线程调用 notify_*系列的函数)而被唤醒后,wait()函数恢复执行并自动调用lck.lock()对互斥锁加锁。
template void wait (unique_lock& lck, Predicate pred);带条件的被阻塞。wait函数设置了谓词(Predicate),只有当pred条件为false时调用该wait函数才会阻塞当前线程,并且在收到其他线程的通知后只有当pred为true时才会被解除阻塞。因此,等效于while (!pred()) wait(lck);
class MyClass {
int fun(int a) {
...
}
}
int main() {
MyClass MyObject;
int a = 2;
// 第一个参数传入函数名,第二个参数传入类实例的地址(this), 第三个参数开始传入函数参数;
thread my_thread(MyClass::fun, &MyObject, a);
}
多个生产者和多个生产者的问题。生产者不断的向仓库放入产品,消费者不断的从仓库取出产品,仓库的容量是有限的。因此,当仓库处于满状态时,生产者必须等待消费者取出 1 个或多个产品后才能继续生产;同理,当仓库处于空状态时,消费者必须等待生产者将产品放入仓库后才能消费(取出)产品。
使用数组模拟仓库,需要记录下一次生产和消费在数组中的位置。
用生产和消费在数组中的位置判断仓库是否为空或者为满:
假设仓库容量为 N:
当仓库满时,阻塞生产者;当一个消费行为后,仓库非满,唤醒生产者;
当仓库空时,阻塞消费者;当一个生产行为后,仓库非空,唤醒消费者;
因此需要引入,仓库非满条件变量和仓库非空条件变量。
由于生产和消费行为都会修改数据,因此两者操作必须互斥,需引入生产消费互斥锁。
当我们要生产(或消费)一定数量的产品时,需要计数判断是否已经完成工作;多个生产者进行生产时,都会对生产的计数变量进行修改,因此需引入生产计数互斥锁和消费计数互斥锁,保证同时只有一个生产(或消费)进程对计数变量进行修改。
producer.cpp
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include <thread>
using namespace std;
const int kProduceItems = 10;
const int kRepositorySize = 4;
template<class T>
class Repository {
public:
T items_buff[kRepositorySize];
mutex mtx; // 生产者消费者互斥量
mutex produce_mutex; // 生产计数互斥量
mutex consume_mutex; // 消费计数互斥量
size_t produce_item_count;
size_t consume_item_count;
size_t produce_position; // 下一个生产的位置
size_t consume_position; // 下一个消费的位置
condition_variable repo_not_full; // 仓库不满条件变量
condition_variable repo_not_empty; // 仓库不空条件变量
Repository() {
produce_item_count = 0;
consume_item_count = 0;
produce_position = 0;
consume_position = 0;
};
void Init() {
fill_n(items_buff, sizeof(items_buff)/sizeof(items_buff[0]), 0);
produce_item_count = 0;
consume_item_count = 0;
produce_position = 0;
consume_position = 0;
}
};
template<class T>
class Factory {
private:
Repository<T> repo;
void ProduceItem(Repository<T> &repo, T item) {
unique_lock<mutex> lock(repo.mtx);
// +1 后判断,因为在初始时,两者位于同一位置(因此仓库中最大存在 kRepositorySize-1 个产品)
while ((repo.produce_position+1) % kRepositorySize == repo.consume_position) {
cout << "Repository is full, waiting..." << endl;
(repo.repo_not_full).wait(lock); // 阻塞时释放锁,被唤醒时获得锁
}
repo.items_buff[repo.produce_position++] = item;
if (repo.produce_position == kRepositorySize)
repo.produce_position = 0;
(repo.repo_not_empty).notify_all(); // 唤醒所有因空阻塞的进程
lock.unlock();
}
T ConsumeItem(Repository<T> &repo) {
unique_lock<mutex> lock(repo.mtx);
while (repo.consume_position == repo.produce_position) {
cout << "Repository is empty, waiting ..." << endl;
(repo.repo_not_empty).wait(lock);
}
T data = repo.items_buff[repo.consume_position++];
if (repo.consume_position == kRepositorySize)
repo.consume_position = 0;
(repo.repo_not_full).notify_all();
lock.unlock();
return data;
}
public:
void Reset() {
repo.Init();
}
void ProduceTask() {
bool ready_to_exit = false;
while (true) {
sleep(1); // 如果不sleep ,运行太快,一个进程会完成所有生产
unique_lock<mutex> lock(repo.produce_mutex);
if (repo.produce_item_count < kProduceItems) {
++(repo.produce_item_count);
T item = repo.produce_item_count;
cout << "producer id: "<< this_thread::get_id() << " is producing "
<< item << "^th item..." << endl;
ProduceItem(repo, item);
} else {
ready_to_exit = true;
}
lock.unlock();
// sleep(1);
if (ready_to_exit)
break;
}
cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << endl;
}
void ConsumeTask() {
bool ready_to_exit =false;
while (true) {
sleep(1); // 如果不sleep ,运行太快,一个进程会消费所有产品
unique_lock<mutex> lock(repo.consume_mutex);
if (repo.consume_item_count < kProduceItems) {
T item = ConsumeItem(repo);
cout << "consumer id: " << this_thread::get_id() << " is consuming "
<< item << "^th item" << endl;
++(repo.consume_item_count);
} else {
ready_to_exit = true;
}
lock.unlock();
// sleep(1);
if (ready_to_exit)
break;
}
cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << endl;
}
};
int main() {
cout << "Main thread id :" << this_thread::get_id() << endl;
class Factory<int> myfactory;
thread producer1(&Factory<int>::ProduceTask, &myfactory);
thread producer2(&Factory<int>::ProduceTask, &myfactory);
thread producer3(&Factory<int>::ProduceTask, &myfactory);
thread consumer1(&Factory<int>::ConsumeTask, &myfactory);
thread consumer2(&Factory<int>::ConsumeTask, &myfactory);
thread consumer3(&Factory<int>::ConsumeTask, &myfactory);
producer1.join();
producer2.join();
producer3.join();
consumer1.join();
consumer2.join();
consumer3.join();
}
all:producer
CC=g++
CPPFLAGS=-Wall -std=c++11 -g
LDFLAGS=-pthread
producer:producer.o
$(CC) $(LDFLAGS) -o $@ $^
producer.o:producer.cpp
$(CC) $(CPPFLAGS) -o $@ -c $^
clean:
rm *.o
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。