我是靠谱客的博主 谨慎小丸子,最近开发中收集的这篇文章主要介绍sylar库学习线程、协程、调度管理器sylar库学习线程与协程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

sylar库学习线程与协程

知识点

thread_local关键字,这个关键字在我理解看来是一个类似于static的关键字,但区别在于这个关键字所描述的数据的划分是线程,在单个线程内它与static没有区别,而不同的线程中thread_local关键字是独立的每个线程都有一份单独的,互不干扰的数据。因此这一关键字是构造多线程调度的关键,在线程类,协程类,调度器类都需要用到。
其中线程和线程执行其他函数之前,都是用一个函数将其他函数包装起来,以此来设置函数执行之前的参数

线程

由于使用的是Linux作为操作系统,要将程序中所得到的线程号与系统中使用ps命令查出来的编号相对应,最好使用linux系统中自带的库。也就是pthread
线程类主要是包装了
pthread_create(thread)
pthread_detach(~thread)
pthread_join(join)
pthread_setname_np(run)
这几个函数,这些都是C语言风格的函数,以pthread_t数据结构来进行函数的控制。
其中为了构造函数之前

Thread::Thread(std::function<void()> cb, const std::string &name):m_cb(cb),m_name(name) {
  if (name.empty()) {
    m_name ="UNKNOW";
  }
  int rt = pthread_create(&m_thread, nullptr, &Thread::run, this); //线程创建,此处开始就立即执行
                                                                   // 线程可能在构造函数返回之前就跑起来
  if (rt) {
    SYLAR_LOG_ERROR(g_logger)
        << "pthread_create thread fail, rt=" << rt << "name= " << name;
    throw std::logic_error("pthread_create error");//逻辑错误由于程序内部逻辑而导致的错误
  }
  m_semaphore.wait();//可以等到跑起来在唤醒
}
void *Thread::run(void *arg) {//线程函数参数必须是void* POSIX线程库标准,run为静态成员,this指针必须显式传入
  Thread* thread = (Thread *)arg;
  t_thread = thread;
  t_thread_name = thread->m_name;
  thread->m_id = sylar::GetThreadId();
  pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());//为线程设置唯一名称
  std::function<void()> cb;
  cb.swap(thread->m_cb);
  thread->m_semaphore.notify();//线程执行到这的时候析构函数才能结束
  cb();
  return 0;
}

通过加了一个锁保证了执行顺序的问题,构造函数一直在等待,知道设置完函数,设置完名字,开始运行函数的时候才能析构完成
callback一下,其中的线程指针和线程名就是使用的thread_local来进行区分的。

协程

相比于线程只是运行一个函数来说,协程就是一个讲究人啊。其中设定了状态(设计模式中就有一个状态模式,咱这也是使用了设计模式的人上人了)。
协程的包装主要是控制的ucontext_t数据结构
其中
需要有三个参数需要设置
uc_link//后续上下文,一般置空指针
uc_stack.ss_sp//栈空间地址->对应malloc申请内存大小
uc_stack.ss_size//栈空间大小

包装有
getcontext //初始化,设置值(Fiber)
makecontext//类似于执行函数(Fiber或reset)
swapcontext//上下文切换,非常重要的函数。协程通过这个函数进行切换

其中总体结构为,设置一个主协程,在使用的时候通过主协程来进行控制

协程 <——>主协程<——>协程

每个线程中需要一个主协程在中间进行调度。
因此 thread_local关键字又开始派上用场了,

static thread_local Fiber *t_fiber = nullptr;//当前执行线程
static thread_local Fiber::ptr t_threadFiber = nullptr;//主线程

那么就有一个问题,创建协程的时候一定需要有一个主协程,因此可以用写一个GetThis函数,每次要交换之前获取一下,有则直接获取,无则创建一

Fiber::ptr Fiber::GetThis() {
  if (t_fiber) {//若当前没有正在运行的协程,因此必无主协程
    return t_fiber->shared_from_this();//有就直接返回当前指针weakptr
  }
  Fiber::ptr main_fiber(new Fiber);//创建时已经将指针放入了t_fiber!
  SYLAR_ASSERT(t_fiber == main_fiber.get())
  t_threadFiber = main_fiber;
  return t_fiber->shared_from_this();
}

协程的其他函数都是基本围绕着swapcontext函数展开的主要是
swapin 将主协程切换到当前协程
swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)
swapout 将当前协程切换到主协程
swapcontext(&m_ctx, &t_threadFiber->m_ctx)
swapin 比较好理解切入后状态都变为EXEC(正在执行)
协程切出后需要变成什么状态就更值得商榷。

enum State { INIT, HOLD, EXEC, TERM, READY,EXCEPT };
				//  初始化  保持   执行 结束 准备 异常 

通过状态来判断下次调用时需要做什么工作,和线程不同的是他不swapin代码就不会执行

void Fiber::CallerMainFunc() {
    Fiber::ptr cur = GetThis();//这里赋值了一次引用计数+1
    SYLAR_ASSERT(cur);
    try {
        cur->m_cb();
        cur->m_cb = nullptr;
        cur->m_state = TERM;
    } catch (std::exception& ex) {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
            << " fiber_id=" << cur->getId()
            << std::endl
            << sylar::BacktraceToString();
    } catch (...) {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except"
            << " fiber_id=" << cur->getId()
            << std::endl
            << sylar::BacktraceToString();
    }

    auto raw_ptr = cur.get();//此处为什么要用指针也很有说法
    cur.reset();
    raw_ptr->back();//执行此处又回到了主协程,若直接用cur会卡到此处不能释放,去执行主协程了,而由于cb()已经执行完了,
    	//这里应该是需要销毁的状态,若直接使用只能指针会导致引用计数永远有1而不能释放
    //永远不会执行到下面这一行,那么栈空间就不会被销毁
    SYLAR_ASSERT2(false, "never reach fiber_id=" + std::to_string(raw_ptr->getId()));

}

调度管理器

调度管理器,核心自然是为了调度,调度的是什么呢,他应该是任务,或者是要执行的函数,因此要将两者存储成一种形式是有必要的。下面这一个类就是用来描述这种形式的。需要注意的是,这一形式需要可以以不同的形式进行初始化,可以直接使用函数,或者直接使用协程来进行构造,还可以通过reset直接清空掉,或者通过无参函数先占个位置,在与要执行的线程号绑定在一起方便调度。//注意,这是结构体,所有参数都是公共的,因此可以直接赋值。

  struct FiberAndThread {
    Fiber::ptr fiber;
    std::function<void()> cb;
    int thread;
    FiberAndThread(Fiber::ptr f, int thr) : fiber(f), thread(thr) {}
    FiberAndThread(Fiber::ptr *f, int thr) : thread(thr) { fiber.swap(*f); }
    FiberAndThread(std::function<void()> f, int thr) : cb(f), thread(thr) {}
    FiberAndThread(std::function<void()> *f, int thr) : thread(thr) {
      cb.swap(*f);
    }
    FiberAndThread() : thread(-1) {}//方便初始化
    void reset() {
      fiber = nullptr;
      cb = nullptr;
      thread = -1;
    }
  };

对于调度管理器还有一个核心问题,那就是这个调度管理器应该在哪一个线程上呢?这一个线程没有主协程应该怎么办?又应该在哪一个协程上呢?
线程其实好解决,那个线程构造调度管理器就使用当前线程,因此创建管理器之前,首先要为创建管理器所在的线程上调用Fiber::GetThis()。目的是没有的话就创建一个主协程。那么应该在那一个协程上呢,很显然不会是一个线程的主协程,因为主协程忙着呢还有其他事要干,因此需要在创建一个协程,专职调度 。当然还有更简单粗暴的办法,创建一个线程专门给调度管理器使用!就不需要管这么多了。

static thread_local Scheduler *t_scheduler = nullptr;
static thread_local Fiber* t_fiber = nullptr;

很明显这表示了正在执行的管理器,和正在执行的协程。由于可能需要管理多个线程,因此没有线程指针也是可以理解的

解决了上述两个问题之后就到了令人头疼的正式调度程序了。在开始之前,怎么能忘记thread_local关键字呢

首先程序有两个可能的入口
1是通过start函数中的线程进入的
2是通过主协程里保存的函数进入的

对函数来说
首先是验证部分
当线程运行此函数时
第一步:遍历其所有的FiberAndThread 结构体并找到当前线程要运行的任务,若任务正在执行(EXEC)则跳过。
第二步:若要运行的协程
则判断其状态
(若协程不是结束TERM或异常EXCEPT 则执行线程(此处状态主要为HOLD,READY,INIT)
执行完后若是READY状态则再放回调度管理器内,要是没执行完就出来了则变为HOLD,(由于HOLD比较特殊,不能再由调度管理器代管,不然会再下一次循环中执行出来,因此将状态转为HOLD后就不再记录可以直接清除)
清除当前fiber

当前需要执行的是函数的话
就创建一个协程,将函数装进去,执行此协程再执行上述步骤
若是的(一般是经过上面执行完的Fiber会被reset为空)
则进行空闲状态的处理
空闲线程会在(stop函数执行后,且任务列表为空,且没有活跃线程,rootFiber未执行的时候将run函数结束掉
具体函数如下

void Scheduler::run() {
  SYLAR_LOG_INFO(g_logger)<<m_name<<"run";
  setThis();
  if(sylar::GetThreadId() != m_rootThread) {//判断是否为主线程
    t_fiber=Fiber::GetThis().get();
  }
  Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle,this)));
  Fiber::ptr cb_fiber;

  FiberAndThread ft;
  while (true) {
    ft.reset();
    bool tickle_me = false;
    bool is_active=false;
    {//从消息队列中取出一个需要执行的消息
      MutexType::Lock lock(m_mutex);
      auto it = m_fibers.begin();
      while (it != m_fibers.end()) {//找到是否是线程id
        if (it->thread != -1 && it->thread != sylar::GetThreadId()) {//期望的线程id不是当前线程id
          ++it;
          tickle_me = true;//通知其他线程
          continue;
        }
        SYLAR_ASSERT(it->fiber || it->cb);
        if (it->fiber && it->fiber->getState() == Fiber::EXEC) {//正在执行什么都不做
          ++it;
          continue;
        }
        ft = *it;
        m_fibers.erase(it);
        ++m_activeThreadCount;
        is_active=true;
        break;
      }
     // tickle_me|=it!=m_fibers.end();
    }
    if (tickle_me) {
      tickle();
    }
    if (ft.fiber && (ft.fiber->getState() != Fiber::TERM&&ft.fiber->getState()!=Fiber::EXCEPT)) {
      ft.fiber->swapIn();
      --m_activeThreadCount;
      if (ft.fiber->getState() == Fiber::READY) {
        schedule(ft.fiber);
      } else if (ft.fiber->getState() != Fiber::TERM &&
                 ft.fiber->getState() != Fiber::EXCEPT) {
        ft.fiber->m_state=Fiber::HOLD;
      }
      ft.reset();
    } else if (ft.cb) {
      if (cb_fiber) {
        cb_fiber->reset(ft.cb);
      } else {
        cb_fiber.reset(new Fiber(ft.cb));
      }
      ft.reset();
      cb_fiber->swapIn();
      --m_activeThreadCount;
      if (cb_fiber->getState() == Fiber::READY) {
        schedule(cb_fiber);
        cb_fiber.reset();
      } else if (cb_fiber->getState() == Fiber::EXCEPT ||
                 cb_fiber->getState() == Fiber::TERM) {
        cb_fiber->reset(nullptr);
      } else {//if (cb_fiber->getState() != Fiber::TERM) {
        cb_fiber->m_state = Fiber::HOLD;
        cb_fiber.reset();
      }
    } else {
      if (is_active) {
        --m_activeThreadCount;
        continue;
      }
      if (idle_fiber->getState() == Fiber::TERM) {
        SYLAR_LOG_INFO(g_logger)<<"idle fiber term";
        break;
      }
      ++m_idleThreadCount;
      idle_fiber->swapIn();
       --m_idleThreadCount;
      if (idle_fiber->getState() != Fiber::TERM &&
          idle_fiber->getState() != Fiber::EXCEPT) {//若回来的状态不等于结束状态时
        idle_fiber->m_state = Fiber::HOLD;
      }
     
    }
  }
}

最后

以上就是谨慎小丸子为你收集整理的sylar库学习线程、协程、调度管理器sylar库学习线程与协程的全部内容,希望文章能够帮你解决sylar库学习线程、协程、调度管理器sylar库学习线程与协程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部