概述
sylar库学习线程与协程
知识点
thread_local关键字,这个关键字在我理解看来是一个类似于static的关键字,但区别在于这个关键字所描述的数据的划分是线程,在单个线程内它与static没有区别,而不同的线程中thread_local关键字是独立的每个线程都有一份单独的,互不干扰的数据。因此这一关键字是构造多线程调度的关键,在线程类,协程类,调度器类都需要用到。
其中线程和线程执行其他函数之前,都是用一个函数将其他函数包装起来,以此来设置函数执行之前的参数
线程
由于使用的是Linux作为操作系统,要将程序中所得到的线程号与系统中使用ps命令查出来的编号相对应,最好使用linux系统中自带的库。也就是pthread
线程类主要是包装了
pthread_create(thread)
pthread_detach(~thread)
pthread_join(join)
pthread_setname_np(run)
这几个函数,这些都是C语言风格的函数,以pthread_t数据结构来进行函数的控制。
其中为了构造函数之前
Thread::Thread(std::function<void()> cb, const std::string &name):m_cb(cb),m_name(name) {
if (name.empty()) {
m_name ="UNKNOW";
}
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this); //线程创建,此处开始就立即执行
// 线程可能在构造函数返回之前就跑起来
if (rt) {
SYLAR_LOG_ERROR(g_logger)
<< "pthread_create thread fail, rt=" << rt << "name= " << name;
throw std::logic_error("pthread_create error");//逻辑错误由于程序内部逻辑而导致的错误
}
m_semaphore.wait();//可以等到跑起来在唤醒
}
void *Thread::run(void *arg) {//线程函数参数必须是void* POSIX线程库标准,run为静态成员,this指针必须显式传入
Thread* thread = (Thread *)arg;
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = sylar::GetThreadId();
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());//为线程设置唯一名称
std::function<void()> cb;
cb.swap(thread->m_cb);
thread->m_semaphore.notify();//线程执行到这的时候析构函数才能结束
cb();
return 0;
}
通过加了一个锁保证了执行顺序的问题,构造函数一直在等待,知道设置完函数,设置完名字,开始运行函数的时候才能析构完成
callback一下,其中的线程指针和线程名就是使用的thread_local来进行区分的。
协程
相比于线程只是运行一个函数来说,协程就是一个讲究人啊。其中设定了状态(设计模式中就有一个状态模式,咱这也是使用了设计模式的人上人了)。
协程的包装主要是控制的ucontext_t数据结构
其中
需要有三个参数需要设置
uc_link//后续上下文,一般置空指针
uc_stack.ss_sp//栈空间地址->对应malloc申请内存大小
uc_stack.ss_size//栈空间大小
包装有
getcontext //初始化,设置值(Fiber)
makecontext//类似于执行函数(Fiber或reset)
swapcontext//上下文切换,非常重要的函数。协程通过这个函数进行切换
其中总体结构为,设置一个主协程,在使用的时候通过主协程来进行控制
协程 <——>主协程<——>协程
每个线程中需要一个主协程在中间进行调度。
因此 thread_local关键字又开始派上用场了,
static thread_local Fiber *t_fiber = nullptr;//当前执行线程
static thread_local Fiber::ptr t_threadFiber = nullptr;//主线程
那么就有一个问题,创建协程的时候一定需要有一个主协程,因此可以用写一个GetThis函数,每次要交换之前获取一下,有则直接获取,无则创建一
Fiber::ptr Fiber::GetThis() {
if (t_fiber) {//若当前没有正在运行的协程,因此必无主协程
return t_fiber->shared_from_this();//有就直接返回当前指针weakptr
}
Fiber::ptr main_fiber(new Fiber);//创建时已经将指针放入了t_fiber!
SYLAR_ASSERT(t_fiber == main_fiber.get())
t_threadFiber = main_fiber;
return t_fiber->shared_from_this();
}
协程的其他函数都是基本围绕着swapcontext函数展开的主要是
swapin 将主协程切换到当前协程
swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)
swapout 将当前协程切换到主协程
swapcontext(&m_ctx, &t_threadFiber->m_ctx)
swapin 比较好理解切入后状态都变为EXEC(正在执行)
协程切出后需要变成什么状态就更值得商榷。
enum State { INIT, HOLD, EXEC, TERM, READY,EXCEPT };
// 初始化 保持 执行 结束 准备 异常
通过状态来判断下次调用时需要做什么工作,和线程不同的是他不swapin代码就不会执行
void Fiber::CallerMainFunc() {
Fiber::ptr cur = GetThis();//这里赋值了一次引用计数+1
SYLAR_ASSERT(cur);
try {
cur->m_cb();
cur->m_cb = nullptr;
cur->m_state = TERM;
} catch (std::exception& ex) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
<< " fiber_id=" << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
} catch (...) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except"
<< " fiber_id=" << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
auto raw_ptr = cur.get();//此处为什么要用指针也很有说法
cur.reset();
raw_ptr->back();//执行此处又回到了主协程,若直接用cur会卡到此处不能释放,去执行主协程了,而由于cb()已经执行完了,
//这里应该是需要销毁的状态,若直接使用只能指针会导致引用计数永远有1而不能释放
//永远不会执行到下面这一行,那么栈空间就不会被销毁
SYLAR_ASSERT2(false, "never reach fiber_id=" + std::to_string(raw_ptr->getId()));
}
调度管理器
调度管理器,核心自然是为了调度,调度的是什么呢,他应该是任务,或者是要执行的函数,因此要将两者存储成一种形式是有必要的。下面这一个类就是用来描述这种形式的。需要注意的是,这一形式需要可以以不同的形式进行初始化,可以直接使用函数,或者直接使用协程来进行构造,还可以通过reset直接清空掉,或者通过无参函数先占个位置,在与要执行的线程号绑定在一起方便调度。//注意,这是结构体,所有参数都是公共的,因此可以直接赋值。
struct FiberAndThread {
Fiber::ptr fiber;
std::function<void()> cb;
int thread;
FiberAndThread(Fiber::ptr f, int thr) : fiber(f), thread(thr) {}
FiberAndThread(Fiber::ptr *f, int thr) : thread(thr) { fiber.swap(*f); }
FiberAndThread(std::function<void()> f, int thr) : cb(f), thread(thr) {}
FiberAndThread(std::function<void()> *f, int thr) : thread(thr) {
cb.swap(*f);
}
FiberAndThread() : thread(-1) {}//方便初始化
void reset() {
fiber = nullptr;
cb = nullptr;
thread = -1;
}
};
对于调度管理器还有一个核心问题,那就是这个调度管理器应该在哪一个线程上呢?这一个线程没有主协程应该怎么办?又应该在哪一个协程上呢?
线程其实好解决,那个线程构造调度管理器就使用当前线程,因此创建管理器之前,首先要为创建管理器所在的线程上调用Fiber::GetThis()。目的是没有的话就创建一个主协程。那么应该在那一个协程上呢,很显然不会是一个线程的主协程,因为主协程忙着呢还有其他事要干,因此需要在创建一个协程,专职调度 。当然还有更简单粗暴的办法,创建一个线程专门给调度管理器使用!就不需要管这么多了。
static thread_local Scheduler *t_scheduler = nullptr;
static thread_local Fiber* t_fiber = nullptr;
很明显这表示了正在执行的管理器,和正在执行的协程。由于可能需要管理多个线程,因此没有线程指针也是可以理解的
解决了上述两个问题之后就到了令人头疼的正式调度程序了。在开始之前,怎么能忘记thread_local关键字呢
首先程序有两个可能的入口
1是通过start函数中的线程进入的
2是通过主协程里保存的函数进入的
对函数来说
首先是验证部分
当线程运行此函数时
第一步:遍历其所有的FiberAndThread 结构体并找到当前线程要运行的任务,若任务正在执行(EXEC)则跳过。
第二步:若要运行的协程
则判断其状态
(若协程不是结束TERM或异常EXCEPT 则执行线程(此处状态主要为HOLD,READY,INIT)
执行完后若是READY状态则再放回调度管理器内,要是没执行完就出来了则变为HOLD,(由于HOLD比较特殊,不能再由调度管理器代管,不然会再下一次循环中执行出来,因此将状态转为HOLD后就不再记录可以直接清除)
清除当前fiber
)
若当前需要执行的是函数的话
就创建一个协程,将函数装进去,执行此协程再执行上述步骤
若是空的(一般是经过上面执行完的Fiber会被reset为空)
则进行空闲状态的处理
空闲线程会在(stop函数执行后,且任务列表为空,且没有活跃线程,rootFiber未执行的时候将run函数结束掉
具体函数如下
void Scheduler::run() {
SYLAR_LOG_INFO(g_logger)<<m_name<<"run";
setThis();
if(sylar::GetThreadId() != m_rootThread) {//判断是否为主线程
t_fiber=Fiber::GetThis().get();
}
Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle,this)));
Fiber::ptr cb_fiber;
FiberAndThread ft;
while (true) {
ft.reset();
bool tickle_me = false;
bool is_active=false;
{//从消息队列中取出一个需要执行的消息
MutexType::Lock lock(m_mutex);
auto it = m_fibers.begin();
while (it != m_fibers.end()) {//找到是否是线程id
if (it->thread != -1 && it->thread != sylar::GetThreadId()) {//期望的线程id不是当前线程id
++it;
tickle_me = true;//通知其他线程
continue;
}
SYLAR_ASSERT(it->fiber || it->cb);
if (it->fiber && it->fiber->getState() == Fiber::EXEC) {//正在执行什么都不做
++it;
continue;
}
ft = *it;
m_fibers.erase(it);
++m_activeThreadCount;
is_active=true;
break;
}
// tickle_me|=it!=m_fibers.end();
}
if (tickle_me) {
tickle();
}
if (ft.fiber && (ft.fiber->getState() != Fiber::TERM&&ft.fiber->getState()!=Fiber::EXCEPT)) {
ft.fiber->swapIn();
--m_activeThreadCount;
if (ft.fiber->getState() == Fiber::READY) {
schedule(ft.fiber);
} else if (ft.fiber->getState() != Fiber::TERM &&
ft.fiber->getState() != Fiber::EXCEPT) {
ft.fiber->m_state=Fiber::HOLD;
}
ft.reset();
} else if (ft.cb) {
if (cb_fiber) {
cb_fiber->reset(ft.cb);
} else {
cb_fiber.reset(new Fiber(ft.cb));
}
ft.reset();
cb_fiber->swapIn();
--m_activeThreadCount;
if (cb_fiber->getState() == Fiber::READY) {
schedule(cb_fiber);
cb_fiber.reset();
} else if (cb_fiber->getState() == Fiber::EXCEPT ||
cb_fiber->getState() == Fiber::TERM) {
cb_fiber->reset(nullptr);
} else {//if (cb_fiber->getState() != Fiber::TERM) {
cb_fiber->m_state = Fiber::HOLD;
cb_fiber.reset();
}
} else {
if (is_active) {
--m_activeThreadCount;
continue;
}
if (idle_fiber->getState() == Fiber::TERM) {
SYLAR_LOG_INFO(g_logger)<<"idle fiber term";
break;
}
++m_idleThreadCount;
idle_fiber->swapIn();
--m_idleThreadCount;
if (idle_fiber->getState() != Fiber::TERM &&
idle_fiber->getState() != Fiber::EXCEPT) {//若回来的状态不等于结束状态时
idle_fiber->m_state = Fiber::HOLD;
}
}
}
}
最后
以上就是谨慎小丸子为你收集整理的sylar库学习线程、协程、调度管理器sylar库学习线程与协程的全部内容,希望文章能够帮你解决sylar库学习线程、协程、调度管理器sylar库学习线程与协程所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复