我是靠谱客的博主 典雅蜗牛,最近开发中收集的这篇文章主要介绍线程池原理及代码实现1 线程池原理2 线程池代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1 线程池原理

线程池一般是要有一个执行队列(线程组),一个任务队列(线程待执行的任务),还需要一个能将执行队列与任务队列联系起来的管理组件;

2 线程池代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define LL_ADD(item, list) do { 	
	item->prev = NULL;				
	item->next = list;				
	list = item;					
} while(0)

#define LL_REMOVE(item, list) do {						
	if (item->prev != NULL) item->prev->next = item->next;	
	if (item->next != NULL) item->next->prev = item->prev;	
	if (list == item) list = item->next;					
	item->prev = item->next = NULL;							
} while(0)

/* 执行队列 */
typedef struct NWORKER {
	pthread_t thread;
	int terminate; // 任务少时减少执行线程
	struct NWORKQUEUE *workqueue;
	struct NWORKER *prev;
	struct NWORKER *next;
} nWorker;

/* 任务队列 */
typedef struct NJOB {
	void (*job_function)(struct NJOB *job);	// 任务的回调函数,任务自己实现
	void *user_data;	// 任务传递的参数
		
	// 使用链表将其变为队列
	struct NJOB *prev;
	struct NJOB *next;
} nJob;

/* 管理组件(线程池) */
typedef struct NWORKQUEUE {
	struct NWORKER *workers;	// 执行队列
	struct NJOB *waiting_jobs;	// 任务队列
	pthread_mutex_t jobs_mtx;	// 互斥锁(防止多个线程取同一个任务)
	pthread_cond_t jobs_cond;	// 条件变量(无任务,执行队列休眠等待)
} nWorkQueue;

typedef nWorkQueue nThreadPool;

/**********************************************************************************
*********************函数功能:循环从任务队列获取任务并执行
*********************函数参数:ptr    每个线程的私有数据(nWorker)
**********************************************************************************/
static void *WorkerThread(void *ptr) 
{
	nWorker *worker = (nWorker*)ptr;

	while (1) {
		pthread_mutex_lock(&worker->workqueue->jobs_mtx);

		while (worker->workqueue->waiting_jobs == NULL) {
			if (worker->terminate) break;
			pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
		}

		// 在任务较少时,释放该线程
		if (worker->terminate) {
			pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
			break;
		}
		
		nJob *job = worker->workqueue->waiting_jobs;
		if (job != NULL) {
			// 移除头结点
			LL_REMOVE(job, worker->workqueue->waiting_jobs);
		}
		
		pthread_mutex_unlock(&worker->workqueue->jobs_mtx);

		if (job == NULL) continue;

		job->job_function(job);
	}

	free(worker);
	pthread_exit(NULL);
}

/**********************************************************************************
*********************函数功能:创建线程池
*********************函数参数:workqueue    内存池管理组件
							  numWorkers   待创建的线程数量
**********************************************************************************/
int ThreadPoolCreate(nThreadPool *workqueue, int numWorkers) 
{
	if (numWorkers < 1) numWorkers = 1;
	memset(workqueue, 0, sizeof(nThreadPool));
	
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
	
	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));

	int i = 0;
	for (i = 0;i < numWorkers;i ++) {
		nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
		if (worker == NULL) {
			perror("malloc");
			return 1;
		}

		memset(worker, 0, sizeof(nWorker));
		worker->workqueue = workqueue;

		int ret = pthread_create(&worker->thread, NULL, WorkerThread, (void *)worker);
		if (ret) {
			
			perror("pthread_create");
			free(worker);

			return 1;
		}

		LL_ADD(worker, worker->workqueue->workers);
	}
	return 0;
}

/**********************************************************************************
*********************函数功能:任务量少时通知线程执行函数释放线程
*********************函数参数:workqueue    内存池管理组件
**********************************************************************************/
void ThreadPoolShutdown(nThreadPool *workqueue) 
{
	nWorker *worker = NULL;

	for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
		worker->terminate = 1;
	}

	pthread_mutex_lock(&workqueue->jobs_mtx);

	workqueue->workers = NULL;
	workqueue->waiting_jobs = NULL;

	pthread_cond_broadcast(&workqueue->jobs_cond);

	pthread_mutex_unlock(&workqueue->jobs_mtx);
	
}

/**********************************************************************************
*********************函数功能:向任务队列插入一个任务
*********************函数参数:workqueue    内存池管理组件
							  job          要插入的任务
**********************************************************************************/
void ThreadPoolQueue(nThreadPool *workqueue, nJob *job) 
{
	pthread_mutex_lock(&workqueue->jobs_mtx);

	LL_ADD(job, workqueue->waiting_jobs);
	
	pthread_cond_signal(&workqueue->jobs_cond);
	pthread_mutex_unlock(&workqueue->jobs_mtx);	
}



/**********************************************************************************
***********************************线程池测试接口**********************************
**********************************************************************************/
#define MAX_THREAD			10
#define COUNTER_SIZE		1000

void counter(nJob *job) 
{
	int index = *(int*)job->user_data;
	printf("index : %d, selfid : %lun", index, pthread_self());
	
	free(job->user_data);
	free(job);
}

int main(int argc, char *argv[]) 
{
	nThreadPool pool;
	ThreadPoolCreate(&pool, MAX_THREAD);
	
	int i = 0;
	for (i = 0;i < COUNTER_SIZE;i ++) {
		nJob *job = (nJob*)malloc(sizeof(nJob));
		if (job == NULL) {
			perror("malloc");
			exit(1);
		}		
		job->job_function = counter;
		job->user_data = malloc(sizeof(int));
		*(int*)job->user_data = i;

		ThreadPoolQueue(&pool, job);		
	}
	getchar();
	printf("n");
}

最后

以上就是典雅蜗牛为你收集整理的线程池原理及代码实现1 线程池原理2 线程池代码的全部内容,希望文章能够帮你解决线程池原理及代码实现1 线程池原理2 线程池代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(315)

评论列表共有 0 条评论

立即
投稿
返回
顶部