概述
Lock.h
#pragma once
#include <mutex>
#include <condition_variable>
// Short project-wide aliases for the standard synchronization primitives.
using RecursiveMutex = std::recursive_mutex;            // re-entrant mutex
using RecursiveLock  = std::lock_guard<RecursiveMutex>; // scoped lock on a RecursiveMutex
using Mutex          = std::mutex;                      // plain mutex
using AutoLock       = std::unique_lock<Mutex>;         // scoped lock usable with Condition::wait
using Condition      = std::condition_variable;         // condition variable paired with Mutex
ThreadPool.h
#pragma once
#include "Lock.h"

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <thread>
#include <vector>
/// Worker thread pool with a FIFO task queue that can optionally be bounded.
///
/// Tasks are `std::function<void()>` objects. The queue (`m_dTaskQueue`) is
/// guarded by `m_Mutex`; two condition variables signal "queue not empty"
/// (to workers) and "queue not full" (to producers).
class ThreadPool
{
    typedef std::vector<std::thread*> ThreadPoolType;
    typedef std::function<void()> Task;
    typedef std::deque<Task> TaskQueueType;

public:
    ThreadPool() = default;

    // The pool owns threads and a mutex, so it is neither copyable nor assignable.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Calls Stop() if the pool is still marked as running when it is destroyed.
    ~ThreadPool()
    {
        if (m_bRunning)
        {
            Stop();
        }
    }

    // Optional singleton accessor, intentionally left disabled:
    // static ThreadPool& Instance()
    // {
    //     static ThreadPool pool;
    //     return pool;
    // }

    /// Sets the maximum number of queued tasks; 0 means the queue is unbounded.
    /// The value is written without taking m_Mutex.
    /// NOTE(review): presumably meant to be called before Start() — confirm.
    void SetQueueMaxSize(size_t nMaxSize)
    {
        m_nMaxQueueSize = nMaxSize;
    }

    /// Creates `nNumThreads` worker threads and marks the pool as running.
    void Start(int32_t nNumThreads);

    /// Queues a copy of `f`; when the pool has no worker threads, `f` is run
    /// directly on the calling thread instead.
    void Run(const Task& f);

    /// Same as above, but moves `f` into the queue.
    void Run(Task&& f);

    /// Clears the running flag and wakes the waiting workers.
    void Stop();

    /// Returns the number of tasks currently waiting in the queue.
    size_t GetTaskQueueSize();

    /// Registers a callback that every worker invokes once before it starts
    /// taking tasks. Stored without locking.
    void SetThreadInitCb(const Task& cb)
    {
        threadInitCb = cb;
    }

private:
    /// Entry point executed by each worker thread.
    void ThreadFunc();

    /// Removes and returns the next queued task (an empty Task if none).
    Task TaskTake();

    /// True when a size limit is set and the queue has reached it.
    /// Reads m_dTaskQueue without locking: the caller must hold m_Mutex.
    bool IsFull()
    {
        return m_nMaxQueueSize > 0 && m_dTaskQueue.size() >= m_nMaxQueueSize;
    }

private:
    int32_t m_nThreadNum{0};
    size_t m_nMaxQueueSize{0};      // 0 = no limit
    ThreadPoolType m_vThreads;      // worker threads, heap-allocated
    TaskQueueType m_dTaskQueue;     // pending tasks, guarded by m_Mutex
    Mutex m_Mutex;
    Condition m_NotEmptyCond;       // signalled when a task is queued or the pool stops
    Condition m_NotFullCond;        // signalled when a task is taken from the queue
    Task threadInitCb;
    // Atomic because workers test this flag outside m_Mutex while Stop() writes it.
    std::atomic<bool> m_bRunning{false};
};
ThreadPool.cpp
#include "ThreadPool.h"
void ThreadPool::Start(int32_t nNumThreads)
{
if(nNumThreads == 0 && threadInitCb)
{
threadInitCb();
return;
}
for(int i=0;i<nNumThreads;++i)
{
auto thrd = new std::thread(std::bind(&ThreadPool::ThreadFunc, this));
m_vThreads.push_back(thrd);
}
m_bRunning = true;
}
void ThreadPool::Run(const Task& f)
{
if(m_vThreads.empty())
{
return f();
}
AutoLock Lock(m_Mutex);
while(IsFull())
{
m_NotFullCond.wait(Lock);
}
m_dTaskQueue.push_back(f);
m_NotEmptyCond.notify_all();
}
/// Submits `f` to the pool, moving it into the queue.
///
/// With no worker threads the task is executed right away on the calling
/// thread. Otherwise the call blocks while the queue is full, then appends
/// the task and wakes the workers.
void ThreadPool::Run(Task&& f)
{
    const bool bNoWorkers = m_vThreads.empty();
    if (bNoWorkers)
    {
        f();
        return;
    }

    AutoLock guard(m_Mutex);
    // Predicate form of the wait loop: returns once the queue has room.
    m_NotFullCond.wait(guard, [this] { return !IsFull(); });
    m_dTaskQueue.push_back(std::move(f));
    m_NotEmptyCond.notify_all();
}
#include <iostream>
void ThreadPool::Stop()
{
AutoLock Lock(m_Mutex);
m_bRunning = false;
m_NotEmptyCond.notify_all();
for(auto it = m_vThreads.begin();it != m_vThreads.end();)
{
it = m_vThreads.erase(it);
}
std::cout <<"stop thread pool" << std::endl;
}
/// Returns how many tasks are waiting in the queue; the size is read while
/// holding m_Mutex.
size_t ThreadPool::GetTaskQueueSize()
{
    AutoLock guard(m_Mutex);
    const size_t nPending = m_dTaskQueue.size();
    return nPending;
}
void ThreadPool::ThreadFunc()
{
if(threadInitCb)
{
threadInitCb();
}
while(m_bRunning)
{
Task task = TaskTake();
if(task)
{
task();
}
}
}
/// Blocks until a task is available or the pool is no longer running, then
/// removes and returns the task at the front of the queue.
///
/// @return The dequeued task, or an empty Task when the wait ended with an
///         empty queue.
ThreadPool::Task ThreadPool::TaskTake()
{
    AutoLock Lock(m_Mutex);
    m_NotEmptyCond.wait(Lock, [this] { return !m_dTaskQueue.empty() || !m_bRunning; });

    Task task;
    if (!m_dTaskQueue.empty())
    {
        // Move instead of copy: the element is popped on the next line, and
        // copying a std::function may allocate.
        task = std::move(m_dTaskQueue.front());
        m_dTaskQueue.pop_front();
        // A slot was freed, so let producers blocked in Run() proceed.
        m_NotFullCond.notify_all();
    }
    return task;
}
Test.cpp
#include "ThreadPool.h"
#include <iostream>
using namespace std;
// Demo task: prints the banner line, then `str` on its own line unless it is empty.
void TestPoolPrintWithParam(const std::string &str)
{
    std::cout << "this is thread pool running" << std::endl;
    if (str.empty())
    {
        return;
    }
    std::cout << str << std::endl;
}
// Demo task without arguments: prints the banner line and flushes.
void TestPoolPrint()
{
    std::cout << "this is thread pool running";
    std::cout << std::endl;
}
// Demo driver: a 3-worker pool with a queue bounded at 5 entries receives two
// plain tasks followed by 100 labelled tasks, then is stopped.
int main(int argc, char const *argv[])
{
    ThreadPool threadPool;
    threadPool.SetQueueMaxSize(5);
    threadPool.Start(3);

    threadPool.Run(TestPoolPrint);
    threadPool.Run(TestPoolPrint);

    for (int taskNo = 0; taskNo < 100; ++taskNo)
    {
        char buf[32];
        snprintf(buf, sizeof buf, "task %d", taskNo);
        // The label is captured by value so it outlives this loop iteration.
        const std::string label(buf);
        threadPool.Run([label] { TestPoolPrintWithParam(label); });
    }

    threadPool.Stop();
    return 0;
}
编译: g++ -std=c++11 ThreadPool.cpp Test.cpp -pthread (头文件 Lock.h、ThreadPool.h 通过 #include 引入,无需写在命令行中)
最后
以上就是现代钢笔为你收集整理的cpp实现线程池-半同步半异步的全部内容,希望文章能够帮你解决cpp实现线程池-半同步半异步所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复