概述
简介
在嵌入式系统环境下,由于系统资源和任务的特点,多线程成了实现多任务处理的重要方式.在一些常见的应用环境中,如Web服务器,Email服务器以及数据库服务器等都具有一个共同点:单位时间内必须处理很多并发的连接请求,但处理时间却相对较短.传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数及其频繁,那么服务器将处于不停的创建线程,销毁线程的状态.
线程池是采用多线程解决方案来提高系统性能的一个最为出色的模型,它通过预创建一定数量的工作线程来对系统进行并发处理,使得系统的运行效率提高.因为线程池能使得系统采用较为轻量的,可控的系统资源实现系统并发处理能力的最大化,所以许多应用软件都采用了线程池模型.
除此之外,线程是能够限制创建的进程个数.通常线程池所允许的并发线程具有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将等待.而传统方案中,如果同时请求数据为200,那么最坏情况下,系统可能需要产生200个线程.尽管这不是一个很大的数目,但是也有系统达不到这个要求.
线程池模型有许多种,常见的有三种:任务队列控制的线程池模型,工作线程控制的线程池模型,主控线程控制的线程池模型.本文将给出一中任务队列控制的线程池模型的具体实现.
任务队列控制的线程池模型是通过任务队列来对线程池进行并发调度,如下图所示.线程池是由预创建的一个任务队列和一组工作线程组成,其中任务队列中存放工作对象.线程池启动后,工作线程将采用轮询的方式从任务队列中获取任务对象,由于初始化时任务队列中不存在任务对象,这时的信号量为0,所有的工作线程都处于阻塞状态.主控线程将任务对象放入任务队列中,并将信号量加1,这样信号量就会唤醒一个阻塞中的工作线程(操作系统层面决定唤醒哪个阻塞的工作线程).工作线程唤醒后从任务队列中获取一个任务对象并执行该任务,执行完后,工作线程将再次访问信号量,如果信号信号量大于0,那么工作线程将继续从任务队列中获取任务对象并执行,知道信号量等于0,这时的工作线程将再次被阻塞.
任务队列控制的线程模型主要是通过任务队列上的信号来控制线程池中的线程调度.
线程池的实现
本例程共由5个文件构成:tpool.h,tpool.c,log.c,log.h,testpool.c.其中tpool.h,tpool.c是实现线程池的核心文件,在tpool.h中定义了线程池和工作线程的数据结构及创建线程池和添加工作线程等方法,tpool.c中为具体的实现代码.log.c,log.h提供了一种记录文件的生成手段,能在文件中记录自定义的信息.testpool.c为线程池的测试程序.
该例子由多个源文件组成,编译命令如下:gcc -g -pthread -o testpool testpool.c tpool.c log.c
下面给出这个例子的源代码,首先是线程池的定义文件tpool.h:
/*-------------------------------------------------------------------------
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include
#include
/*工作线程链表*/
typedef struct tpool_work
{
void (*handler_routine)(); /*任务函数指针*/
void *arg; /*任务函数参数*/
struct tpool_work *next; /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool
{
int num_threads; /*最大线程数*/
int max_queue_size; /*最大任务链表数*/
int do_not_block_when_full; /*当链表满时是否阻塞*/
pthread_t *threads; /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head; /*链表头*/
tpool_work_t *queue_tail; /*链表尾*/
pthread_mutex_t queue_lock; /*链表互斥量*/
pthread_cond_t queue_not_full; /*链表条件量-未满*/
pthread_cond_t queue_not_empty; /*链表条件量-非空*/
pthread_cond_t queue_empty; /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */
相关阅读:
在这个文件中定义了线程池的结构,工作线程函数的原型以及线程池的基本操作.线面给出的tpool.c是线程池的实现.函数tpool_init完成了线程池的初始化操作,包括对内存的设置,对线程属性的设置和池中线程的预创建.
/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include
#include
#include
#include
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads, /*线程池线程个数*/
int max_queue_size, /*最大任务数*/
int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL)
{
lprintf(log, FATAL, "Unable to malloc() thread pool!n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads; /*工作线程个数*/
pool->max_queue_size = max_queue_size; /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full; /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
{
lprintf(log, FATAL,"Unable to malloc() thread info arrayn");
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */
for(i = 0; i != num_worker_threads; i++)
{
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0)
{
lprintf(log,FATAL,"pthread_create %sn",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread %d!n",i);
}
lprintf(log, INFO, "init pool end!n");
return pool;
}
函数tpool_add_work为线程池添加了一个工作线程.因为预创建的线程是不能做任何工作的,只有分配了适当的任务后,才会使预创建的线程真正的工作起来.
int tpool_add_work(tpool_t *pool, /*线程池指针*/
void (*routine)(void *), /*工作线程函数指针*/
void *arg) /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failuren");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) &&
(pool->do_not_block_when_full))
{
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failuren");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed)))
{
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0)
{
lprintf(log,FATAL,"pthread cond wait failuren");
return -1;
}
}
if(pool->shutdown || pool->queue_closed)
{
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failuren");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
{
lprintf(log,FATAL,"unable to create work structn");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0)
{
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0)
{
lprintf(log,FATAL,"pthread broadcast errorn");
return -1;
}
}
else
{
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failuren");
return -1;
}
return 0;
}
当线程池退出后,需要释放所用的资源,包括以下五个步骤:
设置线程退出标记;
禁止添加新任务到任务链表;
设置线程池销毁标记;
等待所有已经建立的线程退出;
释放线程池所占的内存空间.
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur; /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failuren");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!n");
if(pool->queue_closed || pool->shutdown)
{
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failuren");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!n");
pool->queue_closed = 1;
if(finish)
{
while(pool->cur_queue_size != 0)
{
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0)
{
lprintf(log,FATAL,"pthread_cond_wait %dn",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0)
{
lprintf(log,FATAL,"pthread mutex unlock failuren");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0)
{
lprintf(log,FATAL,"pthread_cond_boradcast %dn",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full))) != 0)
{
lprintf(log,FATAL,"pthread_cond_boradcast %dn",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++)
{
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %dn",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL)
{
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!n");
return 0;
}
函数tpool_thread定义了工作线程的函数,其中真正与实际任务有关的只有一行代码:
(*(my_work->handler_routine))(my_work->arg);
即执行my_work->handler_routine指针指向的函数,并传入参数my_work->arg.其他的步骤都是为执行这个任务而进行的各种设置和准备.
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;
for(;;)
{/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock));
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止 */
while((pool->cur_queue_size == 0) && (!pool->shutdown))
{
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown)
{
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL); /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&
(pool->cur_queue_size == (pool->max_queue_size - 1)))
{
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0)
{
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work);
}
return(NULL);
}
工作状态的记录
为了便于记录线程池的工作状态,还实现了一个记录服务器.该记录服务器实现了一中类似syslog的日志功能,总共就三个函数,使用十分简单.
log.h是该记录服务器的头文件.
/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H
#include
#include
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO 2
#define WARN 3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC 1<<0
#define LOG_NODATE 1<<1
#define LOG_NOLF 1<<2
#define LOG_NOLVL 1<<3
#define LOG_DEBUG 1<<4
#define LOG_STDERR 1<<5
#define LOG_NOTID 1<<6
typedef struct
{
int fd;
sem_t sem;
int flags;
} log_t;
/*
* 功能描述: 记录打印函数,将记录打印至记录文件logfile。
* 参数: log_t - log_open()函数的返回值
* level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
* fmt - 记录的内容,格式同printf()函数
* 返回值: 成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数: fname - 记录文件logfile的文件名
* flags - 记录格式的选项
* LOG_TRUNC - 截断打开的记录文件
* LOG_NODATE - 忽略记录中的每一行
* LOG_NOLF - 自动为每条记录新开一行.
* LOG_NOLVL - 不记录消息的等级
* LOG_STDERR - 将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );
/*
* 功能描述:关闭记录文件
* 参数: * log - 记录文件的指针
*/
void log_close( log_t *log );
#endif
该记录服务器和线程池没有直接的关系,如果有需要,可以很方便的该记录服务器移植到自己的项目中.
lprintf函数的功能是打印记录,log指针指向了一个通过log_open函数打开的记录文件,所有的记录信息都将保存在这个文件中.通过level参数指定记录内容的分类,在log.h中定义了6种分类,分别是:DEBUG,INFO,WARN,ERROR和FATAL,便于对大量的记录信息进行分类.
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... )
{
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ", "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) )
{
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='