五、30-35课
协程调度模块
Scheduler为管理协程模块,use_caller表示是否复用当前线程
如果复用需要将Scheduler::run()放入协程执行
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14Scheduler —— —— template<>scheduler() | start() | run() | stop() 用户通过模板函数template<>scheduler()将待执行任务或协程放入管理器中 管理器启动后,为每个线程绑定run函数 在run函数中,为管理器运行逻辑,依次从m_fibers中取出FiberAndThread的智能指针,判断任务为cb还是fiber,统一交由协程执行,执行完毕后回到run函数中,再次取出下一个FiberAndThread stop函数中,依次通知不同线程
1) Scheduler()
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name) :m_name(name) { SYLAR_ASSERT(threads > 0); SYLAR_LOG_INFO(g_logger) << "Scheduler() "; // 复用该线程 if(use_caller) { // 创建主协程 sylar::Fiber::GetThis(); --threads; SYLAR_ASSERT(GetThis() == nullptr); t_scheduler = this; // 协程运行run方法 m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, true)); sylar::Thread::SetName(m_name); // 记录当前线程中的执行fiber t_fiber = m_rootFiber.get(); m_rootThread = sylar::GetThreadId(); // 当前线程加入线程池 m_threadIds.push_back(m_rootThread); } else { m_rootThread = -1; } m_threadCount = threads; }
2) start()
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19void Scheduler::start() { MutexType::Lock lock(m_mutex); if(!m_stopping) { return ; } m_stopping = false; SYLAR_ASSERT(m_threads.empty()); // 申请给定大小的线程池 m_threads.resize(m_threadCount); for(size_t i = 0;i<m_threadCount;++i) { // 每个线程绑定Scheduler::run函数 m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this) , m_name + "_" + std::to_string(i))); // 记录线程id m_threadIds.push_back(m_threads[i]->getId()); } lock.unlock(); }
3) run()
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114void Scheduler::run() { set_hook_enable(true); setThis(); if(sylar::GetThreadId() != m_rootThread) { t_fiber = Fiber::GetThis().get(); } // 空闲协程 后续子类重写Scheduler::idle方法实现epoll_wait阻塞 Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this))); Fiber::ptr cb_fiber; // 记录协程、线程、回调函数 FiberAndThread ft; SYLAR_LOG_INFO(g_logger) << "run"; while(true) { ft.reset(); bool tickle_me = false; bool is_active = false; { MutexType::Lock lock(m_mutex); auto it = m_fibers.begin(); while(it != m_fibers.end()) { // 当前线程不是指定执行线程,跳过 if(it->thread != -1 && it->thread != sylar::GetThreadId()) { ++it; tickle_me = true; continue; } SYLAR_ASSERT(it->fiber || it->cb); // 当前协程正在执行,跳过 if(it->fiber && it->fiber->getState() == Fiber::EXEC) { ++it; continue; } // 取出当前任务,需要执行 ft = *it; m_fibers.erase(it); ++m_activeThreadCount; is_active = true; break; } } if(tickle_me) { tickle(); } // 取出的是协程任务,使用协程执行 if(ft.fiber && (ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT)) { // 进入任务协程执行,本协程挂起 ft.fiber->swapIn(); --m_activeThreadCount; // 执行完后状态为READY,该协程继续放入管理器 if(ft.fiber->getState() == Fiber::READY) { scheduler(ft.fiber); } // 执行完后状态不是TERM和EXCEPT,更改状态为HOLD,下次再次执行 else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) { ft.fiber->m_state = Fiber::HOLD; } ft.reset(); } // 取出的是回调函数,放入协程执行 else if(ft.cb) { if(cb_fiber) { // 复用协程 cb_fiber->reset(ft.cb); } else { cb_fiber.reset(new Fiber(ft.cb)); } ft.reset(); // 挂起当前协程,进入任务协程 cb_fiber->swapIn(); --m_activeThreadCount; // 重置任务 状态为READY,该协程继续放入管理器 if(cb_fiber->getState() == Fiber::READY) { scheduler(cb_fiber); // 复用协程,重置 cb_fiber.reset(); } // 执行完毕或出错,协程置空 else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) { cb_fiber->reset(nullptr); } // 其他状态时,协程挂起,重置协程,等待下次使用 else{ cb_fiber->m_state = Fiber::HOLD; cb_fiber.reset(); } } // 协程任务,状态为执行超时或出错 else { if(is_active) { --m_activeThreadCount; continue; } if(idle_fiber->getState() == Fiber::TERM) { SYLAR_LOG_INFO(g_logger) << "idle fiber term"; break; } ++m_idleThreadCount; // 进入空闲协程 阻塞 idle_fiber->swapIn(); --m_idleThreadCount; if(idle_fiber->getState() != Fiber::TERM && idle_fiber->getState() != Fiber::EXCEPT) { // 挂起 idle_fiber->m_state = Fiber::HOLD; } } } }
4) stop()
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45void Scheduler::stop() { m_autoStop = true; if(m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber:: TERM || m_rootFiber->getState() == Fiber::INIT)) { SYLAR_LOG_INFO(g_logger) << this << "stopped"; m_stopping = true; if(stopping()) { return ; } } if(m_rootThread != -1) { SYLAR_ASSERT(GetThis() == this); } else { SYLAR_ASSERT(GetThis() != this); } m_stopping = true; for(size_t i=0;i<m_threadCount;++i) { tickle(); } if(m_rootFiber) { tickle(); } if(m_rootFiber) { if(!stopping()) { // 执行主协程 m_rootFiber->call(); } } std::vector<Thread::ptr> thrs; { MutexType::Lock lock(m_mutex); thrs.swap(m_threads); } // 依次唤醒线程,等待执行完毕 for(auto& x:thrs) { x->join(); } }
5) template<>scheduler()
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39// 单个任务存入管理器,thread=-1不指定运行线程 template<class FiberOrCb> void scheduler(FiberOrCb fc, int thread = -1) { bool need_tickle = false; { MutexType::Lock lock(m_mutex); need_tickle = schedulerNoLock(fc, thread); } if(need_tickle) { // 子类重写,实现唤醒epoll_wait tickle(); } } // 实现迭代器存入管理器 template<class InputIterator> void scheduler(InputIterator begin, InputIterator end, int thread = -1) { bool need_tickle = false; { MutexType::Lock lock(m_mutex); while(begin != end) { need_tickle = schedulerNoLock((&*begin), thread) || need_tickle; ++begin; } } if(need_tickle) { tickle(); } } template<class FiberOrCb> bool schedulerNoLock(FiberOrCb fc, int thread) { bool need_tickle = m_fibers.empty(); // 创建FiberAndThread FiberAndThread ft(fc, thread); if(ft.fiber || ft.cb) { m_fibers.push_back(ft); } return need_tickle; }
复制代码
1
2
3
4
5
6
7
8/* 使用 Scheduler sc(3, false, "test"); sc.start(); sc.scheduler(&test_fiber); sc.stop(); */
全部实现可见仓库,欢迎点赞。
最后
以上就是友好眼睛最近收集整理的关于Sylar_网络框架学习——协程调度模块(五)的全部内容,更多相关Sylar_网络框架学习——协程调度模块(五)内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复