IO协程调度模块
- epoll相关
- 内核事件表
- epoll_wait函数
- LT和ET模式
- EPOLLONESHOT事件
- IO协程调度模块概述
- IO协程调度器具体实现
- 总结
epoll相关
内核事件表
epoll是Linux特有的I/O复用函数。其使用一组函数来完成任务,将用户关心的文件描述符上的事件放在内核里的一个时间表中,无需像select和poll每次调用都要重复传入文件描述符集或事件集。epoll需要使用一个额外的文件描述符,来比唯一标识内核中的事件表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27#include <sys/epoll.h> // 创建文件描述符 // size只是提示事件表大小 int epoll_create(int size); // 该函数返回将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表 // 操作epoll的内核事件表 // fd:要操作的文件描述符 // op:指定操作类型:EPOLL_CTL_ADD(往事件表注册fd上的事件);EPOLL_CTL_MOD(修改fd上的注册事件);EPOLL_CTL_DEL(删除fd上的事件) // event:指定事件,是epoll_event结构指针类型 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // 调用成功时候返回0,失败返回-1并设置errno // epoll_event结构定义 struct epoll_event{ __uint32_t events; // epoll事件 epoll_data_t data; // 用户数据 }; // epoll_data_t是一个联合体,fd指定事件从属的目标描述符;ptr指定fd相关用户数据。 typedef union epoll_data{ void* ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
epoll_wait函数
epoll_wait是在一段超时时间内等待一组文件描述符上的事件。
1
2
3
4
5
6
7
8
9#include <sys/epoll.h> // maxevents指定最多监听多少个事件 // timeout定义超时时间 int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout); // 成功时返回就绪的文件描述符的个数,失败时返回-1,并设置errno; // epoll_wait函数如果检测到时间,就将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组中,该数组只用于输出epoll_wait检测到的就绪事件。 // select和poll即传入用户注册的事件,又输出内核检测到的就绪事件
举例:
1
2
3
4
5
6
7
8// 如何索引epoll返回的就绪文件描述符 int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); // 遍历就绪的ret文件描述符 for(int i=0;i<ret;i++){ int sockfd = events[i].data.fd; // 处理sockfd }
LT和ET模式
epoll对文件描述符的操作有两种:LT(电平触发模式)和ET(边缘触发模式)。其中默认位LT,其相当于效率较高的poll。当往epoll内河事件表注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式是epoll的高效工作模式。
采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此通知给应用程序后,应用程序可以不立即处理该事件。当应用程序下次调用epoll_wait时,epoll_wait还会再次想用应用程序告知此事件,直到该事件被处理。
采用ET工作模式的文件描述符,当epoll_wait检测到其上有时间通知应用程序后,应用程序必须立即处理该事件,后续epoll_wait将不再向应用程序通知这一事件。因此ET模式降低了同一个epoll事件被重复出发的次数,效率比LT模式高。
EPOLLONESHOT事件
即使使用ET模式,一个socket上的某个事件还是可能被触发多次,因此在并发程序中,若一个线程读取完某个socket上的数据后开始处理这些数据,若是该socket又有新数据可读,此时另外的线程被唤醒读取新数据,导致两个线程同时操作一个socket局面。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统中最多出发其注册的的一个可读、可写或异常事件,且只能触发一次。这样,一个线程处理某个socket时,其他线程无法操作该socket。同hi,注册了EPOLLONESHOT事件的socket一旦被讴歌线程处理完毕,该线程将立即重置这个socket上的EPOLLONESHOT事件,确保这个socket下次可读时,EPOLLIN事件能被触发。
IO协程调度模块概述
在sylar的IO协程调度模块中,其直接继承了之前的协程调度器,基于epoll实现。其主要增加了IO事件调度功能、idle和tickle的重写、epoll事件的分类和取消事件的功能。具体的整个IO协程调度去阅读不是很难,但个人感觉如果对epoll这些编程不熟悉的话,对整个的理解是有点困难的,可以从一些简单的程序开始阅读理解,比如《Linux高性能编程》这本书看一看。
整的来看其实IO协程调度模块就是对之前的协程调度器进行了改在,然后结合epoll实现了IO事件的添加、取消、调度等功能。这里是使用的非阻塞IO,将嵌套字设置成非阻塞状态,然后与回调函数绑定,基于epoll_wait等待事件的发生。
在和别人讨论的过程中,很多大佬都表示IO这里是整个sylar框架中最有用或者最值得学习的内容,而且也是整个框架的核心部分,但是本人能力有限,无法更加深入的探究这个模块的精髓,只能结合程序去理解整个模块的设计以及大致功能。
IO协程调度器具体实现
首先sylar基于epoll对事件的定义,重新定义了IO事件,这里这里sylar仅关心读时间、写事件,其他的epoll事件都会将其归类于读写两类中
1
2
3
4
5
6enum Event{ NONE = 0x0, // 无事件 READ = 0x1, // 读事件 WRITE = 0x4 // 写事件 };
随后定义了socket事件上下文,每一个socket fd都对应了一个FdContext,包括fd、fd的事件、fd事件上下文,其中事件上下文用于保存这个事件的回调函数、执行回调函数的调度器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// socket事件上下文类 struct FdContext{ typedef Mutex MutexType; // 事件上下文类 // fd的每一个事件都有一个事件上下文,保存相关内容 struct EventContext{ Scheduler* scheduler = nullptr; // 事件执行调度器 Fiber::ptr fiber; // 事件的协程 std::function<void()> cb; // 事件的回调函数 }; // 获取上下文类 EventContext& getContext(Event event); // 重置事件上下文类 void resetContext(EventContext& ctx); // 触发事件 void triggerEvent(Event event); EventContext read; // 读事件 EventContext write; // 写事件 int fd; // 事件关联的句柄 Event events = NONE; // 已注册的事件 MutexType mutex; };
接着就是IOManager的几个主要方法和参数,首先是其private参数:
1
2
3
4
5
6
7
8
9
10
11// epoll 文件句柄 int m_epfd = 0; // pipe 文件句柄,fd[0]读端,fd[1]写端 int m_tickleFds[2]; // 当前等待执行的事件数量:原子操作 std::atomic<size_t> m_pendingEventCount = {0}; // IOManager的Mutex RWMutexType m_mutex; // socket事件上下文的容器 std::vector<FdContext*> m_fdContexts;
由于IOManager继承Schedule,所以其构造函数在首先要初始化一个scheduler,随后添加了支持epoll的功能,将其设置为非阻塞边缘触发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = ""); IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) :Scheduler(threads, use_caller, name) { // 创建文件描述符,返回将作用其他所有epoll系统调用的第一个参数,以指定要访问的内河事件表 m_epfd = epoll_create(5000); SYLAR_ASSERT(m_epfd > 0); // 创建一个管道pipe,获取m_tickleFds[2]。 int rt = pipe(m_tickleFds); SYLAR_ASSERT(!rt); // 注册pipe读句柄的可读事件,用于tickle调度协程,通过epoll_event.data.fd保存描述符 epoll_event event; // 将&event后面sizeof(epooll_event)的地址置0; memset(&event, 0, sizeof(epoll_event)); event.events = EPOLLIN | EPOLLET; event.data.fd = m_tickleFds[0]; // 设置文件句柄 m_tickleFds[0],非阻塞方式,配合边缘触发 rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); SYLAR_ASSERT(!rt); // 操作epoll内核事件表,EPOLL_CTL_ADD:往事件表注册fd上的事件 // 将管道的读描述符加入到epoll多路复用,如果管道可读,idle中的epoll_wait会返回 rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); SYLAR_ASSERT(!rt); contextResize(32); start(); // 开启调度,也就是说当IOManager创建时,就自动开始调度 }
而析构函数则是停止调度,关闭句柄,释放空间。
1
2
3
4
5
6
7
8
9
10
11
12
13IOManager::~IOManager(){ stop(); // 关闭调度 close(m_epfd); close(m_tickleFds[0]); close(m_tickleFds[1]); for(size_t i=0;i<m_fdContexts.size();i++){ if(m_fdContexts[i]){ delete m_fdContexts[i]; } } }
相比于schedule,IOManager是增加了添加事件、删除事件、取消事件等功能的,其中删除事件是不会触发事件的,但是取消事件和取消所有事件都会触发一次事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172// 1 success, 0 retry, -1 error int IOManager::addEvent(int fd, Event event, std::function<void()> cb) { // 找到fd对应的FdContext,如果不存在,则分配一个 FdContext* fd_ctx = nullptr; RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() > fd) { fd_ctx = m_fdContexts[fd]; lock.unlock(); } else { lock.unlock(); RWMutexType::WriteLock lock2(m_mutex); contextResize(fd * 1.5); fd_ctx = m_fdContexts[fd]; } // 同一个fd不允许重复添加相同的事情 FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(SYLAR_UNLIKELY(fd_ctx->events & event)) { SYLAR_LOG_ERROR(g_logger) << "addEvent assert fd=" << fd << " event=" << (EPOLL_EVENTS)event << " fd_ctx.event=" << (EPOLL_EVENTS)fd_ctx->events; SYLAR_ASSERT(!(fd_ctx->events & event)); } // 将新事件加入到epoll_wait,使用epoll_event的私有指针存储FdContext // EPOLL_CTL_MOD:修改fd上的事件 int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; epoll_event epevent; epevent.events = EPOLLET | fd_ctx->events | event; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events=" << (EPOLL_EVENTS)fd_ctx->events; return -1; } // 将待执行IO事件加一 ++m_pendingEventCount; // 找到这个fd的event事件对应的EventContext,对其中的scheduler,cb,fiber进行赋值 fd_ctx->events = (Event)(fd_ctx->events | event); FdContext::EventContext& event_ctx = fd_ctx->getContext(event); SYLAR_ASSERT(!event_ctx.scheduler && !event_ctx.fiber && !event_ctx.cb); // 复制scheduler和回调函数,如果回调函数为空,则将当前协程设为回调执行体 event_ctx.scheduler = Scheduler::GetThis(); if(cb) { event_ctx.cb.swap(cb); } else { event_ctx.fiber = Fiber::GetThis(); SYLAR_ASSERT2(event_ctx.fiber->getState() == Fiber::EXEC ,"state=" << event_ctx.fiber->getState()); } return 0; } bool IOManager::delEvent(int fd, Event event){ // 找到fd对应的FdContext RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(SYLAR_UNLIKELY(!(fd_ctx->events & event))) { return false; } // 清除指定的事件,表示不关心这个事件了,如果清除之后结果为0,则从epoll_wait中删除该文件描述符 Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 待执行事件数减1 --m_pendingEventCount; // 重置该fd对应的event事件上下文 fd_ctx->events = new_events; FdContext::EventContext& event_ctx = fd_ctx->getContext(event); fd_ctx->resetContext(event_ctx); return true; } bool IOManager::cancelEvent(int fd, Event event){ RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(SYLAR_UNLIKELY(!(fd_ctx->events & event))) { return false; } Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 取消会触发事件 fd_ctx->triggerEvent(event); --m_pendingEventCount; return true; } bool IOManager::cancelAll(int fd){ RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!fd_ctx->events) { return false; } int op = EPOLL_CTL_DEL; epoll_event epevent; epevent.events = 0; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 取消所有事件(读、写) if(fd_ctx->events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(fd_ctx->events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } SYLAR_ASSERT(fd_ctx->events == 0); return true; }
最后便是tickle、idle、stopping这些方法的重写,其中tikcle会向m_tickleFds写一个字节作为通知,而idle则是本人认为最重要的一部分,已经做了大量的注释大家可以直接参考:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149// 写pipe让idle协程从epoll_wait退出,待idle协程yield之后Scheduler::run就可以调度其他任务 // 如果当前没有空闲调度线程,那就没必要发通知 void IOManager::tickle() { if(!hasIdleThreads()) { // 若当前没有空闲调度线程,就不发送通知 return; } int rt = write(m_tickleFds[1], "T", 1); SYLAR_ASSERT(rt == 1); } bool IOManager::stopping(uint64_t& timeout) { timeout = getNextTimer(); // 对于IOManager而言,必须等所有待调度的IO事件都执行完了才可以退出 return timeout == ~0ull && m_pendingEventCount == 0 && Scheduler::stopping(); } bool IOManager::stopping() { uint64_t timeout = 0; return stopping(timeout); } /** * @brief idle协程 * 对于IO协程调度来说,应阻塞在等待IO事件上,idle退出的时机是epoll_wait返回,对应的操作是tickle或注册的IO事件就绪 * 调度器无调度任务时会阻塞idle协程上,对IO调度器而言,idle状态应该关注两件事 * 一是有没有新的调度任务,对应Schduler::schedule(),如果有新的调度任务,那应该立即退出idle状态,并执行对应的任务; * 二是关注当前注册的所有IO事件有没有触发,如果有触发,那么应该执行IO事件对应的回调函数 */ void IOManager::idle() { SYLAR_LOG_DEBUG(g_logger) << "idle"; // 一次epoll_wait最多检测256个就绪事件,如果就绪事件超过了这个数,那么会在下轮epoll_wati继续处理 const uint64_t MAX_EVNETS = 256; epoll_event* events = new epoll_event[MAX_EVNETS](); std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){ delete[] ptr; }); while(true) { uint64_t next_timeout = 0; if(SYLAR_UNLIKELY(stopping(next_timeout))) { SYLAR_LOG_INFO(g_logger) << "name=" << getName() << " idle stopping exit"; break; } int rt = 0; // do while结构至少会执行一次 do { static const int MAX_TIMEOUT = 3000; if(next_timeout != ~0ull) { next_timeout = (int)next_timeout > MAX_TIMEOUT ? MAX_TIMEOUT : next_timeout; } else { next_timeout = MAX_TIMEOUT; } // 阻塞在epoll_wait上,等待事件发生 rt = epoll_wait(m_epfd, events, MAX_EVNETS, (int)next_timeout); if(rt < 0 && errno == EINTR) { } else { break; } } while(true); std::vector<std::function<void()> > cbs; // 获取所需要执行的定时器的回调函数列表 listExpiredCb(cbs); if(!cbs.empty()) { // 调度需要执行的定时器回调函数 schedule(cbs.begin(), cbs.end()); cbs.clear(); } // 遍历所有发生的事件,根据epoll_event的私有指针找到对应的FdContext,进行事件处理 for(int i = 0; i < rt; ++i) { epoll_event& event = events[i]; if(event.data.fd == m_tickleFds[0]) { // ticklefd[0]用于通知协程调度,这时只需要把管道里的内容读完即可 // 本轮idle结束Scheduler::run会重新执行协程调度 uint8_t dummy[256]; while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0); continue; } // 通过epoll_event的私有指针获得FdContext FdContext* fd_ctx = (FdContext*)event.data.ptr; FdContext::MutexType::Lock lock(fd_ctx->mutex); /** * EPOLLERR: 出错,比如写读端已经关闭的pipe * EPOLLHUP: 套接字对端关闭 * 出现这两种事件,应该同时触发fd的读和写事件,否则有可能出现注册的事件永远执行不到的情况 */ if(event.events & (EPOLLERR | EPOLLHUP)) { event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; } int real_events = NONE; if(event.events & EPOLLIN) { real_events |= READ; } if(event.events & EPOLLOUT) { real_events |= WRITE; } if((fd_ctx->events & real_events) == NONE) { continue; } // 剔除已经发生的事件,将剩下的事件重新加入epoll_wait, // 如果剩下的事件为0,表示这个fd已经不需要关注了,直接从epoll中删除 int left_events = (fd_ctx->events & ~real_events); int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | left_events; int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); if(rt2) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):" << rt2 << " (" << errno << ") (" << strerror(errno) << ")"; continue; } // 处理已经发生的事件,也就是让调度器调度指定的函数或协程 if(real_events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(real_events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } } /** * 一旦处理完所有的事件,idle协程yield,这样可以让调度协程(Scheduler::run)重新检查是否有新任务要调度 * 上面triggerEvent实际也只是把对应的fiber重新加入调度,要执行的话还要等idle协程退出 */ Fiber::ptr cur = Fiber::GetThis(); auto raw_ptr = cur.get(); cur.reset(); raw_ptr->swapOut(); } }
总结
关于IOManager的用法在后面经常使用,具体使用方法可以多结合测试案例去了解,本人觉得整个IOManager的源码阅读不算困难,主要是要理清设计的目的这些,我个人也只是这两天才慢慢看懂在做什么,之前因为schedule模块看的也不是很清晰,所以这里一直是云里雾里,所以学习IOManager这一模块必须对前面的协程调度非常熟悉,又有一定的epoll相关知识,阅读和理解起来才会相对轻松点~
最后
以上就是着急板栗最近收集整理的关于[源码阅读]——Sylar服务器框架:IO协程调度模块的全部内容,更多相关[源码阅读]——Sylar服务器框架内容请搜索靠谱客的其他文章。
发表评论 取消回复