概述
前段时间在docker 内写算法,运行效率有点低,就用了一个朋友的线程池,现在把他代码贴出来,做个记录以后用的时候方便,
#include <atomic>
#include <functional>
#include <future>
#include <iostream>
#include <queue>
#include <thread>
#include <vector>
namespace TDThreadPool{
class ThreadPool
{
//设置最大线程池数量
#define FZ_MAX_THREAD_NUM 20
//定义任务类型
using Task = std::function<void()>;
public:
/********************************************************************************************
* @description 构造一个线程池对象
* @author liyoufeng
* @date 2019.01.26
* @param size_t threadInitNum 线程池中初始的线程个数
* @param size_t maxThreadNum 线程池中最大允许的线程个数,默认为FZ_MAX_THREAD_NUM
* @return none
********************************************************************************************/
ThreadPool(size_t threadInitNum = 0, size_t maxThreadNum = FZ_MAX_THREAD_NUM)
: m_maxThreadNum(maxThreadNum)
{
size_t num = AddThread(threadInitNum);
std::cout << "The number of threads successfully added is " << num << std::endl;
}
~ThreadPool()
{
StopThreadPool();
}
public:
/********************************************************************************************
* @description 提交一个任务
* @author liyoufeng
* @date 2019.01.26
* @param FuncType&& func 执行的任务的函数
* @param ArgsType&&... args 执行任务函数需要传入的参数
* @return std::future<decltype(func(args...))> 通过std::future异步获取任务函数func的返回值
* @remark 当线程池对象停止时,如果还继续使用返回值future,可能会崩溃
********************************************************************************************/
template <typename FuncType, typename... ArgsType>
auto CommitOneTask(FuncType&& func, ArgsType&&... args) -> std::future<decltype(func(args...))>
{
//推到出返回值的类型
using TaskReturnType = decltype(func(args...));
auto pTask = std::make_shared<std::packaged_task<TaskReturnType()>>(std::bind(std::forward<FuncType>(func), std::forward<ArgsType>(args)...));
auto retVal = pTask->get_future();
{
std::lock_guard<std::mutex> grdLock(m_lock);
m_taskQueue.emplace([pTask]() {
(*pTask)();
});
}
//如果线程没有空闲的线程,则添加一个线程
if (!IsHaveFreeThread()) {
size_t num = AddThread(1);
std::cout << "The number of threads successfully added is " << num << std::endl;
}
//唤醒开一个等待的线程开始执行任务
m_taskCondition.notify_one();
return retVal;
}
/********************************************************************************************
* @description 添加线程
* @author liyoufeng
* @date 2019.01.26
* @param size_t threadNum 添加线程的个数
* @return size_t 此次添加了的线程个数
********************************************************************************************/
size_t AddThread(size_t threadNum)
{
size_t num = 0;
if (IsStopRun()) {
return num;
}
while (num < threadNum)
{
if (!IsCanAddThread()) {
break;
}
num++;
m_freeThreadNum++;//添加一个新线程,空闲线程数量增加一个
m_threadPool.emplace_back([this]()
{
while (!IsStopRun())
{
Task task;
{
std::unique_lock<std::mutex> unqLock(m_lock);
//如果任务队列为空的话
if (m_taskQueue.empty()){
m_taskCondition.wait(unqLock, [this]() {
//当返回为false的时候会被阻塞。当收到其他线程的通知时,是true的时候,才会解除阻塞
return !m_bIsBlockRun || !m_taskQueue.empty();
});
}
//如果停止运行且任务队列中没有任务,则线程停止运行
if (IsStopRun()) {
return;
}
//暂时不考虑move操作
task=std::move(m_taskQueue.front());
m_taskQueue.pop();
std::cout << "The Current thread id is " << std::this_thread::get_id() << std::endl;
}
m_freeThreadNum--; //获取到一个任务,准备开始执行,空闲线程数减少一个
task();
m_freeThreadNum++; //执行完成一个任务,空闲线程数增加一个
}
});
}
return num;
}
public:
//获取当前线程个数
size_t GetCurThreadNum()const {
return m_threadPool.size();
}
//获取最大允许启动的线程数量
size_t GetMaxThreadNum()const {
return m_maxThreadNum;
}
/********************************************************************************************
* @description 停止线程池,可以手动调用,提前释放线程资源,否则线程池对象销毁时释放线程资源
* @author liyoufeng
* @date 2019.01.26
* @param bool bIsRunAllTasks 是否运行完所有的任务停止 运行完停止,true;否则,false 默认true
* @return void
********************************************************************************************/
void StopThreadPool(bool bIsRunAllTasks=true) {
if (IsStopRun()) {
return;
}
m_bIsRunAllTasks = bIsRunAllTasks;
m_bIsBlockRun = false;
m_taskCondition.notify_all();
for (auto& thread : m_threadPool) {
if (thread.joinable()) {
thread.join();
}
}
std::cout << "The number of threads used is " << GetCurThreadNum() << std::endl;
std::cout << "The number of remaining tasks is " << m_taskQueue.size() << std::endl;
m_threadPool.clear();
m_freeThreadNum = 0;
}
//重置线程池状态,可重复使用
void Reset() {
m_threadPool.clear();
m_bIsBlockRun = true;
m_freeThreadNum = 0;
std::queue<Task> temp;
m_taskQueue.swap(temp);
m_bIsRunAllTasks = true;
}
private:
//是否可以添加线程。可以,true;不可以,false
bool IsCanAddThread()const {
return GetCurThreadNum() < GetMaxThreadNum();
}
//是否有空闲的线程。有,true;没有,false
bool IsHaveFreeThread()const {
return m_freeThreadNum > 0;
}
//线程是否停止运行
bool IsStopRun()const {
return !m_bIsBlockRun && (!m_bIsRunAllTasks || m_taskQueue.empty());
}
private:
std::vector<std::thread> m_threadPool; //定义线程池
std::mutex m_lock; //同步锁
std::condition_variable m_taskCondition; //任务条件阻塞
std::atomic<bool> m_bIsBlockRun = {true}; //线程池中的线程在没有任务的时候阻塞
std::atomic<size_t> m_freeThreadNum = {0}; //空闲线程数量
const size_t m_maxThreadNum; //线程的最大数量
std::queue<Task> m_taskQueue; //任务队列
std::atomic<bool> m_bIsRunAllTasks = {true};//当线程池停止时,是否运行完所有的任务后停止
};
}
最后
以上就是诚心西牛为你收集整理的用C++ 11 做线程池的全部内容,希望文章能够帮你解决用C++ 11 做线程池所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复