赞
踩
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
Compile and link with -pthread.
pthread_create()
库函数用于创建一个线程,线程创建成功时返回0,创建失败时返回出错编号。
参数说明:
thread
:指向新创建的线程的线程标识符(线程ID)。attr
:用来设置创建线程的属性,传递 NULL
表示创建默认属性的线程。start_routine
:一个函数指针,代表线程运行函数的起始地址,相当于新线程的主函数。arg
:指向线程运行函数的参数。#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> void *thread_fun(void *arg) { printf("My pid is %d, my tid is %lu.\n", getpid(), *((unsigned long int *)arg)); return NULL; } int main() { pthread_t tid; printf("My pid is %d, my tid is %lu.\n", getpid(), pthread_self()); // pthread_self()用于获取当前线程的线程id if (pthread_create(&tid, NULL, thread_fun, (void *)(&tid)) != 0) { perror("pthread_create error"); exit(1); } sleep(1); return 0; }
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# gcc main.c -o main -pthread
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# ./main
My pid is 277800, my tid is 139821755344704.
My pid is 277800, my tid is 139821755340544.
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others#
#include <pthread.h>
int pthread_cancel(pthread_t thread);
Compile and link with -pthread.
pthread_cancel()
库函数用于线程的被动退出,它会发送终止请求给目标线程,发送成功返回0,否则返回非0值。要注意的是,发送成功并不意味着目标线程一定会终止,目标线程收到终止信号后可以终止也可以忽略,这完全由其自己决定。
#include <pthread.h>
void pthread_exit(void *retval);
Compile and link with -pthread.
pthread_exit()
库函数用于线程的主动退出。retval
参数为指定的返回值,其他线程可以通过 pthread_join()
函数获取该返回值。当然,线程函数也可以调用 return()
使线程退出,但不能使用 exit()
退出,因为进程中的任一线程调用 exit()
会终止该进程的所有线程。
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
int pthread_detach(pthread_t thread);
Compile and link with -pthread.
线程等待的目的是保证线程的资源能够被回收,线程主要有两种状态:
默认情况下,线程会被创建成可结合态的。可以通过调用 pthread_detach()
库函数将可结合态线程转化为分离态线程,由操作系统负责资源的回收。此外,也可以由主线程调用 pthread_join()
等待子线程的终止,待其终止后由主线程引导子线程的销毁并回收资源,其中 retval
参数保存的是指向被等待线程的执行函数返回值的指针的地址,如果不需要采集返回值,可以将其置为 NULL
。
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> void *thread1_fun(void *arg) { pthread_exit(NULL); } void *thread2_fun(void *arg) { char *msg = (char *) malloc(10); strcpy(msg, "Message."); sleep(1); pthread_exit((void *) msg); } int main() { pthread_t tid1, tid2; if (pthread_create(&tid1, NULL, thread1_fun, NULL) != 0) { perror("pthread_create error"); exit(1); } if (pthread_create(&tid2, NULL, thread2_fun, NULL) != 0) { perror("pthread_create error"); exit(1); } pthread_detach(tid1); // 将子线程转换为分离态,由系统自动回收资源 void *ret; pthread_join(tid2, &ret); // 等待子线程的结束,在其结束后回收其资源 printf("%s\n", (char *) ret); free(ret); return 0; }
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# gcc main.c -o main -pthread
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# ./main
Message.
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others#
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
Link with -pthread.
sem_init()
库函数用于初始化一个信号量。
参数说明:
sem
:新创建的信号量的地址。pshared
:用于指示信号量的共享方式,如果为 0
那么信号量将被同一进程内的线程共享,如果是非零值,那么信号量将在进程之间共享。value
:指定新创建的信号量的初始值。#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
Link with -pthread.
以上三个库函数用于执行P操作,其中 sem_wait()
为阻塞等待,sem_trywait()
函数为非阻塞等待,执行失败会直接返回并将errno置为 EAGAIN
,sem_timedwait()
函数也为阻塞等待,但可以通过 abs_timeout
参数设置等待时间。
#include <semaphore.h>
int sem_post(sem_t *sem);
Link with -pthread.
sem_post()
库函数用于执行V操作,调用成功时返回值为0,否则返回值为-1并设置errno。
#include <semaphore.h>
int sem_destroy(sem_t *sem);
Link with -pthread.
sem_destroy()
函数用于销毁一个匿名信号量,在Linux中省略这个函数不会带来异常,但为了安全性和可移植性,还是应该在合适的时机销毁信号量。
通过匿名信号量实现互斥:
#include <pthread.h> #include <semaphore.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> sem_t sem[1]; int sum = 0; void *thread1_fun(void *arg); void *thread2_fun(void *arg); int main() { pthread_t tid1, tid2; /* 初始化信号量 */ sem_init(&sem[0], 0, 1); /* 创建子线程 */ if (pthread_create(&tid1, NULL, thread1_fun, NULL) != 0) { perror("pthread_create(&tid1, NULL, thread1_fun, NULL"); exit(1); } if (pthread_create(&tid2, NULL, thread2_fun, NULL) != 0) { perror("pthread_create(&tid2, NULL, thread2_fun, NULL"); exit(1); } /* 等待子线程退出 */ pthread_join(tid1, NULL); pthread_join(tid2, NULL); /* 销毁信号量 */ sem_destroy(&sem[0]); return 0; } void *thread1_fun(void *arg) { for (int i = 0; i < 5; i++) { sem_wait(&sem[0]); // P printf("[1] sum = %d.\n", sum); sum++; printf("[1] sum = %d.\n", sum); sem_post(&sem[0]); // V sleep(1); } pthread_exit(NULL); } void *thread2_fun(void *arg) { for (int i = 0; i < 5; i++) { sem_wait(&sem[0]); // P printf("[2] sum = %d.\n", sum); sum++; printf("[2] sum = %d.\n", sum); sem_post(&sem[0]); // V sleep(1); } pthread_exit(NULL); }
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# gcc main.c -o main -pthread root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# ./main [2] sum = 0. [2] sum = 1. [1] sum = 1. [1] sum = 2. [2] sum = 2. [2] sum = 3. [1] sum = 3. [1] sum = 4. [2] sum = 4. [2] sum = 5. [1] sum = 5. [1] sum = 6. [2] sum = 6. [2] sum = 7. [1] sum = 7. [1] sum = 8. [2] sum = 8. [2] sum = 9. [1] sum = 9. [1] sum = 10. root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others#
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
pthread_mutex_init()
与 pthread_mutex_destroy()
均为POSIX库函数,分别用于互斥锁的初始化和销毁。其中 mutex
参数为互斥锁变量的地址值,attr
参数表示要创建的互斥锁的属性,一般置为 NULL
表示使用默认属性。
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
以上三个POSIX库函数用于实现互斥锁的加锁和解锁操作,其中 mutex
参数为待操作的锁变量的地址,加锁操作和解锁操作一般需要成对出现以避免死锁。
pthread_mutex_lock()
和 pthread_mutex_trylock()
的区别:
用 pthread_mutex_lock()
加锁时,如果mutex已经被锁住,当前尝试加锁的线程就会被阻塞,直到互斥锁被其他线程释放。而 pthread_mutex_trylock()
函数则不同,如果mutex已经被锁住,它将立即返回错误码 EBUSY
,而不是阻塞等待。
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> pthread_mutex_t mutex; int sum = 0; void *thread_add_fun(void *arg); void *thread_show_fun(void *arg); int main() { pthread_t tid_add, tid_show; /* 初始化互斥锁 */ pthread_mutex_init(&mutex, NULL); /* 创建show线程 */ if (pthread_create(&tid_show, NULL, thread_show_fun, NULL) != 0) { perror("pthread_create error"); exit(1); } sleep(1); // 让show线程优先执行 /* 创建add线程 */ if (pthread_create(&tid_add, NULL, thread_add_fun, NULL) != 0) { perror("pthread_create error"); exit(1); } /* 等待子线程退出 */ pthread_join(tid_show, NULL); pthread_join(tid_add, NULL); /* 销毁互斥锁 */ pthread_mutex_destroy(&mutex); return 0; } /* 负责将全局变量增加到3 */ void *thread_add_fun(void *arg) { while (1) { pthread_mutex_lock(&mutex); sum = sum + 1; printf("[add] sum = %d.\n", sum); if (sum == 3) { /* 完成自加任务后把锁释放,然后退出 */ pthread_mutex_unlock(&mutex); break; } else { pthread_mutex_unlock(&mutex); sleep(1); // 给show线程获得锁测试变量的机会 }· } } /* 负责当sum增加到3后输出YES信息 */ void *thread_show_fun(void *arg) { /* 循环等待直到sum到达3 */ while (1) { pthread_mutex_lock(&mutex); if (sum == 3) { /* 满足条件后输出并把锁释放,然后退出 */ printf("[show][YES] sum = %d.\n", sum); pthread_mutex_unlock(&mutex); break; } else { printf("[show][NO] sum = %d.\n", sum); pthread_mutex_unlock(&mutex); sleep(1); // 给add线程获得锁增加变量的机会 } } }
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# gcc main.c -o main -pthread
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# ./main
[show][NO] sum = 0.
[show][NO] sum = 0.
[add] sum = 1.
[show][NO] sum = 1.
[add] sum = 2.
[show][NO] sum = 2.
[add] sum = 3.
[show][YES] sum = 3.
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others#
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_init()
与 pthread_cond_destroy()
均为POSIX库函数,分别用于条件变量的初始化和销毁。其中 cond
参数为条件变量的地址值,attr
参数表示要创建的条件变量的属性,由于该属性在实际中没有被实现,所以它的值通常是 NULL
。
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
以上两个POSIX库函数用于等待条件变量的激活,其中 pthread_cond_timedwait()
函数用于计时等待,如果在 abstime
参数指定的时间前条件变量仍没有满足,会直接返回 ETIMEOUT
并结束等待。
其中,mutex
参数为用于保护条件变量的互斥锁。实际上,条件的检测是在互斥锁的保护下进行的,线程在改变条件状态之前必须首先锁住互斥量。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,这些线程将重新锁定互斥锁并重新测试条件是否满足。
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
以上两个POSIX库函数用于激活条件变量,其中 pthread_cond_signal()
用于激活一个等待条件变量成立的线程,当存在多个等待线程时,会按照入队顺序激活其中的一个,而 pthread_cond_broadcast()
则会激活所有等待的线程。
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> pthread_mutex_t mutex; pthread_cond_t cond; int sum = 0; void *thread_add_fun(void *arg); void *thread_show_fun(void *arg); int main() { pthread_t tid_add, tid_show; /* 初始化互斥锁和条件变量 */ pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond, NULL); /* 创建show线程 */ if (pthread_create(&tid_show, NULL, thread_show_fun, NULL) != 0) { perror("pthread_create error"); exit(1); } sleep(1); // 让show线程优先执行 /* 创建add线程 */ if (pthread_create(&tid_add, NULL, thread_add_fun, NULL) != 0) { perror("pthread_create error"); exit(1); } /* 等待子线程退出 */ pthread_join(tid_show, NULL); pthread_join(tid_add, NULL); /* 销毁互斥锁 */ pthread_mutex_destroy(&mutex); return 0; } /* 负责将全局变量增加到3 */ void *thread_add_fun(void *arg) { while (1) { pthread_mutex_lock(&mutex); sum = sum + 1; printf("[add] sum = %d.\n", sum); if (sum == 3) { /* 完成自加任务后通知show线程并把锁释放,然后退出 */ pthread_mutex_unlock(&mutex); pthread_cond_signal(&cond); break; } else { pthread_mutex_unlock(&mutex); sleep(1); } } } /* 负责当sum增加到3后输出YES信息 */ void *thread_show_fun(void *arg) { pthread_mutex_lock(&mutex); printf("[show][NO] sum = %d.\n", sum); /* 通过条件变量等待直到sum到达3 */ pthread_cond_wait(&cond, &mutex); /* 满足条件后输出并把锁释放 */ printf("[show][YES] sum = %d.\n", sum); pthread_mutex_unlock(&mutex); }
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# gcc main.c -o main -pthread
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others# ./main
[show][NO] sum = 0.
[add] sum = 1.
[add] sum = 2.
[add] sum = 3.
[show][YES] sum = 3.
root@iZwz9fsfltolu74amg1v0rZ:/home/atreus/Documents/others#
通过执行结果可以看出,show线程在获得锁后发现sum不等于3,于是将锁释放并将自己挂起,add线程获得锁后执行自加,在加到3后唤醒show线程完成输出,这样就通过条件变量避免了show线程的循环等待。
其中,pthread_cond_wait()
函数并非只是单独地等待条件变量,因为我们注意到参数列表中除了条件变量外,还包括一个互斥锁。实际上,pthread_cond_wait()
会依次执行以下操作:
基于 POSIX 线程库的线程池简单实现:
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #define THREAD_MAX_NUM 3 // 线程池最大线程数 #define TASK_MAX_NUM 10 // 任务队列最大任务数 /* 任务队列结构体定义 */ typedef struct task_t { void *(*fun)(void *arg); // 指向作业函数的指针,该函数返回一个void型指针 void *arg; // 指向作业函数的参数的指针 struct task_t *next; // 指向作业队列中的下一个作业的指针 } task_t, *task_queue_t; /* 线程池结构体定义 */ typedef struct thread_pool_t { pthread_t *thread_id_list; // 线程池中所有线程的id int task_cur_num; // 任务队列当前长度 task_t *task_queue_head; // 任务队列头指针 task_t *task_queue_tail; // 任务队列尾指针 pthread_mutex_t mutex; // 用于保证各个线程互斥访问线程池的互斥锁 pthread_cond_t task_queue_empty; // 任务队列空,会阻塞销毁操作 pthread_cond_t task_queue_not_empty; // 任务队列不空,会阻塞执行操作 pthread_cond_t task_queue_not_full; // 任务队列不满,会阻塞添加操作 } thread_pool_t; /** * @brief 初始化线程池 * @param thread_pool 线程池指针的地址 */ void thread_pool_init(thread_pool_t **thread_pool); /** * @brief 销毁线程池 * @param thread_pool 线程池指针 */ void thread_pool_destroy(thread_pool_t *thread_pool); /** * @brief 向线程池中添加任务 * @param thread_pool 线程池指针 * @param fun 交给此任务执行的函数指针 * @param arg 上述函数的参数 */ void thread_pool_add_task(thread_pool_t *thread_pool, void *(*fun)(void *), void *arg); /** * @brief 线程池中线程的运行函数 * @param arg 线程池中线程的参数,主要是线程所属线程池 * @return void* 返回值 */ void *thread_pool_execute(void *arg); void *thread_fun(void *arg) { printf("%d\n", *(int *) arg); return NULL; } int main() { thread_pool_t *thread_pool; thread_pool_init(&thread_pool); int arg[10] = {1, 2, 3, 4, 5, 6, 7, 8, 9}; for (int i = 0; i < 10; i++) { thread_pool_add_task(thread_pool, thread_fun, (void *) (arg + i)); } thread_pool_destroy(thread_pool); } void thread_pool_init(thread_pool_t **pool) { *pool = (thread_pool_t *) calloc(1, sizeof(thread_pool_t)); /* 初始化线程池线程线程列表 */ (*pool)->thread_id_list = (pthread_t *) calloc(THREAD_MAX_NUM, sizeof(pthread_t)); /* 初始化任务队列 */ (*pool)->task_cur_num = 0; (*pool)->task_queue_head = (*pool)->task_queue_tail = NULL; /* 初始化互斥锁和条件变量 */ pthread_mutex_init(&((*pool)->mutex), NULL); pthread_cond_init(&((*pool)->task_queue_empty), NULL); pthread_cond_init(&((*pool)->task_queue_not_empty), NULL); pthread_cond_init(&((*pool)->task_queue_not_full), NULL); /* 创建线程 */ for (int i = 0; i < THREAD_MAX_NUM; i++) { pthread_create((*pool)->thread_id_list + i, NULL, thread_pool_execute, (void *) *pool); } printf("Threadpoll initialized successfully. [thread_pool.c]\n"); } void thread_pool_destroy(thread_pool_t *pool) { pthread_mutex_lock(&pool->mutex); while (pool->task_cur_num != 0) { // 等待所有的任务被处理完毕 pthread_cond_wait(&pool->task_queue_empty, &pool->mutex); } pool->task_cur_num = -1; // 通知所有线程结束运行 pthread_mutex_unlock(&pool->mutex); pthread_cond_broadcast(&pool->task_queue_not_empty); // 唤醒所有因为任务队列空而阻塞的线程 for (int i = 0; i < THREAD_MAX_NUM; i++) { pthread_join(pool->thread_id_list[i], NULL); } pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->task_queue_empty); pthread_cond_destroy(&pool->task_queue_not_empty); pthread_cond_destroy(&pool->task_queue_not_full); printf("Threadpoll was successfully destroyed. [thread_pool.c]\n"); } void thread_pool_add_task(thread_pool_t *pool, void *(*fun)(void *), void *arg) { task_t *new_task = (task_t *) calloc(1, sizeof(task_t)); pthread_mutex_lock(&pool->mutex); // 互斥访问线程池 while (pool->task_cur_num == TASK_MAX_NUM) { // 任务队列满 pthread_cond_wait(&pool->task_queue_not_full, &pool->mutex); } /* 初始化新的任务节点 */ new_task->fun = fun; new_task->arg = arg; new_task->next = NULL; /* 向线程池的任务队列中添加新任务 */ if (pool->task_queue_head == NULL) { // 任务队列空 pool->task_queue_head = pool->task_queue_tail = new_task; // 添加新的任务 pthread_cond_broadcast(&pool->task_queue_not_empty); // 唤醒因为任务队列空而阻塞的线程 } else { pool->task_queue_tail->next = new_task; // 添加新的任务 pool->task_queue_tail = new_task; // 移动任务队列尾指针 } pool->task_cur_num++; // 任务数量加一 pthread_mutex_unlock(&pool->mutex); } void *thread_pool_execute(void *arg) { thread_pool_t *pool = (thread_pool_t *) arg; // 通过参数获取自己所属的线程池 task_t *task = NULL; while (1) { pthread_mutex_lock(&pool->mutex); // 互斥访问线程池 while (pool->task_cur_num == 0) { // 任务队列空 pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); } if (pool->task_cur_num == -1) { pthread_mutex_unlock(&pool->mutex); pthread_exit(NULL); } task = pool->task_queue_head; // 取出任务队列中的首个任务 pool->task_cur_num--; // 任务队列任务数减一 pthread_cond_broadcast(&pool->task_queue_not_full); // 继续向任务队列添加任务 if (pool->task_cur_num == 0) { // 任务队列空 pool->task_queue_head = pool->task_queue_tail = NULL; pthread_cond_broadcast(&pool->task_queue_empty); } else { pool->task_queue_head = pool->task_queue_head->next; } pthread_mutex_unlock(&pool->mutex); (*task->fun)(task->arg); // 执行所取出的任务的函数 free(task); // 释放内存 task = NULL; // 避免悬空指针 } }
atreus@MacBook-Pro % clang main.c -o main
atreus@MacBook-Pro % ./main
Threadpoll initialized successfully. [thread_pool.c]
1
4
5
6
7
8
9
0
2
3
Threadpoll was successfully destroyed. [thread_pool.c]
atreus@MacBook-Pro %
区别:
切换:
如何选择:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。