我是靠谱客的博主 典雅蜗牛,这篇文章主要介绍线程池原理及代码实现1 线程池原理2 线程池代码,现在分享给大家,希望可以做个参考。

1 线程池原理

线程池一般是要有一个执行队列(线程组),一个任务队列(线程待执行的任务),还需要一个能将执行队列与任务队列联系起来的管理组件;

2 线程池代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #define LL_ADD(item, list) do { item->prev = NULL; item->next = list; list = item; } while(0) #define LL_REMOVE(item, list) do { if (item->prev != NULL) item->prev->next = item->next; if (item->next != NULL) item->next->prev = item->prev; if (list == item) list = item->next; item->prev = item->next = NULL; } while(0) /* 执行队列 */ typedef struct NWORKER { pthread_t thread; int terminate; // 任务少时减少执行线程 struct NWORKQUEUE *workqueue; struct NWORKER *prev; struct NWORKER *next; } nWorker; /* 任务队列 */ typedef struct NJOB { void (*job_function)(struct NJOB *job); // 任务的回调函数,任务自己实现 void *user_data; // 任务传递的参数 // 使用链表将其变为队列 struct NJOB *prev; struct NJOB *next; } nJob; /* 管理组件(线程池) */ typedef struct NWORKQUEUE { struct NWORKER *workers; // 执行队列 struct NJOB *waiting_jobs; // 任务队列 pthread_mutex_t jobs_mtx; // 互斥锁(防止多个线程取同一个任务) pthread_cond_t jobs_cond; // 条件变量(无任务,执行队列休眠等待) } nWorkQueue; typedef nWorkQueue nThreadPool; /********************************************************************************** *********************函数功能:循环从任务队列获取任务并执行 *********************函数参数:ptr 每个线程的私有数据(nWorker) **********************************************************************************/ static void *WorkerThread(void *ptr) { nWorker *worker = (nWorker*)ptr; while (1) { pthread_mutex_lock(&worker->workqueue->jobs_mtx); while (worker->workqueue->waiting_jobs == NULL) { if (worker->terminate) break; pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx); } // 在任务较少时,释放该线程 if (worker->terminate) { pthread_mutex_unlock(&worker->workqueue->jobs_mtx); break; } nJob *job = worker->workqueue->waiting_jobs; if (job != NULL) { // 移除头结点 LL_REMOVE(job, worker->workqueue->waiting_jobs); } pthread_mutex_unlock(&worker->workqueue->jobs_mtx); if (job == NULL) continue; job->job_function(job); } free(worker); pthread_exit(NULL); } /********************************************************************************** *********************函数功能:创建线程池 *********************函数参数:workqueue 内存池管理组件 numWorkers 待创建的线程数量 **********************************************************************************/ int ThreadPoolCreate(nThreadPool *workqueue, int numWorkers) { if (numWorkers < 1) numWorkers = 1; memset(workqueue, 0, sizeof(nThreadPool)); pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond)); pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx)); int i = 0; for (i = 0;i < numWorkers;i ++) { nWorker *worker = (nWorker*)malloc(sizeof(nWorker)); if (worker == NULL) { perror("malloc"); return 1; } memset(worker, 0, sizeof(nWorker)); worker->workqueue = workqueue; int ret = pthread_create(&worker->thread, NULL, WorkerThread, (void *)worker); if (ret) { perror("pthread_create"); free(worker); return 1; } LL_ADD(worker, worker->workqueue->workers); } return 0; } /********************************************************************************** *********************函数功能:任务量少时通知线程执行函数释放线程 *********************函数参数:workqueue 内存池管理组件 **********************************************************************************/ void ThreadPoolShutdown(nThreadPool *workqueue) { nWorker *worker = NULL; for (worker = workqueue->workers;worker != NULL;worker = worker->next) { worker->terminate = 1; } pthread_mutex_lock(&workqueue->jobs_mtx); workqueue->workers = NULL; workqueue->waiting_jobs = NULL; pthread_cond_broadcast(&workqueue->jobs_cond); pthread_mutex_unlock(&workqueue->jobs_mtx); } /********************************************************************************** *********************函数功能:向任务队列插入一个任务 *********************函数参数:workqueue 内存池管理组件 job 要插入的任务 **********************************************************************************/ void ThreadPoolQueue(nThreadPool *workqueue, nJob *job) { pthread_mutex_lock(&workqueue->jobs_mtx); LL_ADD(job, workqueue->waiting_jobs); pthread_cond_signal(&workqueue->jobs_cond); pthread_mutex_unlock(&workqueue->jobs_mtx); } /********************************************************************************** ***********************************线程池测试接口********************************** **********************************************************************************/ #define MAX_THREAD 10 #define COUNTER_SIZE 1000 void counter(nJob *job) { int index = *(int*)job->user_data; printf("index : %d, selfid : %lun", index, pthread_self()); free(job->user_data); free(job); } int main(int argc, char *argv[]) { nThreadPool pool; ThreadPoolCreate(&pool, MAX_THREAD); int i = 0; for (i = 0;i < COUNTER_SIZE;i ++) { nJob *job = (nJob*)malloc(sizeof(nJob)); if (job == NULL) { perror("malloc"); exit(1); } job->job_function = counter; job->user_data = malloc(sizeof(int)); *(int*)job->user_data = i; ThreadPoolQueue(&pool, job); } getchar(); printf("n"); }

最后

以上就是典雅蜗牛最近收集整理的关于线程池原理及代码实现1 线程池原理2 线程池代码的全部内容,更多相关线程池原理及代码实现1内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(238)

评论列表共有 0 条评论

立即
投稿
返回
顶部