赞
踩
互斥量mutex
#include <iostream> #include <pthread.h> #include <unistd.h> const int N = 5; int tickets = 1000; int cnt = 0; // 被去抢的次数 void* ThreadRun(void* args) { int id = *(int *)args; while (true) { if (tickets > 0) { usleep(10000); printf("%d线程抢到了票: %d\n", id, tickets); tickets -- ; } else { break; } cnt ++ ; } std::cout << "总共抢的票数" << cnt << std::endl; } int main() { pthread_t tid[N]; for (int i = 0; i < N; i ++ ) { int* id = new int(i); pthread_create(&tid[i], nullptr, ThreadRun, (void*)id); } for (int i = 0; i < N; i ++ ) { pthread_join(tid[i], nullptr); } return 0; }
为什么会出现抢到负数的情况?
-- 操作并不是原子操作,而是对应三条汇编指令:
所以一个线程在执行 -- 操作时可能被切走,但是每个线程执行的时候都会将自己的上下文数据拷贝到寄存器中,退出时也会将寄存器的数据保留下来,此时线程被切走,保留现在的tickets,再次被执行的时候,它仍然以自己的票数作为基准,所以会出现抢多了的情况。
怎么才能保证不会发生这样不合理的事件呢?
给临界区代码加锁,可以实现线程访问临界区的原子性。
那申请锁的过程是原子性的吗?答案是的,因为申请锁但是关键语句只有一条汇编代码,要么执行,要么没有执行,所以是原子性的,保证了临界区的安全。
Linux上提供的这把锁叫互斥量。
初始化互斥量
初始化互斥量有两种方法:
方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量
销毁互斥量需要注意:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量, 那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
互斥量的实现
创建一个抢票的类,封装互斥量,对临界区代码进行加锁。
#include <iostream> #include <pthread.h> #include <unistd.h> const int N = 5; int tickets = 1000; int cnt = 0; // 被去抢的次数 class Tickets { public: Tickets() : tickets(1000) { pthread_mutex_init(&mtx, nullptr); } ~Tickets() { pthread_mutex_destroy(&mtx); } bool GetTickets() { bool res = true; pthread_mutex_lock(&mtx); if (tickets > 0) { usleep(1000); printf("%lld线程抢到了票: %d\n", pthread_self(), tickets); tickets -- ; } else { printf("票已经被抢光了\n"); res = false; } pthread_mutex_unlock(&mtx); return res; } private: pthread_mutex_t mtx; int tickets; }; void* ThreadRun(void* args) { Tickets* t = (Tickets*)args; bool flag = true; while (flag) { flag = t->GetTickets(); } } int main() { pthread_t tid[N]; Tickets* t = new Tickets(); for (int i = 0; i < N; i ++ ) { pthread_create(&tid[i], nullptr, ThreadRun, t); } for (int i = 0; i < N; i ++ ) { pthread_join(tid[i], nullptr); } return 0; }
这样就不会抢多余的票了
线程安全: 多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见的线程不安全的情况
常见的线程安全的情况
常见不可重入的情况
常见可重入的情况
可重入与线程安全联系
可重入与线程安全区别
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁四个必要条件
也就是说,有这四个条件不一定存在死锁,有死锁一定存在这四个条件。
避免死锁
概念:
同步概念与竞态条件
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
1. 调用的时候,会首先自动释放锁,然后再挂起等待,保证挂起的时候锁被释放
2. 返回的时候,会首先自动竞争锁,持有锁之后返回,保证返回的时候拥有锁
唤醒等待
1. 唤起在该条件变量下等待的所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);
2. 唤起在该条件变量下等待的一个线程(等待队列的第一个)
int pthread_cond_signal(pthread_cond_t *cond);
#include <iostream> #include <unistd.h> #include <pthread.h> #include <string> // 定义一把全局的锁和一个全局的条件变量 pthread_mutex_t mtx; pthread_cond_t cond; void* ctrl(void* args) { std::string name = (char*)args; while (true) { std::cout << name << "send task" << std::endl; pthread_cond_signal(&cond); sleep(2); } } void* work(void* args) { int id = *(int*)args; delete (int*)args; while (true) { pthread_cond_wait(&cond, &mtx); std::cout << id << "号线程开始工作" << std::endl; } } int main() { pthread_cond_init(&cond, nullptr); pthread_mutex_init(&mtx, nullptr); const int N = 5; pthread_t master; pthread_t worker[N]; pthread_create(&master, nullptr, ctrl, (void*)"master"); for (int i = 0; i < N; i ++ ) { int* id = new int(i); pthread_create(&worker[i], nullptr, work, (void*)id); } pthread_join(master,nullptr); for (int i = 0; i < N; i ++ ) pthread_join(worker[i], nullptr); pthread_mutex_destroy(&mtx); pthread_cond_destroy(&cond); return 0; }
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
消费者只用关心缓冲区,生产者也只用关心缓冲区
简化到代码上:
一个缓冲区,可以是一个容器
生产者和消费者 ---- 两个执行流
生产者和消费者之间的3种关系
CPTest.cc 代码
#include "BlockQueue.hpp" using namespace ns_blockqueue; #include <unistd.h> #include <time.h> void* producter(void* args) { BlockQueue<int>* bq = (BlockQueue<int>*)args; while (true) { int t = rand()%10 + 1; bq->Push(t); std::cout << "send task: " << t << std::endl; } } void* consumer(void* args) { BlockQueue<int>* bq = (BlockQueue<int>*)args; while (true) { int c = 0; bq->Pop(&c); std::cout << "号线程开始消费" << c << std::endl; sleep(1); } } int main() { srand((unsigned long)time(nullptr)); const int N = 5; pthread_t master; pthread_t worker[N]; BlockQueue<int> *bq = new BlockQueue<int>; pthread_create(&master, nullptr, producter, (void*)bq); for (int i = 0; i < N; i ++ ) { int* id = new int(i); pthread_create(&worker[i], nullptr, consumer, (void*)bq); } pthread_join(master,nullptr); for (int i = 0; i < N; i ++ ) pthread_join(worker[i], nullptr); return 0; }
BlockQueue.hpp 代码
#pragma once #include <iostream> #include <queue> #include <pthread.h> namespace ns_blockqueue { const int default_cap = 5; template <class T> class BlockQueue { private: std::queue<T> bq_; // 阻塞队列 int cap_; pthread_mutex_t mtx_; // 1. 当队列满的时候,需要消费者来消费,所以需要消费者在这个条件变量下面等待 // 2. 当队列空的时候,需要生产者来生产,所以需要生产者在这个条件变量下面等待 pthread_cond_t is_full_; pthread_cond_t is_empty_; public: BlockQueue(int cap = default_cap) : cap_(cap) { pthread_mutex_init(&mtx_, nullptr); pthread_cond_init(&is_full_, nullptr); pthread_cond_init(&is_empty_, nullptr); } ~BlockQueue() { pthread_mutex_destroy(&mtx_); pthread_cond_destroy(&is_full_); pthread_cond_destroy(&is_empty_); } bool IsFull() { return bq_.size() == cap_; } bool IsEmpty() { return bq_.size() == 0; } private: void LockQueue() { pthread_mutex_lock(&mtx_); } void UnlockQueue() { pthread_mutex_unlock(&mtx_); } void ProducterWait() { pthread_cond_wait(&is_empty_, &mtx_); } void ComsumerWait() { pthread_cond_wait(&is_full_, &mtx_); } void WakeupComsumer() { pthread_cond_signal(&is_full_); } void WakeupProducter() { pthread_cond_signal(&is_empty_); } public: void Push(const T &in) { LockQueue(); while (IsFull()) { // 这里用while循环等待,防止出现等待之后条件不满足继续的情况 ProducterWait(); } bq_.push(in); UnlockQueue(); WakeupComsumer(); // 唤醒消费者消费 } void Pop(T *out) { LockQueue(); while (IsEmpty()) { ComsumerWait(); } *out = bq_.front(); bq_.pop(); UnlockQueue(); WakeupProducter(); } }; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。