概述
1 线程池原理
线程池一般是要有一个执行队列(线程组),一个任务队列(线程待执行的任务),还需要一个能将执行队列与任务队列联系起来的管理组件;
2 线程池代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#define LL_ADD(item, list) do {
item->prev = NULL;
item->next = list;
list = item;
} while(0)
#define LL_REMOVE(item, list) do {
if (item->prev != NULL) item->prev->next = item->next;
if (item->next != NULL) item->next->prev = item->prev;
if (list == item) list = item->next;
item->prev = item->next = NULL;
} while(0)
/* 执行队列 */
typedef struct NWORKER {
pthread_t thread;
int terminate; // 任务少时减少执行线程
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
/* 任务队列 */
typedef struct NJOB {
void (*job_function)(struct NJOB *job); // 任务的回调函数,任务自己实现
void *user_data; // 任务传递的参数
// 使用链表将其变为队列
struct NJOB *prev;
struct NJOB *next;
} nJob;
/* 管理组件(线程池) */
typedef struct NWORKQUEUE {
struct NWORKER *workers; // 执行队列
struct NJOB *waiting_jobs; // 任务队列
pthread_mutex_t jobs_mtx; // 互斥锁(防止多个线程取同一个任务)
pthread_cond_t jobs_cond; // 条件变量(无任务,执行队列休眠等待)
} nWorkQueue;
typedef nWorkQueue nThreadPool;
/**********************************************************************************
*********************函数功能:循环从任务队列获取任务并执行
*********************函数参数:ptr 每个线程的私有数据(nWorker)
**********************************************************************************/
static void *WorkerThread(void *ptr)
{
nWorker *worker = (nWorker*)ptr;
while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}
// 在任务较少时,释放该线程
if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
break;
}
nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL) {
// 移除头结点
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);
}
free(worker);
pthread_exit(NULL);
}
/**********************************************************************************
*********************函数功能:创建线程池
*********************函数参数:workqueue 内存池管理组件
numWorkers 待创建的线程数量
**********************************************************************************/
int ThreadPoolCreate(nThreadPool *workqueue, int numWorkers)
{
if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, sizeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
int i = 0;
for (i = 0;i < numWorkers;i ++) {
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
if (worker == NULL) {
perror("malloc");
return 1;
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;
int ret = pthread_create(&worker->thread, NULL, WorkerThread, (void *)worker);
if (ret) {
perror("pthread_create");
free(worker);
return 1;
}
LL_ADD(worker, worker->workqueue->workers);
}
return 0;
}
/**********************************************************************************
*********************函数功能:任务量少时通知线程执行函数释放线程
*********************函数参数:workqueue 内存池管理组件
**********************************************************************************/
void ThreadPoolShutdown(nThreadPool *workqueue)
{
nWorker *worker = NULL;
for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
worker->terminate = 1;
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/**********************************************************************************
*********************函数功能:向任务队列插入一个任务
*********************函数参数:workqueue 内存池管理组件
job 要插入的任务
**********************************************************************************/
void ThreadPoolQueue(nThreadPool *workqueue, nJob *job)
{
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);
pthread_cond_signal(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/**********************************************************************************
***********************************线程池测试接口**********************************
**********************************************************************************/
#define MAX_THREAD 10
#define COUNTER_SIZE 1000
void counter(nJob *job)
{
int index = *(int*)job->user_data;
printf("index : %d, selfid : %lun", index, pthread_self());
free(job->user_data);
free(job);
}
int main(int argc, char *argv[])
{
nThreadPool pool;
ThreadPoolCreate(&pool, MAX_THREAD);
int i = 0;
for (i = 0;i < COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}
job->job_function = counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
ThreadPoolQueue(&pool, job);
}
getchar();
printf("n");
}
最后
以上就是典雅蜗牛为你收集整理的线程池原理及代码实现1 线程池原理2 线程池代码的全部内容,希望文章能够帮你解决线程池原理及代码实现1 线程池原理2 线程池代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复