概述
线程池包括任务队列,线程数组,管理模块几个部分,依次实现这几个模块
typedef struct tk_task{
void (*func)(void*);
void* arg;
struct tk_task* next;
// 任务链表(下一节点指针)
}tk_task_t;
任务队列,用单链表的方式实现
线程池的管理模块用以下
typedef struct threadpool{
pthread_mutex_t lock;
// 互斥锁
pthread_cond_t cond;
// 条件变量
pthread_t *threads;
// 线程指针
tk_task_t *head;
// 任务链表头节点
int thread_count;
// 线程数
int queue_size;
// 任务链表长
int shutdown;
// 关机模式
int started;
// 开机模式
}tk_threadpool_t;
要实现以下几个方法
tk_threadpool_t* threadpool_init(int thread_num);
int threadpool_add(tk_threadpool_t* pool, void (*func)(void *), void* arg);
int threadpool_destroy(tk_threadpool_t* pool, int gracegul);
初始化方法要来初始化任务节点,创建线程数组,设置一系列初始化等操作
在创建线程时,给线程绑定上一个函数,这个函数定义了线程需要执行的操作
// 初始化线程池
tk_threadpool_t *threadpool_init(int thread_num){
// 分配线程池
tk_threadpool_t* pool;
if((pool = (tk_threadpool_t *)malloc(sizeof(tk_threadpool_t))) == NULL)
goto err;
// threads指针指向线程数组(存放tid),数组大小即为线程数
pool->thread_count = 0;
pool->queue_size = 0;
pool->shutdown = 0;
pool->started = 0;
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_num);
// 分配并初始化task头结点
pool->head = (tk_task_t*)malloc(sizeof(tk_task_t));
if((pool->threads == NULL) || (pool->head == NULL))
goto err;
pool->head->func = NULL;
pool->head->arg = NULL;
pool->head->next = NULL;
// 初始化锁
if(pthread_mutex_init(&(pool->lock), NULL) != 0)
goto err;
// 初始化条件变量
if(pthread_cond_init(&(pool->cond), NULL) != 0)
goto err;
// 创建线程
for(int i = 0; i < thread_num; ++i){
if(pthread_create(&(pool->threads[i]), NULL, threadpool_worker, (void*)pool) != 0){
threadpool_destory(pool, 0);
return NULL;
}
pool->thread_count++;
pool->started++;
}
return pool;
err:
if(pool)
threadpool_free(pool);
return NULL;
}
线程绑定的函数是
void *threadpool_worker(void *arg){
if(arg == NULL)
return NULL;
tk_threadpool_t *pool = (tk_threadpool_t *)arg;
tk_task_t *task;
while(1){
// 对线程池上锁
pthread_mutex_lock(&(pool->lock));
// 没有task且未停机则阻塞
while((pool->queue_size == 0) && !(pool->shutdown))
pthread_cond_wait(&(pool->cond), &(pool->lock));
// 立即停机模式、平滑停机且没有未完成任务则退出
if(pool->shutdown == immediate_shutdown)
break;
else if((pool->shutdown == graceful_shutdown) && (pool->queue_size == 0))
break;
// 得到第一个task
task = pool->head->next;
// 没有task则开锁并进行下一次循环
if(task == NULL){
pthread_mutex_unlock(&(pool->lock));
continue;
}
// 存在task则取走并开锁
pool->head->next = task->next;
pool->queue_size--;
pthread_mutex_unlock(&(pool->lock));
// 设置task中func参数
(*(task->func))(task->arg);
free(task);
}
pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
return NULL;
}
线程首先对整个线程池进行加锁,如果发现线程池已经加锁了,那么阻塞等待。如果发现任务队列为空,那么条件等待。线程池从任务队列拿一个任务,执行任务队列中的函数。
下面是像条件变量中添加任务。
int threadpool_add(tk_threadpool_t* pool, void (*func)(void *), void *arg){
int rc, err = 0;
if(pool == NULL || func == NULL)
return -1;
if(pthread_mutex_lock(&(pool->lock)) != 0)
return -1;
// 已设置关机
if(pool->shutdown){
err = tk_tp_already_shutdown;
goto out;
}
// 新建task并注册信息
tk_task_t *task = (tk_task_t *)malloc(sizeof(tk_task_t));
if(task == NULL)
goto out;
task->func = func;
task->arg = arg;
// 新task节点在head处插入
task->next = pool->head->next;
pool->head->next = task;
pool->queue_size++;
rc = pthread_cond_signal(&(pool->cond));
out:
if(pthread_mutex_unlock(&pool->lock) != 0)
return -1;
return err;
}
最后
以上就是冷傲母鸡为你收集整理的linux C实现线程池的全部内容,希望文章能够帮你解决linux C实现线程池所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复