当前位置:   article > 正文

C++多线程学习:生产者消费者问题_c++ thread 生产者消费者 卡住

c++ thread 生产者消费者 卡住

多线程相关知识点:

  1. C++11 线程库:http://zh.cppreference.com/w/cpp/thread

  2. 互斥量和锁
    std::unique_lock::lockstd::unique_lock::unlock
    上锁操作,调用它所管理的 Mutex 对象的 lock 函数。如果在调用 Mutex 对象的 lock 函数时该 Mutex 对象已被另一线程锁住,则当前线程会被阻塞,直到它获得了锁。

  3. 条件变量 condition_variable 维基百科

    • void wait (unique_lock& lck); 无条件被阻塞。调用该函数前,当前线程应该已经对unique_lock lck完成了加锁。所有使用同一个条件变量的线程必须在wait函数中使用同一个unique_lock。该wait函数内部会自动调用lck.unlock()对互斥锁解锁,使得其他被阻塞在互斥锁上的线程恢复执行。使用本函数被阻塞的当前线程在获得通知(notified,通过别的线程调用 notify_*系列的函数)而被唤醒后wait()函数恢复执行并自动调用lck.lock()对互斥锁加锁

    • template void wait (unique_lock& lck, Predicate pred);带条件的被阻塞。wait函数设置了谓词(Predicate),只有当pred条件为false时调用该wait函数才会阻塞当前线程,并且在收到其他线程的通知后只有当pred为true时才会被解除阻塞。因此,等效于while (!pred()) wait(lck);

  4. 线程入口函数
    一般情况下,我们使用全局函数或者类的静态函数作为线程函数入口;但是以上2者都不能访问类的非静态成员变量。
    使用类的成员函数,作为线程入口函数。使该线程入口函数能够访问类的成员变量。
class MyClass {
    int fun(int a) {
    ...
    }
}
int main() {
    MyClass MyObject;
    int a = 2;
    // 第一个参数传入函数名,第二个参数传入类实例的地址(this), 第三个参数开始传入函数参数;
    thread my_thread(MyClass::fun, &MyObject, a); 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

生产者消费者问题

多个生产者和多个生产者的问题。生产者不断的向仓库放入产品,消费者不断的从仓库取出产品,仓库的容量是有限的。因此,当仓库处于满状态时,生产者必须等待消费者取出 1 个或多个产品后才能继续生产;同理,当仓库处于空状态时,消费者必须等待生产者将产品放入仓库后才能消费(取出)产品。

使用数组模拟仓库,需要记录下一次生产和消费在数组中的位置。
用生产和消费在数组中的位置判断仓库是否为空或者为满:
假设仓库容量为 N:

  • (produce_position+1)%N == consume_position 满
    因为初始位置都是 0,当两者相差一个位置时,定义满状态。(最多存储N-1个)
  • consume_position == produce_position 空

当仓库满时,阻塞生产者;当一个消费行为后,仓库非满,唤醒生产者;
当仓库空时,阻塞消费者;当一个生产行为后,仓库非空,唤醒消费者;
因此需要引入,仓库非满条件变量仓库非空条件变量

由于生产和消费行为都会修改数据,因此两者操作必须互斥,需引入生产消费互斥锁
当我们要生产(或消费)一定数量的产品时,需要计数判断是否已经完成工作;多个生产者进行生产时,都会对生产的计数变量进行修改,因此需引入生产计数互斥锁消费计数互斥锁,保证同时只有一个生产(或消费)进程对计数变量进行修改。


C++源码

producer.cpp

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include <thread>
using namespace std;
const int kProduceItems = 10;
const int kRepositorySize = 4;
template<class T>
class Repository {
public:
    T items_buff[kRepositorySize];
    mutex mtx; // 生产者消费者互斥量
    mutex produce_mutex; // 生产计数互斥量
    mutex consume_mutex; // 消费计数互斥量
    size_t produce_item_count;
    size_t consume_item_count;
    size_t produce_position; // 下一个生产的位置
    size_t consume_position; // 下一个消费的位置
    condition_variable repo_not_full; // 仓库不满条件变量
    condition_variable repo_not_empty; // 仓库不空条件变量
    Repository() {
        produce_item_count = 0;
        consume_item_count = 0;
        produce_position = 0;
        consume_position = 0;
    };
    void Init() {
        fill_n(items_buff, sizeof(items_buff)/sizeof(items_buff[0]), 0);
        produce_item_count = 0;
        consume_item_count = 0;
        produce_position = 0;
        consume_position = 0;
    }
};
template<class T> 
class Factory {
private:
    Repository<T> repo;

    void ProduceItem(Repository<T> &repo, T item) {
        unique_lock<mutex> lock(repo.mtx);
        // +1 后判断,因为在初始时,两者位于同一位置(因此仓库中最大存在 kRepositorySize-1 个产品)
        while ((repo.produce_position+1) % kRepositorySize == repo.consume_position) { 
            cout << "Repository is full, waiting..." << endl;
            (repo.repo_not_full).wait(lock); // 阻塞时释放锁,被唤醒时获得锁
        }
        repo.items_buff[repo.produce_position++] = item;
        if (repo.produce_position == kRepositorySize)
            repo.produce_position = 0;
        (repo.repo_not_empty).notify_all(); // 唤醒所有因空阻塞的进程
        lock.unlock();
    }
    T ConsumeItem(Repository<T> &repo) {
        unique_lock<mutex> lock(repo.mtx);
        while (repo.consume_position == repo.produce_position) {
            cout << "Repository is empty, waiting ..." << endl;
            (repo.repo_not_empty).wait(lock);
        }
        T data = repo.items_buff[repo.consume_position++];
        if (repo.consume_position == kRepositorySize)
            repo.consume_position = 0;
        (repo.repo_not_full).notify_all();
        lock.unlock();
        return data;
    }
public:
    void Reset() {
        repo.Init();
    }
    void ProduceTask() {
        bool ready_to_exit = false;
        while (true) {
            sleep(1); // 如果不sleep ,运行太快,一个进程会完成所有生产
            unique_lock<mutex> lock(repo.produce_mutex);

            if (repo.produce_item_count < kProduceItems) {
                ++(repo.produce_item_count);
                T item = repo.produce_item_count;
                cout << "producer id: "<< this_thread::get_id() << " is producing " 
                     << item << "^th item..." << endl;
                ProduceItem(repo, item);
            } else {
                ready_to_exit = true;
            }

            lock.unlock();
            // sleep(1);
            if (ready_to_exit)
                break;
        }       
        cout << "Producer thread " << std::this_thread::get_id()
             << " is exiting..." << endl;
    }

    void ConsumeTask() {
        bool ready_to_exit =false;
        while (true) {
            sleep(1); // 如果不sleep ,运行太快,一个进程会消费所有产品
            unique_lock<mutex> lock(repo.consume_mutex);

            if (repo.consume_item_count < kProduceItems) {
                T item = ConsumeItem(repo);
                cout << "consumer id: " << this_thread::get_id() << " is consuming "
                     << item << "^th item" << endl;
                ++(repo.consume_item_count);
            } else {
                ready_to_exit = true;
            }

            lock.unlock();
            // sleep(1);
            if (ready_to_exit)
                break;
        }
        cout << "Consumer thread " << std::this_thread::get_id()
             << " is exiting..." << endl;
    }
};

int main() {
    cout << "Main thread id :" << this_thread::get_id() << endl;
    class Factory<int> myfactory;
    thread producer1(&Factory<int>::ProduceTask, &myfactory);
    thread producer2(&Factory<int>::ProduceTask, &myfactory);
    thread producer3(&Factory<int>::ProduceTask, &myfactory);

    thread consumer1(&Factory<int>::ConsumeTask, &myfactory);
    thread consumer2(&Factory<int>::ConsumeTask, &myfactory);
    thread consumer3(&Factory<int>::ConsumeTask, &myfactory);

    producer1.join();
    producer2.join();
    producer3.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

makefile

all:producer
CC=g++
CPPFLAGS=-Wall -std=c++11 -g
LDFLAGS=-pthread
producer:producer.o
    $(CC) $(LDFLAGS) -o $@ $^
producer.o:producer.cpp
    $(CC) $(CPPFLAGS) -o $@ -c $^
clean:
    rm *.o
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

参考资料:C++11 并发指南九(综合运用: C++11 多线程下生产者消费者模型详解)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/267923
推荐阅读
相关标签
  

闽ICP备14008679号