概述
ceph线程池
线程模型主要有manager/worker, pipeline, peer三种,参见https://computing.llnl.gov/tutorials/pthreads/.ceph的线程池是典型的manager/worker:即有一个manager(主进程或者主线程)负责线程池创建,开启,暂停,恢复,终止等管理操作,相反,线程池中的所有线程都属于worker线程,负责执行真正的任务.
// ceph线程池类
class ThreadPool:
主要数据成员:
CephContext *cct
string name: 线程池的名称
string thread_name: 线程池中的线程的名称
string lockname: 锁的名称
Mutex _lock: 锁
Cond _cond:
Cond _wait_cond:
bool _stop: 表示线程池是否处于终止状态
int _pause: 表示线程池中的线程是否是处于暂停状态
int _draining: 表示任务队列中的任务是否被全部执行完毕
int ioprio_class, ioprio_priority
unsigned _num_threads 线程池中的线程数量
string _thread_num_option
const char **_conf_keys
vector<WorkQueue_ *> work_queues 线程池处理的工作队列集,用vector来表示
vector中的每个元素都是一个队列
int last_work_queue 上一次处理的队列号
set<WorkThread *> _threads; 线程池中的线程集
list<WorkThread *> _old_threads; 线程池中多余的线程,需要调用join_old_threads()来释放
int processing 当前处理的任务数量
主要成员函数:
start() 启动线程池中的所有线程
start_threads():
start_threads():
创建num_threads个线程并添加到线程集_threads中,然后启动每个线程
线程池中的每个线程调用worker()处理任务
worker() 线程池中线程的执行函数
heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str()); // 线程的喂狗操作
首先确保_stop为false即线程池处于工作状态
调用join_old_threads来释放多余的线程集_old_threads
TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
tp_handle.reset_tp_timeout(); // 设置线程超时时间
如果_pause为0并且任务队列work_queues不为空,从任务队列中取出一个任务item进行执行
任务item的执行通过_void_process()来处理,执行完通过_void_process_finish()来处理
任务item执行过程中,主线程有可能因为pause()或者drain()处于等待状态,因此必须唤醒主线程.
任务item执行结束后,主动等待2s然后执行下一条任务
cct->get_heartbeat_map()->reset_timeout(hb, cct->_conf-threadpool_default_timeout, 0); // 清空线程超时时间
cct->get_heartbeat_map()->remove_worker(hb); // 取消喂狗操作
stop() 终止线程池中的所有线程
调用join_old_threads来释放多余的线程集_old_threads
终止当前线程集_threads中的每个线程
清空当前工作队列集_work_queus
pause() 暂停线程池中的所有线程
_pause++,等待所有的线程执行完当前的任务,然后返回
pause_new() 暂停线程池中的所有线程
_pause++,不等待所有的线程执行完当前的任务,然后返回
pause()和pause_new()的主要区别:
pause返回之后,所有的线程都处于空转即暂停状态
pause_new返回之后,某些线程可能仍然处于执行状态,要等待一段时间才能真正暂停
unpause() 恢复线程池中的所有线程
_pause--,同时唤醒正在等待的线程
线程暂停的真正的含义:线程只有两种状态,即执行和终止状态,而不存在所谓的暂停状态
这里的所说的暂停线程,只不过是暂停线程从任务队列中拿任务,让线程处于什么都不做的状态.
恢复线程指恢复线程从任务队列中拿任务进行处理.
暂停线程会执行_pause++,而恢复线程会执行_pause--,在线程的执行函数worker()中,会根据
_pause的值选择
drain() 执行完任务队列中的任务
等待线程执行完所有的任务然后返回
这里任务队列中为空,没有任务可供线程处理
线程池中的线程
struct WorkThread: public Thread
ThreadPool *pool:
void *entry() {
pool->worker(this)
return 0;
}
线程的TPHandle
线程的TPHandle:相当于Linux中的watchdog,主要用于监视ceph线程池中线程的运行状态,防止线程跑偏或者运行时间过长.每个线程在运行之前都会执行喂狗操作,设置超时时间,运行结束之后会清空超时时间,然后取消喂狗.具体的操作参考worker函数
class TPhandle:
heartbeat_handle_d *hb;
time_t grace; // 超时时间,如果线程运行时间超过grace,可以认为线程是unhealth状态,产生告警
time_t suicide_grace; // 自杀时间,如果线程运行时间超过suicide_grace,线程会因断言失败而自杀
void reset_tp_timeout() // reset the grace and suicide_grace
void suspend_tp_timeout() // clear the grace and suicide_grace
线程池的工作队列
struct WorkQueue_定义了工作队列的接口,并定义了几种不同类型的工作队列基类,它们都是模板的,可以处理各种不同的工作任务.例如:BatchWorkQueue, WorkQueueVal,WorkQueue,其中WorkQueue就最常用的一个,其定义了出队,入队.任务队列处理等接口,它们都是虚基类,用户需要定义自己的工作队列,一般情况下只要继承WorkQueue,并实现自己的出队,入队,任务处理即可.
ShardedThreadPool
shardedThreadPool和TheadPool的唯一区别是:后者处理的任务之间都是相互独立的,可以使用线程进行并行处理,而实际上有些任务之间是相互依赖的或者是有序的,例如读写IO之间通常是有序的,我们在进行IO的读写时,一定要注意IO的保序性,否则会出现数据不一致的状态.ShardedThreadPool正是处理些类任务的线程池.ceph只是提供了线程池的接口,没有提供具体的方法,具体的方法需要用户自己的定义,不过,可以参考osd中shardedThreadPool的处理方法,它位于文件src/osd/OSD.h.
最后
以上就是儒雅凉面为你收集整理的ceph threadpool源码分析的全部内容,希望文章能够帮你解决ceph threadpool源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复