概述
线程模块
- 线程模块概述
- Linux线程库
- 基础功能
- 信号量
- 互斥锁
- 条件变量
- 线程同步机制包装类
- 线程池
- 详细线程池实现
- 其他
线程模块概述
线程是程序中完成一个独立任务的完整执行序列,即一个可调度的实体。线程可分为内核线程和用户线程。一个进程可拥有M个内核线程N个用户线程,且M<=N。按照M:N的取值,线程的实现方式可分为:完全在用户空间实现、完全由内核调度和双层调度。
完全在用户空间实现的线程的优点:创建和调度线程都无需内核的干预,速度加偶爱,且不占用额外的内核资源,既是一个进程有多个线程,也不会对系统造成明显的影响。缺点:对于多处理器,一个进程的多个线程无法运行在不同的CPU上。
完全由内核调度的模式优缺点和完全在用户空间实现的优缺点正好互换。双层调度则结合了前面两种优点:不但不会消耗过多的内核资源,而且线程切换速度较快,可充分利用多处理器的优势。
在sylar的线程模块中,其整体较为简单,主要是基于pthread实现了线程的基础功能实现和线程同步方面的封装。
Linux线程库
基础功能
#include <pthread.h>
// 创建一个线程
int pthread_creat(pthread_t* thread, const pthread_addr_t* attr, void* (*start_routine)(void*), void* arg);
// pthread_t定义
#include <bits/pthreadtypes.h>
typedef unsigned long int pthread_t;
// 线程函数在结束时调用,确保安全、干净的退出
void pthread_exit(void* retal);
// 调用函数来回收线程,即等待其他线程结束
void pthread_join(pthread_t thread, void** retval);
// 异常终止一个线程,即取消线程
int pthread_cancel(pthread_t thread);
// 接收到取消请求的目标线程可以决定是否允许被取消已经如何取消
int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcancelstate(int type, int *oldtype);
// 初始化线程属性对象
int pthread_attr_init(pthread_attr_t* attr);
// 销毁线程属性对象,被销毁的线程属性对象只有再次初始化之后才能使用
int pthread_attr_destroy(pthread_attr_t* attr);
信号量
#include <semaphore.h>
// 初始化一个未命名的信号量
int sem_init(sem_t* sem, int pshared, unsigned int value);
// 小蕙心好凉,以释放其占用的内核资源
int sem_destory(sem_t* sem);
// 以原子操作的方式将信号量减一,若信号量为0,则sem_wait将被阻塞,知道这个信号量有非0值
int sem_wait(sem_t* sem);
// 与sem_wait类似,不过它始终立即返回,无论被操作的信号量是否有非0值
int sem_trywait(sem_t* sem);
// 以原子操作的方式将信号量值加一
int sem_post(sem_t* sem);
互斥锁
#include <pthread.h>
// 初始化互斥锁,也可以用:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr);
// 销毁互斥锁,是放弃占用的内核资源
int pthread_mutex_destroy(pthread_mutex_t* mutex);
// 给互斥锁加锁,若互斥锁已经被所,则其调用被阻塞
int pthread_mutex_lock(phtread_mutex_t* mutex);
// 与lock类似,其始终立即返回
int pthread_mutex_trylock(pthread_mutex_t* mutex);
// 给互斥锁解锁
int pthread_mutex_unlock(pthread_mutex_t* mutex);
条件变量
// 条件变量:用于线程之间共享同步数据的值
// 提供了一个线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程
#include <pthread.h>
// 初始化条件变量
int pthread_cond_init(pthread_cont_t* cond, const pthread_condattr_t* cond_attr);
// 销毁条件变量
int pthread_cond_destory(pthread_cond_t* cond);
// 以广播的方式唤醒所有等待目标条件变量的线程
int pthread_cond_broadcast(pthread_cond_t* cond);
// 唤醒一个等待目标条件变量的线程
int pthread_cond_signal(pthread_cond_t* cond);
// 用于等待目标条件变量
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
线程同步机制包装类
以互斥锁为例
/**
* @brief 互斥量
*/
class Mutex : Noncopyable {
public:
/// 局部锁
typedef ScopedLockImpl<Mutex> Lock;
/**
* @brief 构造函数
*/
Mutex() {
pthread_mutex_init(&m_mutex, nullptr);
}
/**
* @brief 析构函数
*/
~Mutex() {
pthread_mutex_destroy(&m_mutex);
}
/**
* @brief 加锁
*/
void lock() {
pthread_mutex_lock(&m_mutex);
}
/**
* @brief 解锁
*/
void unlock() {
pthread_mutex_unlock(&m_mutex);
}
private:
/// mutex
pthread_mutex_t m_mutex;
};
/**
* @brief 空锁(用于调试)
*/
class NullMutex : Noncopyable{
public:
/// 局部锁
typedef ScopedLockImpl<NullMutex> Lock;
/**
* @brief 构造函数
*/
NullMutex() {}
/**
* @brief 析构函数
*/
~NullMutex() {}
/**
* @brief 加锁
*/
void lock() {}
/**
* @brief 解锁
*/
void unlock() {}
};
线程池
在sylar的线程模块中,本人好像没有看到线程池相关内容,在test中,其通过std::vector<sylar::Thread::ptr> thrs
简单的实现了一个线程池,下面程序为阅读《Linux高性能服务器编程》中的半同步/半反应堆线程池的实现,可供参考。
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "./locker.h"
// 线程池类
template<typename T>
class threadpool{
public:
threadpool(int thread_number=8,int max_requests=10000);
~threadpool();
// 往请求队列中添加任务
bool append(T* request);
private:
// 工作线程运行函数,不断从工作队列中取出任务并执行
static void* worker(void* arg);
void run();
private:
int m_thread_number; // 线程数
int m_max_requests; // 请求队列中允许的最大请求数
pthread_t* m_threads; // 线程池的组数
std::list<T*> m_workqueue; // 请求队列
locker m_queuelocker; // 请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理
bool m_stop; // 是否结束线程
};
template<typename T>
threadpool<T>::threadpool(int thread_number, int max_requests):
m_thread_number(thread_number),m_max_requests(max_requests),
m_stop(false),m_threads(NULL){
if((thread_number<=0)||(max_requests<=0)){
throw std::exception();
}
m_threads = new pthread_t(m_thread_number);
if(!m_threads){
throw std::exception();
}
// 创建thread_number个线程,并设置为脱离线程
for(int i=0;i<thread_number;i++){
printf("create the %dth threadn", i);
if(pthread_create(m_threads+1, NULL, worker, this)!=0){
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i])){
delete [] m_threads;
throw std::exception();
}
}
}
template<typename T>
threadpool<T>::~threadpool(){
delete [] m_threads;
m_stop = true;
}
template<typename T>
bool threadpool<T>::append(T* requests){
// 操作工作队列要加锁
m_queuelocker.lock();
if(m_queuelocker.size()>m_max_requests){
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(requests);
m_queuelocker.unlock();
m_queuestat.post(); // 添加信号量
return true;
}
template<typename T>
void* threadpool<T>::worker(void* arg){
threadpool* pool = (threadpool*)arg;
pool->run();
return pool;
}
template<typename T>
void threadpool<T>::run(){
while(!m_stop){
m_queuestat.wait();
m_queuelocker.lock();
if(m_workqueue.empty()){
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if(!request){
continue;
}
request->process();
}
}
#endif // !THREADPOOL_H
详细线程池实现
TaskQueue.h
#ifndef __TASKQUEUE_H__
#define __TASKQUEUE_H__
#include <queue>
#include <pthread.h>
using callback = void(*)(void* arg);
template <typename T>
struct Task{
Task<T>(){
function = nullptr;
arg = nullptr;
}
Task<T>(callback f, void* arg){
this->arg = (T*)arg;
function = f;
}
callback function;
T* arg;
};
template <typename T>
class TaskQueue{
public:
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task<T> task);
void addTask(callback f, void* arg);
// 取出任务
Task<T> takeTask();
// 获取任务个数
inline size_t taskNumber() { return m_taskQ.size(); }
private:
pthread_mutex_t m_mutex;
std::queue<Task<T>> m_taskQ;
};
#endif // !__TASKQUEUE_H__
TaskQueue.cpp
#include "TaskQueue.h"
template <typename T>
TaskQueue<T>::TaskQueue(){
pthread_mutex_init(&m_mutex, NULL);
}
template <typename T>
TaskQueue<T>::~TaskQueue(){
pthread_mutex_destroy(&m_mutex);
}
// 添加任务
template <typename T>
void TaskQueue<T>::addTask(Task<T> task){
pthread_mutex_lock(&m_mutex);
m_taskQ.push(task);
pthread_mutex_unlock(&m_mutex);
}
template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg){
pthread_mutex_lock(&m_mutex);
m_taskQ.push(Task<T>(f, arg));
pthread_mutex_unlock(&m_mutex);
}
// 取出任务
template <typename T>
Task<T> TaskQueue<T>::takeTask(){
Task<T> t;
pthread_mutex_lock(&m_mutex);
if(!m_taskQ.empty()){
t = m_taskQ.front();
m_taskQ.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
}
ThreadPool.h
#ifndef __THREDPOOL_H__
#define __THREDPOOL_H__
#include "TaskQueue.h"
#include "TaskQueue.cc"
template <typename T>
class ThreadPool{
public:
ThreadPool(int min, int max);
~ThreadPool();
void addTask(Task<T> task);
int getbusyNum();
int getliveNum();
private:
static void* worker(void* arg);
static void* manager(void* arg);
void threadExit();
private:
TaskQueue<T>* taskQ;
pthread_t managerID;
pthread_t* threadIDs;
int minNum;
int maxNum;
int busyNum;
int liveNum;
int exitNum;
pthread_mutex_t mutexPool;
pthread_cond_t notEmpyt;
bool shutdown; // 是否要销毁线程池
static const int NUMBER = 2;
};
#endif
ThreadPool.cpp
#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <string>
#include <unistd.h>
template <typename T>
ThreadPool<T>::ThreadPool(int min, int max){
// 实例化
do{
taskQ = new TaskQueue<T>;
if(taskQ == nullptr){
std::cout << "malloc threadIDs fail.." << std::endl;
break;
}
threadIDs = new pthread_t[max];
if(threadIDs == nullptr){
std::cout << "malloc threadIDs fail.." << std::endl;
break;
}
memset(threadIDs, 0, sizeof(pthread_t)*max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;
exitNum = 0;
if(pthread_mutex_init(&mutexPool, NULL) != 0 ||
// 条件变量初始化
pthread_cond_init(¬Empyt, NULL) != 0){
std::cout << "mutex or condition init fail.." << std::endl;
break;
}
shutdown = false;
// 线程创建
pthread_create(&managerID, NULL, manager, this);
for(int i=0;i<min;i++){
pthread_create(&threadIDs[i], NULL, worker, this);
}
return;
}while (0);
if(threadIDs)
delete[] threadIDs;
if(taskQ)
delete taskQ;
}
template <typename T>
ThreadPool<T>::~ThreadPool(){
shutdown = true;
pthread_join(managerID, NULL);
for(int i=0;i<liveNum;i++){
// 发送一个信号给另外一个处于阻塞等待状态的线程
pthread_cond_signal(¬Empyt);
}
if(taskQ){
delete taskQ;
}
if(threadIDs){
delete [] threadIDs;
}
// 销毁互斥锁
pthread_mutex_destroy(&mutexPool);
// 销毁条件变量
pthread_cond_destroy(¬Empyt);
}
template <typename T>
void ThreadPool<T>::addTask(Task<T> task){
if(shutdown){
return;
}
taskQ->addTask(task);
pthread_cond_signal(¬Empyt);
}
template <typename T>
int ThreadPool<T>::getbusyNum(){
pthread_mutex_lock(&mutexPool);
int busyN = this->busyNum;
pthread_mutex_unlock(&mutexPool);
return busyN;
}
template <typename T>
int ThreadPool<T>::getliveNum(){
pthread_mutex_lock(&mutexPool);
int liveN = this->liveNum;
pthread_mutex_unlock(&mutexPool);
return liveN;
}
template <typename T>
void* ThreadPool<T>::worker(void* arg){
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while(true){
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while(pool->taskQ->taskNumber() == 0 &&
!pool->shutdown){
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpyt, &pool->mutexPool);
// 判断是否销毁线程
if(pool->exitNum > 0){
pool->exitNum--;
if(pool->liveNum > pool->minNum){
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
}
}
// 判断线程池是否关闭
if(pool->shutdown){
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
// 从任务队列中取出一个任务
Task<T> task = pool->taskQ->takeTask();
// 开始工作,并解锁
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexPool);
std::cout <<"thread " << std::to_string(pthread_self()) << " start working..." << std::endl;
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
std::cout << "thread " <<std::to_string(pthread_self())<< " end working..." <<std::endl;
// 结束任务
pthread_mutex_lock(&pool->mutexPool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexPool);
}
return NULL;
}
template <typename T>
void* ThreadPool<T>::manager(void* arg){
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while(!pool->shutdown){
sleep(3);
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->taskQ->taskNumber();
int liveNum = pool->liveNum;
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
// 创建线程
if(queueSize>liveNum && liveNum<pool->maxNum){
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i=0;i<pool->maxNum&&counter<NUMBER&&pool->liveNum<pool->maxNum;i++){
if(pool->threadIDs[i]==0){
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
if(busyNum*2<liveNum && liveNum>pool->minNum){
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
for(int i=0;i<NUMBER;i++){
pthread_cond_signal(&pool->notEmpyt);
}
}
}
return NULL;
}
template <typename T>
void ThreadPool<T>::threadExit(){
// 获得县城自身ID
pthread_t tid = pthread_self();
for(int i=0;i<maxNum;i++){
if(threadIDs[i] == tid){
threadIDs[i] = 0;
std::cout << "threadExit() call"
<< std::to_string(tid)
<< " exiting..." << std::endl;
break;
}
}
// 终止线程
pthread_exit(NULL);
}
Test.cpp
#include "../src/ThreadPool.h"
#include "../src/ThreadPool.cc"
#include <unistd.h>
#include <iostream>
#include <string.h>
#include <string>
void taskFunc(void* arg){
int num = *(int*)arg;
std::cout << "thread " << std::to_string(pthread_self())
<< " is working, number = " << num <<std::endl;
sleep(1);
}
int main(){
ThreadPool<int> pool(3,10);
for(int i=0;i<100;i++){
int* num = new int(i+100);
pool.addTask(Task<int>(taskFunc, num));
}
sleep(20);
return 0;
}
其他
在sylar的服务器中,感觉其重点不在线程方面,而是在协程、协程调度、IO协程调度、定时器模块等相关内容,其主要基于协程进行后续的工作,后面的程序本人可能看起来比较吃力,也不知道这种看的方法对不对,如果有更好的学习方法什么的也希望有大佬可以提点意见~
最后
以上就是精明蜜粉为你收集整理的[源码阅读]——Sylar服务器框架:线程模块线程模块概述Linux线程库线程同步机制包装类线程池其他的全部内容,希望文章能够帮你解决[源码阅读]——Sylar服务器框架:线程模块线程模块概述Linux线程库线程同步机制包装类线程池其他所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复