我是靠谱客的博主 精明镜子,这篇文章主要介绍线程池分析及原理 C语言,现在分享给大家,希望可以做个参考。

线程池的作用:

  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 第三:提高线程的可管理性。
线程池的组成部分:

1、线程池管理器 : 用于创建并管理线程池
2、工作线程: 线程池中线程
3、任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行。
4、任务队列:用于存放没有处理的任务。提供一种缓冲机制。

对程序进行整体分析:
  • 1、在主函数中:
    • -> 先对线程池进行初始化,初始化各个数据并创建多个空闲线程,等待使用。
    • -> 传入任务,对任务进行打包并且传给任务队列。
    • -> 在打包函数中(threadpool_add)又设置条件变量(queue_not_empty),任务队列不为空的信号,让阻塞等待的线程进行接受。
    • -> 线程执行完任务后,删除该任务,又陷入while循环去等待传入任务队列不为空的信号
    • -> 最后对数据碎片进行回收处理
  • 2、threadpool_create函数对线程池所需要的数据进行初始化,并创建空闲线程。
  • 3、线程池中最重要的函数就是threadpool_thread
    • ->当无任务进入,并且不关闭线程池的条件下,去阻塞等待,直到队列不为空
    • ->判断是否有需要销毁的线程
    • ->判断是否需要关闭线程池
    • ->从任务队列中获取任务
    • ->对执行完的任务进行出队
    • ->告知threadpool_add线程任务队列不为满,可以添加新任务
    • ->执行任务
  • 4、threadpool_thread 和thread_add 两个函数配合进行,当任务队列不为空时,会对threadpool_thread函数发出信号,去处理任务,当任务队列不为满时,会让thread_add添加新任务
  • 5、记得使用完线程池要对数据进行销毁
  • 6、pthread_kill 函数
    该函数可以用于向指定的线程发送信号
    传递的pthread——kill的signal参数一般都是大于0的,这是系统默认或自定义的都有相应的处理程序。signal == 0时,是一个被保留的信号,一般用这个保留的信号测试线程是否存在。
    函数返回0: 调用成功。
    返回ESRCH:线程不存在。
    返回EINVAL:信号不合法。
复制代码
1
2
3
4
5
6
7
8
int kill_ret = pthread_kill(thread_id,0); if(kill_ret == ESRCH) printf("指定的线程不存在或者已经终止n"); else if(kill_ret == EINVAL) printf("调用传递一个无用的信号n"); else printf("线程存在n);
代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
#include<stdio.h> #include<stdlib.h> #include<unistd.h> #include<string.h> #include<pthread.h> #include<signal.h> #include<errno.h> #include<assert.h> #define DEFAULT_TIME 10 // 10s检测一次 #define MIN_WAIT_TASK_NUM 10 //如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池 #define DEFAULT_THREAD_VARY 10 //每次创建和销毁线程的个数 #define true 1 #define false 0 typedef struct { void *(*function)(void * ); //函数指针,回调函数 void * arg;//上面函数的参数 }threadpool_task_t; // 各子线程任务结构体 //描述线程池相关信息 typedef struct threadpool_t { pthread_mutex_t lock; //用于锁住本结构体,和条件变量一起使用 pthread_mutex_t thread_counter; //记录忙状态线程个数的锁 pthread_cond_t queue_not_full; //当任务队列满时,添加任务的线程阻塞,等待此条件变量 pthread_cond_t queue_not_empty; //任务队列里不为空时,通知线程池中等待任务的线程 pthread_t * threads; //存放线程池中每个线程的tid 数组 pthread_t adjust_tid; // 存放管理线程tid threadpool_task_t *task_queue; //任务队列 int min_thr_num; //线程池最小线程数 int max_thr_num; //线程池最大线程数 int live_thr_num; //当前存活线程个数 int busy_thr_num; //忙状态线程个数 int wait_exit_thr_num; //要销毁的线程个数 int queue_front; //task_queue 队头下标 int queue_rear; // 队尾下标 int queue_size; // 队中实际任务数 int queue_max_size; // 队列可容纳任务数上限 int shutdown; // 标志位,线程池使用状态,true或false }threadpool_t; // 最重要的是线程函数,里面包含主要步骤 // 当无任务进入,并且不关闭线程池的条件下,去阻塞等待,直到队列不为空 // ->判断是否有需要销毁的线程 // ->判断是否需要关闭线程池 // ->从任务队列中获取任务 // ->对执行完的任务进行出队 // ->告知其他线程任务队列不为满,可以添加新任务 // ->执行任务 //对于条件变量相关的函数 //检测条件变天个cond,如果cond没有被设置,表示条件还不满足,别人还没有对con进行设置,此时pthread_cond_wait会进行休眠阻塞,直到别的线程设置cond表示条件准备好后,才会被休眠 //当调用pthread_cond_wait函数等待条件满足的线程只有一个时,就用pthread_cond_signal来唤醒,如果说有好多线程都调用pthread_cond_wait等待时,用pthread_cond_broadcast,它可将所有调用pthread_cond_wait而休眠的线程都唤醒, // //锁会与sleep产生冲突 void * threadpool_thread(void *threadpool); //线程池中工作线程要做的事情 void *adjust_thread(void * threadpool); //管理者线程要做的事情 int threadpool_free(threadpool_t * pool); //销毁 threadpool_t * threadpool_create(int min_thr_num,int max_thr_num,int queue_max_size) { int i; threadpool_t * pool = NULL; do { if((pool = ( threadpool_t * )malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail"); break; //跳出do while } pool->min_thr_num = min_thr_num ; //线程池最小线程数 pool->max_thr_num = max_thr_num ; //线程池最大线程数 pool->busy_thr_num = 0; //忙状态的线程数初始值为0 pool->live_thr_num = min_thr_num; //活着的线程数,初值= 最小线程数 pool->wait_exit_thr_num = 0; //要销毁的线程个数初始值也初始为0 pool->queue_size = 0; //任务队列实际元素个数初始值为0 pool->queue_max_size = queue_max_size; //任务队列最大元素个数 pool->queue_front = 0; pool->queue_rear = 0; pool->shutdown = false; //不关闭线程池 //根据最大线程上限数,给工作线程数组开辟空间,并归零 pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num ); //线程池中线程最大个数 if(pool->threads == NULL) { printf("malloc threads fail"); break; } memset(pool->threads,0,sizeof(pthread_t) * max_thr_num); //队列开辟空间 pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size); if(pool->task_queue == NULL) { printf("malloc task_queue fail"); break; } //初始化互斥锁,条件变量 if(pthread_mutex_init(&(pool->lock),NULL) !=0 || pthread_mutex_init(&(pool->thread_counter),NULL) !=0 || pthread_cond_init(&(pool->queue_not_empty),NULL) != 0 || pthread_cond_init(&(pool->queue_not_full),NULL) != 0) { printf("init the lock or cond fail"); break; } //启动min_thr_num 个work thread for(i = 0;i < min_thr_num;i++) { pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool); //pool指向当前线程池 printf("start thread 0x%x...n",(unsigned int )pool->threads[i]);//打印线程id } pthread_create(&(pool->adjust_tid),NULL,adjust_thread,(void *)pool); //启动管理者线程 pthread_detach(pool->adjust_tid);//把管理线程设置为分离的,系统帮助回收资源 printf("***************dajust_thread start******************n"); sleep(1); //等待线程创建完成再回到主函数中 return pool; }while(0); threadpool_free(pool); //前面代码调用失败时,释放pool存储空间 return NULL; } //线程池中各个工作线程 void * threadpool_thread(void * threadpool) { threadpool_t * pool = ( threadpool_t * )threadpool; threadpool_task_t task; while(true) { //刚创建出线程,等待队列里有任务,否则阻塞等待任务队列里有任务后在唤醒接受任务 pthread_mutex_lock(&(pool->lock)); //queue_size == 0 说明没有任务,调wait阻塞在条件变量上,若有任务,跳过while while((pool->queue_size == 0) && (!pool->shutdown)) //线程池没有任务且不关闭线程池 { printf("thread 0x%x is waitingn",(unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_not_empty),&(pool->lock)); //线程阻塞在这个条件变量上 //清楚指定书目的空闲线程,如果要结束的线程个数大于0,结束线程 if(pool->wait_exit_thr_num > 0) //要销毁的线程个数大于0 { pool->wait_exit_thr_num --; //如果线程池里线程个数大于最小值时可以结束当前线程 if(pool->live_thr_num > pool->min_thr_num ) { printf("thread 0x%x is exitingn",(unsigned int )pthread_self()); pool->live_thr_num --; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } } } //如果指定为true,要关闭线程池里每个线程,自行退出处理 if(pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); printf("thread 0x%x is exitingn",(unsigned int)pthread_self()); pthread_exit(NULL); //线程自行结束 } //从任务队列里获取任务,是一个出队操作 task.function = pool->task_queue[pool->queue_front].function; task.arg = pool->task_queue[pool->queue_front].arg; pool->queue_front = ( pool->queue_front + 1 ) % pool->queue_max_size; //对头指针向后移动 pool->queue_size --; //任务队列中出了一个元素,还有位置,唤醒阻塞在这个条件变量上的线程。现在通知可以有新的任务添加进来 pthread_cond_broadcast(&(pool->queue_not_full)); //queue_not_full 另一个条件变量 //任务取出后,立即将线程池锁解锁 pthread_mutex_unlock(&(pool->lock)); //执行任务 printf("thread 0x%x start workingn",(unsigned int)pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); //忙状态线程数变量锁 pool->busy_thr_num ++; //忙状态线程数+1 pthread_mutex_unlock(&(pool->thread_counter)); (task.function)(task.arg); //执行回调函数任务,相当于process(arg) //任务结束处理 printf("thread 0x%x end workingnn",(unsigned int )pthread_self()); usleep(10000); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num --; //处理掉一个任务,忙状态线程数-1 pthread_mutex_unlock(&(pool->thread_counter)); } pthread_exit(NULL); } int is_thread_alive(pthread_t tid) { int kill_rc = pthread_kill(tid,0); //发0号信号,测试线程是否存活 if(kill_rc == ESRCH) { return false; } return true; } void * adjust_thread(void * threadpool) { int i; threadpool_t * pool = ( threadpool_t *)threadpool; while(!(pool->shutdown)) //线程池没有关闭 { sleep(DEFAULT_TIME); //定时对线程池管理 printf("10s is finish,start test thread pooln"); pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; //关注任务数 int live_thr_num = pool->live_thr_num; //存活线程数 pthread_mutex_unlock(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; //忙着的线程数 printf("busy_thr_num is %dn",busy_thr_num); pthread_mutex_unlock(&(pool->thread_counter)); //创建新线程,算法: 任务书大于最小线程池个数,且存活的线程数少于最大线程个数时。如30>=10 && 40 < 100 if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) { printf("create nuw threadn"); pthread_mutex_lock(&(pool->lock)); int add = 0; //一次增加DEFAULT_THREAD个线程 for(i = 0;i< pool->max_thr_num && add < DEFAULT_THREAD_VARY &&pool->live_thr_num < pool->max_thr_num ; i++) { if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) { pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool); add ++; pool->live_thr_num; } } pthread_mutex_unlock(&(pool->lock)); } //销毁多余的空闲线程,忙线程×2 小于存货的线程数,且存活的线程数大于最小线程数时 if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num ) { printf("delete pthreadn"); //一次销毁DEFAULT_THREAD个线程。随机10个即可 pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; //要销毁的线程数设为10 pthread_mutex_unlock(&(pool->lock)); for(i = 0;i< DEFAULT_THREAD_VARY; i++) { //通知处在空闲状态的线程,他们会自行终止 pthread_cond_signal(&(pool->queue_not_empty)); } } } return NULL; } //向任务队列中,添加一个任务 int threadpool_add(threadpool_t * pool,void *(*function) (void * arg),void *arg) { pthread_mutex_lock(&(pool->lock)); //为真,队列已经满,调wait阻塞 while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) { pthread_cond_wait(&(pool->queue_not_full),&(pool->lock)); } if(pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); } //清空工作线程,调用的灰调函数的参数arg if(pool->task_queue[pool->queue_rear].arg != NULL) { free(pool->task_queue[pool->queue_rear].arg); pool->task_queue[pool->queue_rear].arg = NULL; } //添加任务到任务队列里 pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; pool->queue_size ++; //添加完任务后,队列不为空,唤醒阻塞在为空的那个条件变量上中的线程 pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0; } int threadpool_free(threadpool_t * pool) { if(pool == NULL) { return -1; } if(pool->task_queue) //释放任务队列 { free(pool->task_queue); } if(pool->threads) // 书房各种锁 { free(pool->threads); pthread_mutex_unlock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_mutex_unlock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free(pool); pool = NULL; return 0; } int threadpool_destroy(threadpool_t * pool) { int i; if(pool == NULL) return -1; pool->shutdown = true; for(i = 0;i < pool->live_thr_num;i++) { //通知所有的空闲线程 pthread_cond_broadcast(&(pool->queue_not_empty)); } for(i = 0; i< pool->live_thr_num ; i ++) { pthread_join(pool->threads[i],NULL); } threadpool_free(pool); return 0; } //线程池中的线程,模拟处理任务 void * process(void * arg) { printf("thread 0x%x working on task %dn",(unsigned int)pthread_self(),*(int *)arg); printf("task %d is end n",*(int *)arg); return NULL; } int main() { threadpool_t *thp = threadpool_create(3,100,100); //创建线程池,池里最小三个线程,最大100,任务队列最大100 printf("pool initedn"); int num[20],i; for(i = 0;i < 20;i++) { num[i] = i; printf("add task %d n",i); threadpool_add(thp,process,(void *)&num[i]); //向任务队列中添加任务 } sleep(10); //等子线程完成任务 printf("clean up thread pool n"); sleep(1); threadpool_destroy(thp); return 0; }

最后

以上就是精明镜子最近收集整理的关于线程池分析及原理 C语言的全部内容,更多相关线程池分析及原理内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(105)

评论列表共有 0 条评论

立即
投稿
返回
顶部