任务队列和线程池队列(C++11)

C/C++代码 blackfeather


任务队列可以认为是执行同一个方法来处理数据的队列,指定回调函数。

线程池就是先开辟好多个线程,然后将要执行的方法+参数丢到线程池中,支持返回值获取。


均依赖了无锁队列库 https://github.com/cameron314/concurrentqueue

本来不想依赖三方组件的,但是测试发现这个库效率是真的高,比带锁的队列快非常多。

而且vs中的std::queue是有BUG的,pop后不释放内存,也有一定的隐患(急死强迫症)


任务队列库TaskQueue:

#pragma once

#include <functional>
#include <vector>
#include <thread>
#include <future>
#include <atomic>
#include <chrono>
#include <mutex>

#include "queue/blockingconcurrentqueue.h" //for tsBlockQuque

template <class T, class P = void*>
class CTaskQueue
{
private:
	typedef std::function<void(P &param)> functionTaskInit;
	typedef std::function<void(P &param)> functionTaskExit;
	typedef std::function<bool(T &data)> functionTaskCallback;
	typedef std::function<bool(T &data, P &param)> functionTaskExCallback;
	//退出的时候用的回调
	typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback;

	//退出标记
	bool m_bExit;
	//检测队列的等待时间,毫秒
	int m_nWaitTime;
	//延时模式开关
	bool m_bDelayMode;
	//延时模式下,启动新工作线程需要加锁
	std::mutex m_mtStartWorkThread;
	//最大线程数
	size_t m_nMaxThreadCount;
	//添加的总数量的数量
	std::atomic<uint64_t> m_nTotalCount;
	//已处理完毕的数量
	std::atomic<uint64_t> m_nProcessedCount;
	//任务队列
	tsBlockQuque <T> m_taskQueue;

	//单参数的回调
	functionTaskCallback m_TaskCallBack;

	//带扩展参数的回调
	functionTaskInit m_TaskInitCallBack;
	functionTaskExit m_TaskExitCallBack;
	functionTaskExCallback m_TaskExCallBack;

	//工作线程
	std::vector <std::future<int>> m_arrWorkThreads;

	//启动工作线程
	void _StartWorkThread(size_t nThreadCount)
	{
		std::lock_guard<std::mutex> lock(m_mtStartWorkThread);

		if (m_arrWorkThreads.size() >= m_nMaxThreadCount)
			return;
		
		for (size_t i = 0; i < nThreadCount; i++)
		{
			m_arrWorkThreads.emplace_back(std::async(std::launch::async, [this]()->int{

				P param;
				if (m_TaskInitCallBack)
					m_TaskInitCallBack(param);

				while (true)
				{
					T data;
					if (m_taskQueue.wait_dequeue_timed(data, std::chrono::milliseconds(m_nWaitTime)))
					{
						//找到一组数据 回调
						if (m_TaskCallBack)
						{
							if (!m_TaskCallBack(data))
							{
								//处理失败了 重新丢回队列里面去
								m_taskQueue.enqueue(data);
								continue;
							}
						}
						else if (m_TaskExCallBack)
						{
							if (!m_TaskExCallBack(data, param))
							{
								//处理失败了 重新丢回队列里面去
								m_taskQueue.enqueue(data);
								continue;
							}
						}

						m_nProcessedCount++;
						continue;
					}

					//结束标记并且队列没有数据了 返回
					if (m_bExit)
						break;
				}

				if (m_TaskExitCallBack)
					m_TaskExitCallBack(param);

				return 0;
			}));
		}

		//创建完毕工作线程后,重置一下标记
		if (m_bDelayMode && m_arrWorkThreads.size() >= m_nMaxThreadCount)
			m_bDelayMode = false;
	}
public:

	//禁止拷贝和移动
	CTaskQueue(const CTaskQueue&) = delete;
	CTaskQueue& operator=(const CTaskQueue&) = delete;

	/*
	构造函数:
		bDelayMode: 延时模式
			true:延时启动工作线程,有需要的时候启动
			false:立即启动工作线程
		nWaitTime:等待队列的超时时间,毫秒
	*/
	CTaskQueue(bool bDelayMode = false, int nWaitTime = 200)
	{
		m_bExit = false;
		m_nTotalCount = 0;
		m_nProcessedCount = 0;
		m_bDelayMode = bDelayMode;
		m_nWaitTime = nWaitTime;
	}
	~CTaskQueue()
	{
		m_bExit = true;

		for (auto& fu : m_arrWorkThreads)
		{
			try
			{
				fu.get();
			}
			catch (...){}
		}
	}

	/*
	简单的回调方法的初始化
	参数:
		线程数
		回调方法
	*/
	void InitTaskQueue(size_t nMaxThreadCount, functionTaskCallback fnTaskCallback)
	{
		m_TaskCallBack = fnTaskCallback;
		m_TaskExCallBack = nullptr;
		m_TaskInitCallBack = nullptr;
		m_TaskExitCallBack = nullptr;
		m_nMaxThreadCount = nMaxThreadCount;

		if (!m_bDelayMode)
			_StartWorkThread(nMaxThreadCount);
	}
	/*
	带队列线程初始化和销毁回调方法的初始化
	参数:
		线程数
		数据处理回调方法
		队列线程初始化回调
		队列线程销毁回调
	*/
	void InitTaskQueue(int nMaxThreadCount, functionTaskExCallback fnTaskExCallback, functionTaskInit fnTaskInitCallback, functionTaskExit fnTaskExitCallback)
	{
		m_TaskCallBack = nullptr;
		m_TaskExCallBack = fnTaskExCallback;
		m_TaskInitCallBack = fnTaskInitCallback;
		m_TaskExitCallBack = fnTaskExitCallback;
		m_nMaxThreadCount = nMaxThreadCount;

		if (!m_bDelayMode)
			_StartWorkThread(nMaxThreadCount);
	}

	//添加数据
	void AddData(const T &data)
	{
		if (m_bDelayMode)
		{
			//延时模式下,如果任务还没有处理完毕,就增加一个工作线程
			if (!IsFinished() || m_arrWorkThreads.size() == 0)
			{
				//内部有工作线程数检测,达到最大线程数后自动关闭延时模式
				_StartWorkThread(1);
			}
		}

		assert(m_arrWorkThreads.size());
		m_taskQueue.enqueue(data);
		m_nTotalCount++;
	}

	//获取当前工作线程数
	size_t GetWorkThreadCount()
	{
		return m_arrWorkThreads.size();
	}

	//读取当前队列中待处理的数量
	uint64_t GetQueueSize()
	{
		return m_taskQueue.size_approx();
	}

	//读取已经处理过的数量
	uint64_t GetProcessedCount()
	{
		return m_nProcessedCount;
	}

	//读取所有加入过队列的数量
	uint64_t GetTotalCount()
	{
		return m_nTotalCount;
	}

	//查询队列是否处理完毕  立即返回
	bool IsFinished()
	{
		if (m_nTotalCount == m_nProcessedCount && GetQueueSize() == 0)
			return true;

		return false;
	}

	//等待队列结束  阻塞执行
	void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr)
	{
		printf("Wait for finish...\n");

		while (!IsFinished())
		{
			if (fnWaitCallback)
				fnWaitCallback(m_nProcessedCount, m_nTotalCount);

			std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms));
		}

		if (fnWaitCallback)
			fnWaitCallback(m_nProcessedCount, m_nTotalCount);

		printf("All finished. \n");
	}
};


示例代码:

    
    //
    CTaskQueue<int, int> taskTester;  //第二个int可忽略
	
	//单一回调
	taskTester.InitTaskQueue(8, [](int data)->bool{
		printf("task:%d\n", data);
		return true; //返回false会重新插回队列中
	}
	
	//多个回调的初始化方法
	taskTester.InitTaskQueue(8, [](int data, int &param)->bool{
		printf("task:%d, param:%d\n", data, param);
		return true;
	},
	[](int &param){
		param = GetCurrentThreadId();
		printf("task work thread init:%d \n", param);
	},
	[](int &param){
		printf("task work thread exit:%d \n", param);
	});

	for (int i = 0; i < 10000; i++)
	{
		taskTester.AddData(i);
	}

	taskTester.WaitforFinish(100, [](uint64_t processed, uint64_t total){
		uint64_t ret = (processed * 100) / total;
		printf("processed:%I64d, total:%I64d, %d%% \n", processed, total, (int)ret);
	});




线程池是用之前的帖子中的线程池修改的,增加了几个方法,修正了vs下的BUG 。。。

#pragma once

#include <vector>
#include <memory>
#include <thread>
#include <future>
#include <functional>
#include <stdexcept>
#include <atomic>
#include <chrono>
#include <mutex>

#include "queue/blockingconcurrentqueue.h" //for tsBlockQuque

class ThreadPool 
{
private:
	//退出时回调方法声明
	typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback;

	//工作线程
	std::vector<std::future<int>> m_arrWorkThreads;
	//任务队列
	tsBlockQuque < std::function<void()> > m_TaskQueue;

	//退出标记
	bool m_bExit;
	//检测队列的等待时间,毫秒
	int m_nWaitTime;
	//延时模式开关
	bool m_bDelayMode;
	//延时模式下,启动新工作线程需要加锁
	std::mutex m_mtStartWorkThread;
	//最大线程数
	size_t m_nMaxThreadCount;
	//添加的总数量的数量
	std::atomic<uint64_t> m_nTotalCount;
	//已处理完毕的数量
	std::atomic<uint64_t> m_nProcessedCount;

	//启动工作线程
	void _StartWorkThread(size_t nThreadCount)
	{
		std::lock_guard<std::mutex> lock(m_mtStartWorkThread);

		if (m_arrWorkThreads.size() >= m_nMaxThreadCount)
			return;

		for (size_t i = 0; i < nThreadCount; i++)
		{
			m_arrWorkThreads.emplace_back(std::async(std::launch::async, [this]()->int{

				printf("thread pool thread start...\n");

				while (true)
				{
					std::function<void()> task;
					if (m_TaskQueue.wait_dequeue_timed(task, std::chrono::milliseconds(m_nWaitTime)))
					{
						task();
						m_nProcessedCount++;
						continue;
					}

					//结束标记并且队列没有数据了 返回
					if (m_bExit)
						break;
				}

				printf("thread pool thread exit...\n");
				return 0;
			}));
		}

		//创建完毕工作线程后,重置一下标记
		if (m_bDelayMode && m_arrWorkThreads.size() >= m_nMaxThreadCount)
			m_bDelayMode = false;
	}

public:

	//禁止拷贝和移动
	ThreadPool(const ThreadPool&) = delete;
	ThreadPool& operator=(const ThreadPool&) = delete;

	/*
	构造函数
		nMaxThreadCount:最大工作线程数
		bDelayMode:延时模式
			true:延时启动工作线程,有需要的时候启动
			false:立即启动工作线程
		nWaitTime:等待队列的超时时间,毫秒
	*/
	ThreadPool(size_t nMaxThreadCount, bool bDelayMode = false, int nWaitTime = 200)
	{
		m_nTotalCount = 0;
		m_nProcessedCount = 0;
		m_bExit = false;

		InitThreadPool(nMaxThreadCount, bDelayMode, nWaitTime);
	}
	ThreadPool()
	{
		m_nTotalCount = 0;
		m_nProcessedCount = 0;
		m_bExit = false;
	}
	~ThreadPool()
	{
		m_bExit = true;

		for (auto& fu : m_arrWorkThreads)
		{
			try
			{
				fu.get();
			}
			catch (...){}
		}
	}

	/*
	初始化线程池,如果使用带参数的构造函数初始化,则无需调用此方法
		nMaxThreadCount:最大工作线程数
		bDelayMode:延时模式
			true:延时启动工作线程,有需要的时候启动
			false:立即启动工作线程
		nWaitTime:等待队列的超时时间,毫秒
	*/
	void InitThreadPool(size_t nMaxThreadCount, bool bDelayMode = false, int nWaitTime = 200)
	{
		m_nMaxThreadCount = nMaxThreadCount;
		m_bDelayMode = bDelayMode;
		m_nWaitTime = nWaitTime;

		if (!m_bDelayMode)
			_StartWorkThread(nMaxThreadCount);
	}

	//添加任务
	template<class F, class... Args>
	auto AddTask(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
	{
		if (m_bDelayMode)
		{
			//延时模式下,如果任务还没有处理完毕,就增加一个工作线程
			if (!IsFinished() || m_arrWorkThreads.size() == 0)
			{
				//内部有工作线程数检测,达到最大线程数后自动关闭延时模式
				_StartWorkThread(1);
			}
		}

		assert(m_arrWorkThreads.size());

		using return_type = typename std::result_of<F(Args...)>::type;
		auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
		std::future<return_type> res = task->get_future();
		m_TaskQueue.enqueue([task](){ (*task)(); });
		m_nTotalCount++;

		return res;
	}

	//获取当前工作线程数
	size_t GetWorkThreadCount()
	{
		return m_arrWorkThreads.size();
	}

	//获取总共插入过的任务数
	uint64_t GetTotalCount()
	{
		return m_nTotalCount;
	}

	//获取已经处理完毕的任务数
	uint64_t GetProcessedCount()
	{
		return m_nProcessedCount;
	}

	//读取当前队列中待处理的数量
	uint64_t GetQueueSize()
	{
		return m_TaskQueue.size_approx();
	}

	//判断是否已经执行完毕已经插入的任务
	bool IsFinished()
	{
		if (m_nProcessedCount == m_nTotalCount && m_TaskQueue.size_approx() == 0)
			return true;

		return false;
	}


	//阻塞等待队列执行完毕
	void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr)
	{
		printf("Wait for finish...\n");

		while (!IsFinished())
		{
			if (fnWaitCallback)
				fnWaitCallback(m_nProcessedCount, m_nTotalCount);

			std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms));
		}

		if (fnWaitCallback)
			fnWaitCallback(m_nProcessedCount, m_nTotalCount);

		printf("All finished. \n");
	}
};



最后说说vs神奇的BUG。。。

#include <iostream>
#include <string>
#include <vector>
#include <thread>


class MyClass
{
private:
	std::vector<std::thread> arrthreads;
	bool bexit;
public:
	MyClass()
	{
		bexit = false;
	};
	~MyClass()
	{
		bexit = true;
		for (auto &x : arrthreads)
		{
			x.join();  //vs编译的这里会卡死,gcc编译的正常执行
		}
	};

	void init()
	{
		for (int i = 0; i < 4; i++)
		{
			arrthreads.emplace_back([this](){

				printf("trhead start...\n");

				while (true)
				{
					if (bexit)
						break;

				}

				printf("trhead exit...\n");
			});
		}
	};
};


MyClass testthread;

int main()
{
    printf("main2\n");
    
    testthread.init();
    
    return 0;
}

MyClass析构函数中让工作线程退出,毫无问题,但是vs编译的,join会卡死阻塞或者崩溃跑飞,gcc编译的无问题。

各位大师有何看法。。。

评论列表:

发表评论: