赞
踩
任务队列控制的线程模型主要是通过任务队列上的信号来控制线程池中的线程调度.
线程池的实现
本例程共由5个文件构成:tpool.h,tpool.c,log.c,log.h,testpool.c.其中tpool.h,tpool.c是实现线程池的核心文件,在tpool.h中定义了线程池和工作线程的数据结构及创建线程池和添加工作线程等方法,tpool.c中为具体的实现代码.log.c,log.h提供了一种记录文件的生成手段,能在文件中记录自定义的信息.testpool.c为线程池的测试程序.
该例子由多个源文件组成,编译命令如下:gcc -g -pthread -o testpool testpool.c
tpool.c log.c
下面给出这个例子的源代码,首先是线程池的定义文件tpool.h:
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include
#include
typedef struct tpool_work
{
void (*handler_routine)();
void *arg;
struct tpool_work *next;
} tpool_work_t;
typedef struct tpool
{
int num_threads;
int max_queue_size;
int do_not_block_when_full;
pthread_t *threads;
int cur_queue_size;
tpool_work_t *queue_head;
tpool_work_t *queue_tail;
pthread_mutex_t queue_lock;
pthread_cond_t queue_not_full;
pthread_cond_t queue_not_empty;
pthread_cond_t queue_empty;
int queue_closed;
int shutdown;
} tpool_t;
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int
do_not_block_when_full);
extern int tpool_add_work(tpool_t *pool, void
(*routine)(), void *arg);
extern int tpool_destroy(tpool_t *pool, int finish);
#endif
当线程池退出后,需要释放所用的资源,包括以下五个步骤:
设置线程退出标记;
禁止添加新任务到任务链表;
设置线程池销毁标记;
等待所有已经建立的线程退出;
释放线程池所占的内存空间.
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur;
lprintf(log, INFO,
"destroy pool begin!\n");
if((rtn =
pthread_mutex_lock(&(pool->queue_lock))) != 0)
{
lprintf(log,FATAL,"pthread mutex lock
failure\n");
return -1;
}
lprintf(log, INFO,
"destroy pool begin 1!\n");
if(pool->queue_closed
|| pool->shutdown)
{
if((rtn =
pthread_mutex_unlock(&(pool->queue_lock))) !=
0)
{
lprintf(log,FATAL,"pthread mutex lock failure\n");
return
-1;
}
return 0;
}
lprintf(log, INFO,
"destroy pool begin 2!\n");
pool->queue_closed =
1;
if(finish)
{
while(pool->cur_queue_size !=
0)
{
if((rtn =
pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock)))
!= 0)
{
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
lprintf(log, INFO,
"destroy pool begin 3!\n");
pool->shutdown =
1;
if((rtn =
pthread_mutex_unlock(&(pool->queue_lock))) !=
0)
{
lprintf(log,FATAL,"pthread mutex unlock
failure\n");
return -1;
}
lprintf(log, INFO,
"destroy pool begin 4!\n");
if((rtn =
pthread_cond_broadcast(&(pool->queue_not_empty))) !=
0)
{
lprintf(log,FATAL,"pthread_cond_boradcast
%d\n",rtn);
return -1;
}
if((rtn =
pthread_cond_broadcast(&(pool->queue_not_full)))
!= 0)
{
lprintf(log,FATAL,"pthread_cond_boradcast
%d\n",rtn);
return -1;
}
for(i = 0; i <
pool->num_threads; i++)
{
if((rtn =
pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return
-1;
}
}
free(pool->threads);
while(pool->queue_head != NULL)
{
cur = pool->queue_head->next;
pool->queue_head =
pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO,
"destroy pool end!\n");
return 0;
}
函数tpool_thread定义了工作线程的函数,其中真正与实际任务有关的只有一行代码:
(*(my_work->handler_routine))(my_work->arg);
即执行my_work->handler_routine指针指向的函数,并传入参数my_work->arg.其他的步骤都是为执行这个任务而进行的各种设置和准备.
void *tpool_thread(void *tpool)
{
tpool_work_t
*my_work;
tpool_t *pool = (struct
tpool *)tpool;
for(;;)
{
pthread_mutex_lock(&(pool->queue_lock));
while((pool->cur_queue_size == 0) &&
(!pool->shutdown))
{
pthread_cond_wait(&(pool->queue_not_empty),
&(pool->queue_lock));
}
if(pool->shutdown)
{
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL);
}
my_work = pool->queue_head;
pool->cur_queue_size--;
if(pool->cur_queue_size == 0)
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
if((!pool->do_not_block_when_full)
&&\
(pool->cur_queue_size ==
(pool->max_queue_size - 1)))
{
pthread_cond_broadcast(&(pool->queue_not_full));
}
if(pool->cur_queue_size ==
0)
{
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
(*(my_work->handler_routine))(my_work->arg);
free(my_work);
}
return(NULL);
}
工作状态的记录
为了便于记录线程池的工作状态,还实现了一个记录服务器.该记录服务器实现了一中类似syslog的日志功能,总共就三个函数,使用十分简单.
log.h是该记录服务器的头文件.
#ifndef __LOG_H
#define __LOG_H
#include
#include
#define LOGLINE_MAX 1024
#define DEBUG 1
#define INFO 2
#define WARN 3
#define ERROR 4
#define FATAL 5
#define LOG_TRUNC 1<<0
#define LOG_NODATE 1<<1
#define LOG_NOLF 1<<2
#define LOG_NOLVL 1<<3
#define LOG_DEBUG 1<<4
#define LOG_STDERR 1<<5
#define LOG_NOTID 1<<6
typedef struct
{
int fd;
sem_t sem;
int flags;
} log_t;
int lprintf( log_t *log, unsigned int level, char *fmt, ...
);
log_t *log_open( char *fname, int flags );
void log_close( log_t *log );
#endif
该记录服务器和线程池没有直接的关系,如果有需要,可以很方便的该记录服务器移植到自己的项目中.
lprintf函数的功能是打印记录,log指针指向了一个通过log_open函数打开的记录文件,所有的记录信息都将保存在这个文件中.通过level参数指定记录内容的分类,在log.h中定义了6种分类,分别是:DEBUG,INFO,WARN,ERROR和FATAL,便于对大量的记录信息进行分类.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ...
)
{
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char
line[LOGLINE_MAX];
static char
threadnum[10];
int cnt;
static char *levels[6] =
{ "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ",
"[error] ", "[fatal] " };
if(!log) return
-1;
if(
!(log->flags&LOG_DEBUG) && level == DEBUG ) return
0;
fd=log->fd;
if(
!(log->flags&LOG_NODATE) )
{
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
if(
!(log->flags&LOG_NOTID) )
{
sprintf(threadnum, "(TID:%lu) ",
pthread_self());
}
cnt = snprintf(line,
sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL ? ""
:
(level
> FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID ? "" :
threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt,
sizeof(line)-cnt, fmt, ap);
va_end(ap);
line[sizeof(line)-1] =
'\0';
if(
!(log->flags&LOG_NOLF) )
{
}
sem_wait(&log->sem);
rc = write(fd, line,
strlen(line));
if
(log->flags&LOG_STDERR)
write(2, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno =
0;
return rc;
}
log_open函数的作用是打开一个记录文件,其作用与fopen函数类似.
log_t *log_open( char *fname, int flags
)
{
log_t *log =
malloc(sizeof(log_t));
if(!log)
{
fprintf(stderr, "log_open: Unable to
malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-")
)
{
log->fd = 2;
}
else
{
log->fd = open(fname,
O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ?
O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1
)
{
fprintf(stderr, "log_open: Opening logfile %s:
%s", fname, strerror(errno));
goto log_open_b;
}
if(
sem_init(&log->sem, 0, 1) == -1 )
{
fprintf(stderr, "log_open: Could not initialize
log semaphore.");
goto log_open_c;
}
return log;
log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
log_close函数的作用是关闭一个打开的记录文件.通常在函数退出的时候执行这个函数,以保证所有记录信息都正确的写入记录文件.
void log_close( log_t *log )
{
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
log_open函数的作用是打开一个记录文件,其作用与fopen函数类似.
log_t *log_open( char *fname, int flags
)
{
log_t *log =
malloc(sizeof(log_t));
if(!log)
{
fprintf(stderr, "log_open: Unable to
malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-")
)
{
log->fd = 2;
}
else
{
log->fd = open(fname,
O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ?
O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1
)
{
fprintf(stderr, "log_open: Opening logfile %s:
%s", fname, strerror(errno));
goto log_open_b;
}
if(
sem_init(&log->sem, 0, 1) == -1 )
{
fprintf(stderr, "log_open: Could not initialize
log semaphore.");
goto log_open_c;
}
return log;
log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
log_close函数的作用是关闭一个打开的记录文件.通常在函数退出的时候执行这个函数,以保证所有记录信息都正确的写入记录文件.
void log_close( log_t *log )
{
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
线程池的测试
#include
#include "log.h"
#include "tpool.h"
log_t *log;
void thread(void *arg)
{
char * ptr=(char *)arg;
sleep(1);
printf("hello world! %s\n",ptr);
}
int main(int argc, char *argv[])
{
tpool_t *pool;
log=log_open("test.log",
0);
pool=tpool_init(100,200,1);
int i;
* 添加100个任务*/
for (i = 0;
i<100;i++)
tpool_add_work(pool,thread,"test!");
sleep(10);
tpool_destroy(pool,1);
log_close(log);
pthread_exit(NULL);
}
该例子演示了如何建立一个线程池,以及如何记录程序的运行状态,有一定的使用意义,稍加修改就可以应用到实际的项目中.能理解其中的设计思想和技巧,对自己编程能力的提高有很大的提高
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。