概述
一.概述:
coroutine是一个非常简单好用的协程库, 其依赖于ucontext, 可用于实现简单的协程应用. 本文对coroutine进行简单的分析
二.结构体分析:
coroutine在设计中分为两个组件, 一个是调度器 schedule
, 一个是协程调度实体 coroutine
, coroutine
用于标明协程实例的相关信息, schedule
用于对当前环境中的协程运行状态进行记录.
// coroutine.c
// 协程调度器
struct schedule {
char stack[STACK_SIZE];
ucontext_t main; // 正在running的协程在执行完后需切换到的上下文,由于是非对称协程,所以该上下文用来接管协程结束后的程序控制权
int nco; // 调度器中已保存的协程数量
int cap; // 调度器中协程的最大容量
int running; // 调度器中正在running的协程id
struct coroutine **co; // 连续内存空间,用于存储所有协程任务
};
在调度器结构体中, 包含了一个栈 schedule.stack
一个 main
上下文, 用于当running的协程执行完毕后接管程序控制权, 还有一个 coroutine*
数组, 用于保存当前所有的协程任务, co
的默认大小为16, 可以通过修改 DEFAULT_COROUTINE
进行修改
// 协程任务类型
struct coroutine {
coroutine_func func; // 协程函数
void *ud; // 协程函数的参数(用户数据)
ucontext_t ctx; // 协程上下文
struct schedule * sch; // 协程所属的调度器
// ptrdiff_t定义在stddef.h(cstddef)中,通常被定义为long int类型,通常用来保存两个指针减法操作的结果.
ptrdiff_t cap; // 协程栈的最大容量
ptrdiff_t size; // 协程栈的当前容量
int status; // 协程状态(COROUTINE_DEAD/COROUTINE_READY/COROUTINE_RUNNING/COROUTINE_SUSPEND)
char *stack; // 协程栈
};
注意对于每一个协程都有一个 char* stack
的私有栈, 关于这个栈的相关问题, 后续会说明.
三.API函数:
struct schedule * coroutine_open(void); // 创建协程调度器
void coroutine_close(struct schedule *); // 关闭协程调度器
int coroutine_new(struct schedule *, coroutine_func, void *ud); // 创建协程任务,将其加入调度器中
void coroutine_resume(struct schedule *, int id); // 恢复协程号为id的协程任务
int coroutine_status(struct schedule *, int id); // 根据协程任务id返回协程的当前状态
int coroutine_running(struct schedule *); // 返回调度器S中正在running的协程任务id
void coroutine_yield(struct schedule *); // 保存当前上下文后中断当前协程的执行
四.流程:
下面我们通过一个例子来观察一下coroutine的整体流程:
#define COROUTINE_DEAD 0
#define COROUTINE_READY 1
#define COROUTINE_RUNNING 2
#define COROUTINE_SUSPEND 3
struct args {
int n;
};
static void foo(struct schedule * S, void *ud) {
struct args * arg = ud;
int start = arg->n;
int i;
for (i=0;i<5;i++) {
printf("coroutine %d : %dn",coroutine_running(S) , start + i);
coroutine_yield(S);
}
}
static void test(struct schedule *S) {
struct args arg1 = { 0 };
struct args arg2 = { 100 };
int co1 = coroutine_new(S, foo, &arg1);
int co2 = coroutine_new(S, foo, &arg2);
printf("main startn");
while (coroutine_status(S,co1) && coroutine_status(S,co2)) {
coroutine_resume(S,co1);
coroutine_resume(S,co2);
}
printf("main endn");
}
int main() {
struct schedule * S = coroutine_open();
test(S);
coroutine_close(S);
return 0;
}
当我们需要创建协程时我们需要调用 coroutine_open
函数创建并初始化一个协程调度器, 下面我们看一下 coroutine_open
都干了什么
// 创建协程调度器schedule
struct schedule *
coroutine_open(void) {
struct schedule *S = malloc(sizeof(*S)); // 从堆上为调度器分配内存空间
S->nco = 0; // 初始化调度器的当前协程数量
S->cap = DEFAULT_COROUTINE; // 初始化调度器的最大协程数量
S->running = -1;
S->co = malloc(sizeof(struct coroutine *) * S->cap); // 为调度器中的协程分配存储空间
memset(S->co, 0, sizeof(struct coroutine *) * S->cap);
return S;
}
这个函数主要是分配一个调度器实体, 并进行初始化.
创建完 Schedule
后, 调用 test()
函数, 在 test()
函数中首先调用 coroutine_new
函数创建了两个协程实例, 下面我们看一下 coroutine_new
做了哪些事情:
// 创建协程任务、并将其加入调度器中
int coroutine_new(struct schedule *S, coroutine_func func, void *ud) {
// 创建协程任务(分配内存空间)并初始化
struct coroutine *co = _co_new(S, func , ud);
// 将协程任务co加入调度器S,并返回该协程任务的id
if (S->nco >= S->cap) {
// 调整调度器S中协程的最大容量,然后将协程任务co加入调度器S,并返回该协程任务的id
int id = S->cap;
S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *));
memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap);
S->co[S->cap] = co;
S->cap *= 2;
++S->nco;
return id;
} else {
// 将协程任务co加入调度器S,并返回该协程任务的id
int i;
for (i=0;i<S->cap;i++) {
int id = (i+S->nco) % S->cap; //注意这里的循环顺序
if (S->co[id] == NULL) { //从当前nco开始,找到第一个为NULL的地方
S->co[id] = co;
++S->nco;
return id;
}
}
}
assert(0);
return -1;
}
这个函数主要做了两件事:
- 首先创建一个协程任务(分配内存空间)并初始化 [调用
_co_new
函数] - 其次将该协程结构体加入到
shedule.co
数组中
其中_co_new
函数如下:
// 创建协程任务(分配内存空间)并初始化
struct coroutine *
_co_new(struct schedule *S , coroutine_func func, void *ud) {
struct coroutine * co = malloc(sizeof(*co));
co->func = func; // 初始化协程函数
co->ud = ud; // 初始化用户数据
co->sch = S; // 初始化协程所属的调度器
co->cap = 0; // 初始化协程栈的最大容量
co->size = 0; // 初始化协程栈的当前容量
co->status = COROUTINE_READY; // 初始化协程状态
co->stack = NULL; // 初始化协程栈
return co;
}
可以看到每一个初始化的协程任务结构体, 其状态为 COROUTINE_READY
(就绪). 且其私有Stack设置为空.
下面继续回到 test()
函数 , 当我们创建了两个协程任务后, 开始进行循环, 调用 int coroutine_status(struct schedule * S, int id)
从 S
中获得指定 id
对应的协程实例的状态, 如果两个协程任务的状态都不为 COROUTINE_DEAD
, 则可以进行调度.
void coroutine_resume(struct schedule * S, int id)
函数用于恢复协程号为id的协程任务, 具体实现如下:
// 恢复协程号为id的协程任务
void
coroutine_resume(struct schedule * S, int id) {
assert(S->running == -1);
assert(id >=0 && id < S->cap);
struct coroutine *C = S->co[id]; //获取当前id对应的协程结构体
if (C == NULL)
return;
int status = C->status; //获取statue
switch(status) {
case COROUTINE_READY: //READY就绪可被调度
getcontext(&C->ctx); // 获取程序当前上下文
C->ctx.uc_stack.ss_sp = S->stack; // 设置上下文C->ctx的栈
C->ctx.uc_stack.ss_size = STACK_SIZE; // 设置上下文C->ctx的栈容量
C->ctx.uc_link = &S->main; // 设置上下文C->ctx执行完后恢复到S->main上下文, 否则当前线程因没有上下文可执行而退出
S->running = id;
C->status = COROUTINE_RUNNING;
uintptr_t ptr = (uintptr_t)S;
makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32)); // 修改上下文C->ctx, 新的上下文中执行函数mainfunc
swapcontext(&S->main, &C->ctx); // 保持当前上下文到S->main, 切换当前上下文为C->ctx
break;
case COROUTINE_SUSPEND:
memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size); // 拷贝协程栈C->stack到S->stack
S->running = id; // 设置当前运行的协程id
C->status = COROUTINE_RUNNING; // 修改协程C的状态
swapcontext(&S->main, &C->ctx); // 保存当前上下文到S->main, 切换当前上下文为C->ctx
break;
default:
assert(0);
}
}
如果你的状态是 READY
说明是第一次被调度, 首先会调用getcontext
函数获取当前的上下文, 然后会将该协程设置上下文对应的栈, 设置为 S->stack
, 接着调用makecontext
函数, 将 C->ctx
上下文与 mainfunc
函数进行绑定, 并把指向S
的指针作为参数传递给mainfunc(void*)
.
下面调用 swapcontext(S->main,C->ctx)
, 将当前的上下文保存在 S->main
中, 并切换到 C->ctx
中, 也就是 mainfunc
函数 , 下面我们看一下这个函数:
// 所有新协程第一次执行时的入口函数(其中执行协程,并处理善后工作等)
static void
mainfunc(uint32_t low32, uint32_t hi32) {
uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);
struct schedule *S = (struct schedule *)ptr;
int id = S->running;
struct coroutine *C = S->co[id];
C->func(S,C->ud); //调用func
_co_delete(C); //执行完毕后删除C协程
S->co[id] = NULL;
--S->nco;
S->running = -1;
}
这个函数是一个过渡函数, 在这个函数中调用真正的工作函数 C->func
, 执行完毕后删除协程. 我们绑定的执行函数为 foo()
函数, foo函数就是打印一个数, 然后调用 coroutine_yield
函数,中断当前协程的执行.
// 保存上下文后中断当前协程的执行,然后由调度器中的main上下文接管程序执行权
void
coroutine_yield(struct schedule * S) {
int id = S->running;
assert(id >= 0);
struct coroutine * C = S->co[id];
assert((char *)&C > S->stack);
_save_stack(C,S->stack + STACK_SIZE); // 保存协程栈
C->status = COROUTINE_SUSPEND; // 修改协程状态
S->running = -1; // 修改当前执行的协程id为-1
swapcontext(&C->ctx , &S->main); // 保存当前协程的上下文到C->ctx, 切换当前上下文到S->main
}
在这个函数中, 最重要的就是 _save_stack
保存协程栈, 参数 top
为 S->stack+STACK_SIZE
也就是栈底.
static void
_save_stack(struct coroutine *C, char *top) {
char dummy = 0;
assert(top - &dummy <= STACK_SIZE);
if (C->cap < top - &dummy) {
// 为协程栈分配内存空间
free(C->stack);
C->cap = top-&dummy;
C->stack = malloc(C->cap);
}
C->size = top - &dummy;
memcpy(C->stack, &dummy, C->size); // TODO - 不是很明白为什么这种方式可以保存协程栈
}
这里的dummy是一个占位符, 用于获取当前栈使用的地址, top - &dummy
表示剩余空间大小, 因为存不下了. 重新调整栈的大小. 然后调用 memcpy
函数将 C栈的内容, 拷贝到起始位置. 无论是重新分配了, 还是没有都没有关系的.
执行完 _save_stack
后, 将该协程状态设置为 SUSPEND
暂停状态, 让切换为 S->main
上下文, 此时又回到了 coroutine_resume
函数中, 执行结束后, 会第二次调用 coroutine_resume
函数, 创建第二个协程, 与上面一样, 当打印出第一个字符串后, 退回 到corountine_resume
中, 然后继续 where 循环, 但此时在进入 corountine_resume
函数时, 协程的状态已经变成了SUSPEND
, 进入第二个switch语句, 将c->stack
拷贝到 S->stack
中(拷贝在后面). 然后交换上下文, 继续执行协程.
最后
以上就是敏感皮皮虾为你收集整理的coroutine源码分析的全部内容,希望文章能够帮你解决coroutine源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复