c++實(shí)現(xiàn)簡單的線程池
這是對(duì)pthread線程的一個(gè)簡單應(yīng)用
1. 實(shí)現(xiàn)了線程池的概念,線程可以重復(fù)使用。
2. 對(duì)信號(hào)量,互斥鎖等進(jìn)行封裝,業(yè)務(wù)處理函數(shù)中只需寫和業(yè)務(wù)相關(guān)的代碼。
3. 移植性好。如果想把這個(gè)線程池代碼應(yīng)用到自己的實(shí)現(xiàn)中去,只要寫自己的業(yè)務(wù)處理函數(shù)和改寫工作隊(duì)列數(shù)據(jù)的處理方法就可以了。
Sample代碼主要包括一個(gè)主程序和兩個(gè)線程實(shí)現(xiàn)類
ThreadTest.cpp:主程序
CThreadManager:線程管理Class,線程池的實(shí)現(xiàn)類
CThread:線程Class.
主程序?qū)崿F(xiàn)方法。
1. 實(shí)現(xiàn)main函數(shù)和一個(gè)需要線程處理的業(yè)務(wù)函數(shù)(例子代碼中業(yè)務(wù)函數(shù)是一個(gè)簡單的計(jì)算函數(shù)Count)。在main函數(shù)中創(chuàng)建CThreadManager的實(shí)例,產(chǎn)生線程池。這個(gè)時(shí)候,把業(yè)務(wù)函數(shù)作為函數(shù)指針傳到CThreadManager里面,最終會(huì)被線程調(diào)用。
2. 向工作隊(duì)列中放入業(yè)務(wù)函數(shù)要處理的數(shù)據(jù)。
3. 設(shè)置信號(hào)量,喚醒線程。
// 線程要執(zhí)行的函數(shù)
int Count(int nWork)
{
int nResult = nWork * nWork;
printf("count result is %d\n",nResult);
return 0;
}
int main() {
// 創(chuàng)建線程管理類的實(shí)例,把要執(zhí)行的線程函數(shù)和最大線程數(shù)傳進(jìn)去
CThreadManager* pManager = new CThreadManager(Count, 3);
// 把要進(jìn)行計(jì)算的數(shù)放到工作隊(duì)列中
pManager->PushWorkQue(5);
pManager->PushWorkQue(20);
// 設(shè)置信號(hào)量,喚醒線程
pManager->PostSem();
pManager->PostSem();
// 等待子線程執(zhí)行
sleep(1);
return 0;
}
CThreadManager實(shí)現(xiàn)的方法
1. 把信號(hào)量和互斥鎖等封裝成自己的函數(shù)
2. 在new方法里,循環(huán)調(diào)用CThread的new方法,啟動(dòng)一定數(shù)量(可設(shè)定)的線程,產(chǎn)生線程池。
3. 這些線程啟動(dòng)后,就會(huì)執(zhí)行CThreadManager中的ManageFuction函數(shù)。這個(gè)函數(shù)是無限循環(huán)的,保證了線程在整個(gè)程序的生命周期中不銷毀。
4. 在循環(huán)處理里面,第一行代碼就是等待一個(gè)信號(hào)量,這個(gè)信號(hào)量是由主程序進(jìn)行設(shè)置的,這個(gè)信號(hào)信號(hào)量如果沒有被設(shè)置(代表暫時(shí)沒有需要處理的工作),所有線程都在這里阻塞著。
4. 一旦信號(hào)量被設(shè)置,根據(jù)Linux線程調(diào)度機(jī)制,在阻塞的線程隊(duì)列中,其中一個(gè)線程被喚醒,可以執(zhí)行后面的代碼。
5. 從工作隊(duì)列中取出要進(jìn)行處理的數(shù)據(jù)(使用互斥鎖進(jìn)行排他)
6. 通過函數(shù)指針調(diào)用main函數(shù)傳過來的業(yè)務(wù)函數(shù),處理數(shù)據(jù)。
7. 業(yè)務(wù)函數(shù)執(zhí)行完之后,線程進(jìn)入下一個(gè)循環(huán),等待新的信號(hào)量。
class CThreadManager {
friend void* ManageFuction(void*);
private:
sem_t m_sem; // 信號(hào)量
pthread_mutex_t m_mutex; // 互斥鎖
queue<int> m_queWork; // 工作隊(duì)列
list<CThread*> m_lstThread; // 線程list
int (*m_threadFuction)(int); //函數(shù)指針,指向main函數(shù)傳過來的線程執(zhí)行函數(shù)
public:
CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt);
virtual ~CThreadManager();
int WaitSem();
int PostSem();
int LockMutex();
int UnlockMutex();
void PushWorkQue(int nWork);
int PopWorkQue();
int RunThreadFunction(int nWork);
};
// 線程執(zhí)行函數(shù),它只是個(gè)殼子,處理信號(hào)量和互斥鎖等,
// 最后調(diào)用main函數(shù)傳過來的線程執(zhí)行函數(shù)來實(shí)現(xiàn)業(yè)務(wù)處理
void* ManageFuction(void* argv)
{
CThreadManager* pManager = (CThreadManager*)argv;
// 進(jìn)行無限循環(huán)(意味著線程是不銷毀的,重復(fù)利用)
while(true)
{
// 線程開啟后,就在這里阻塞著,直到main函數(shù)設(shè)置了信號(hào)量
pManager->WaitSem();
printf("thread wakeup.\n");
// 從工作隊(duì)列中取出要處理的數(shù)
pManager->LockMutex();
int nWork = pManager->PopWorkQue();
pManager->UnlockMutex();
printf("call Count function.\n");
pManager->RunThreadFunction(nWork);
}
return 0;
}
// 構(gòu)造方法
CThreadManager::CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt)
{
sem_init(&m_sem, 0, 0);
pthread_mutex_init(&m_mutex, NULL);
m_threadFuction = threadFuction;
for(int i=0; i<nMaxThreadCnt; i++)
{
CThread* pThread = new CThread(ManageFuction, this);
printf("thread started.\n");
m_lstThread.push_back(pThread);
}
}
CThread實(shí)現(xiàn)的方法
CThreadManager比較簡單,封裝了創(chuàng)建線程和join線程的函數(shù)。
CThread::CThread(void* (*threadFuction)(void*),void* threadArgv)
{
// 初始化線程屬性
pthread_attr_t threadAttr;
pthread_attr_init(&threadAttr);
pthread_create(&m_thread, &threadAttr, threadFuction, threadArgv);
}
c++線程池,繼承CDoit,實(shí)現(xiàn)其中的start和end
/*
* 多線程管理類
*
*/
#ifndef CTHREADPOOLMANAGE_H
#define CTHREADPOOLMANAGE_H
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <list>
#include <vector>
#include <time.h>
#include <asm/errno.h>
#define USLEEP_TIME 100
#define CHECK_TIME 1
using namespace std;
class CDoit
{
public:
virtual int start(void *){};
virtual int end(){};
};
class CthreadPoolManage
{
private:
int _minThreads; //最少保留幾個(gè)線程
int _maxThreads; //最多可以有幾個(gè)線程
int _waitSec; //空閑多少秒后將線程關(guān)閉
class threadInfo{
public:
threadInfo(){
isbusy = false;
doFlag = true;
}
//
pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
bool isbusy; //是否空閑
bool doFlag;
//
time_t beginTime; //線程不工作開始時(shí)間
pthread_t cThreadPid; //線程id
pthread_attr_t cThreadAttr; //線程屬性
CDoit * doit; //任務(wù)類
void * value; //需要傳遞的值
};
//線程函數(shù)
static void* startThread(void*);
//任務(wù)隊(duì)列鎖
pthread_mutex_t _duty_mutex;
//任務(wù)隊(duì)列
list<threadInfo*> _dutyList;
//線程隊(duì)列鎖
pthread_mutex_t _thread_mutex;
//線程隊(duì)列
list<threadInfo*> _threadList;
///初始化,創(chuàng)建最小個(gè)數(shù)線程///
void initThread();
///任務(wù)分配線程///
static void* taskAllocation(void*arg);
pthread_t tasktPid;
///線程銷毀、狀態(tài)檢查線程///
static void* checkThread(void* arg);
pthread_t checktPid;
bool checkrun;
//線程異常退出清理
static void threadCleanUp(void* arg);
//
int addThread(list<threadInfo*> *plist,threadInfo* ptinfo);
public:
CthreadPoolManage();
/*
保留的最少線程,最多線程數(shù),空閑多久銷毀,保留幾個(gè)線程的冗余
*/
CthreadPoolManage(int min,int max,int waitSec);
~CthreadPoolManage();
int start();
//任務(wù)注入器
int putDuty(CDoit *,void *);
int getNowThreadNum();
};
#endif // CTHREADPOOLMANAGE_H
CPP
/*
* 線程池,線程管理類
*
*/
#include "cthreadpoolmanage.h"
CthreadPoolManage::CthreadPoolManage()
{
_minThreads = 5; //最少保留幾個(gè)線程
_maxThreads = 5; //最多可以有幾個(gè)線程
_waitSec = 10; //空閑多少秒后將線程關(guān)閉
pthread_mutex_init(&_duty_mutex, NULL);
pthread_mutex_init(&_thread_mutex, NULL);
checkrun = true;
}
CthreadPoolManage::CthreadPoolManage(int min, int max, int waitSec)
{
CthreadPoolManage();
_minThreads = min; //最少保留幾個(gè)線程
_maxThreads = max; //最多可以有幾個(gè)線程
_waitSec = waitSec; //空閑多少秒后將線程關(guān)閉
}
CthreadPoolManage::~CthreadPoolManage()
{
}
void CthreadPoolManage::threadCleanUp(void* arg)
{
threadInfo* tinfo = (threadInfo*)arg;
tinfo->isbusy = false;
pthread_mutex_unlock(&tinfo->mtx);
pthread_attr_destroy (&tinfo->cThreadAttr);
delete tinfo;
}
void* CthreadPoolManage::startThread(void* arg)
{
cout<<"線程開始工作"<<endl;
threadInfo* tinfo = (threadInfo*)arg;
pthread_cleanup_push(threadCleanUp,arg);
while(tinfo->doFlag){
pthread_mutex_lock(&tinfo->mtx);
if(tinfo->doit == NULL)
{
cout<<"開始等待任務(wù)"<<endl;
pthread_cond_wait(&tinfo->cond,&tinfo->mtx);
cout<<"有任務(wù)了"<<endl;
}
tinfo->isbusy = true;
tinfo->doit->start(tinfo->value);
tinfo->doit->end();
tinfo->doit=NULL;
tinfo->isbusy = false;
time( &tinfo->beginTime);
pthread_mutex_unlock(&tinfo->mtx);
}
//0正常執(zhí)行到這兒不執(zhí)行清理函數(shù),異常會(huì)執(zhí)行
pthread_cleanup_pop(0);
pthread_attr_destroy (&tinfo->cThreadAttr);
delete tinfo;
cout<<"線程結(jié)束"<<endl;
}
void CthreadPoolManage::initThread()
{
int i = 0;
for(i = 0;i<this->_minThreads;i++)
{
threadInfo *tinfo = new threadInfo;
tinfo->doit = NULL;
tinfo->value = NULL;
tinfo->isbusy = false;
tinfo->doFlag = true;
// PTHREAD_CREATE_DETACHED (分離線程) 和 PTHREAD _CREATE_JOINABLE (非分離線程)
pthread_attr_init(&tinfo->cThreadAttr);
pthread_attr_setdetachstate(&tinfo->cThreadAttr,PTHREAD_CREATE_DETACHED );
cout<<"初始化了一個(gè)線程"<<endl;
if(pthread_create(&tinfo->cThreadPid,&tinfo->cThreadAttr,startThread,(void *)tinfo) != 0)
{
cout<<"創(chuàng)建線程失敗"<<endl;
break;
}
this->_threadList.push_back(tinfo);
}
}
int CthreadPoolManage::addThread(std::list< CthreadPoolManage::threadInfo* >* plist, CthreadPoolManage::threadInfo* ptinfo)
{
threadInfo *tinfo = new threadInfo;
tinfo->doit = ptinfo->doit;
tinfo->value = ptinfo->value;
tinfo->isbusy = true;
if(pthread_create(&tinfo->cThreadPid,NULL,startThread,(void *)tinfo) != 0)
{
cout<<"創(chuàng)建線程失敗"<<endl;
return -1;
}
plist->push_back(tinfo);
return 0;
}
int CthreadPoolManage::putDuty(CDoit* doit, void* value)
{
threadInfo *tinfo = new threadInfo;
time( &tinfo->beginTime);
tinfo->doit= doit;
tinfo->value = value;
pthread_mutex_lock(&_duty_mutex);
this->_dutyList.push_back(tinfo);
pthread_mutex_unlock(&_duty_mutex);
return 0;
}
void* CthreadPoolManage::taskAllocation(void*arg)
{
CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg;
int size_1 = 0;
int size_2 = 0;
int i_1 = 0;
int i_2 = 0;
bool a_1 = true;
bool a_2 = true;
threadInfo* ptinfo;
threadInfo* ptinfoTmp;
while(true){
size_1 = 0;
size_2 = 0;
pthread_mutex_lock(&ptmanage->_duty_mutex);
pthread_mutex_lock(&ptmanage->_thread_mutex);
size_1 = ptmanage->_dutyList.size();
size_2 =ptmanage->_threadList.size();
for(list<threadInfo*>::iterator itorti1 = ptmanage->_dutyList.begin();itorti1 !=ptmanage->_dutyList.end();)
{
ptinfo = *itorti1;
a_1 = true;
for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();itorti2++){
ptinfoTmp = *itorti2;
if(EBUSY == pthread_mutex_trylock(&ptinfoTmp->mtx))
{
continue;
}
if(!ptinfoTmp->isbusy)
{
ptinfoTmp->doit = ptinfo->doit;
ptinfoTmp->value = ptinfo->value;
ptinfoTmp->isbusy = true;
pthread_cond_signal(&ptinfoTmp->cond);
pthread_mutex_unlock(&ptinfoTmp->mtx);
a_1 = false;
delete ptinfo;
break;
}
pthread_mutex_unlock(&ptinfoTmp->mtx);
}
if(a_1){
if(ptmanage->_threadList.size()>ptmanage->_maxThreads||ptmanage->addThread(&ptmanage->_threadList,ptinfo)!=0)
{
itorti1++;
continue;
}else{
itorti1 = ptmanage->_dutyList.erase(itorti1);
}
delete ptinfo;
}else{
itorti1 = ptmanage->_dutyList.erase(itorti1);
}
}
pthread_mutex_unlock(&ptmanage->_duty_mutex);
pthread_mutex_unlock(&ptmanage->_thread_mutex);
usleep(USLEEP_TIME);
}
return 0;
}
void* CthreadPoolManage::checkThread(void* arg)
{
CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg;
threadInfo* ptinfo;
time_t nowtime;
while(ptmanage->checkrun){
sleep(CHECK_TIME);
pthread_mutex_lock(&ptmanage->_thread_mutex);
if(ptmanage->_threadList.size()<=ptmanage->_minThreads)
{
pthread_mutex_unlock(&ptmanage->_thread_mutex);
continue;
}
for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();){
ptinfo = *itorti2;
if(EBUSY == pthread_mutex_trylock(&ptinfo->mtx))
{
itorti2++;
continue;
}
time(&nowtime);
if(ptinfo->isbusy == false && nowtime-ptinfo->beginTime>ptmanage->_waitSec)
{
ptinfo->doFlag = false;
itorti2 = ptmanage->_threadList.erase(itorti2);
}else{
itorti2++;
}
pthread_mutex_unlock(&ptinfo->mtx);
}
pthread_mutex_unlock(&ptmanage->_thread_mutex);
}
}
int CthreadPoolManage::start()
{
//初始化
this->initThread();
//啟動(dòng)任務(wù)分配線程
if(pthread_create(&tasktPid,NULL,taskAllocation,(void *)this) != 0)
{
cout<<"創(chuàng)建任務(wù)分配線程失敗"<<endl;
return -1;
}
//創(chuàng)建現(xiàn)程狀態(tài)分配管理線程
if(pthread_create(&checktPid,NULL,checkThread,(void *)this) != 0)
{
cout<<"創(chuàng)建線程狀態(tài)分配管理線程失敗"<<endl;
return -1;
}
return 0;
}
///////////////////////////////
int CthreadPoolManage::getNowThreadNum()
{
int num = 0;
pthread_mutex_lock(&this->_thread_mutex);
num = this->_threadList.size();
pthread_mutex_unlock(&this->_thread_mutex);
return num ;
}
上一篇:深入淺析STL vector用法
欄 目:C語言
本文標(biāo)題:c++實(shí)現(xiàn)簡單的線程池
本文地址:http://www.jygsgssxh.com/a1/Cyuyan/2663.html
您可能感興趣的文章
- 04-02c語言沒有round函數(shù) round c語言
- 01-10數(shù)據(jù)結(jié)構(gòu)課程設(shè)計(jì)-用棧實(shí)現(xiàn)表達(dá)式求值的方法詳解
- 01-10使用OpenGL實(shí)現(xiàn)3D立體顯示的程序代碼
- 01-10深入理解C++中常見的關(guān)鍵字含義
- 01-10求斐波那契(Fibonacci)數(shù)列通項(xiàng)的七種實(shí)現(xiàn)方法
- 01-10C語言 解決不用+、-、&#215;、&#247;數(shù)字運(yùn)算符做加法
- 01-10使用C++實(shí)現(xiàn)全排列算法的方法詳解
- 01-10c++中inline的用法分析
- 01-10用C++實(shí)現(xiàn)DBSCAN聚類算法
- 01-10深入全排列算法及其實(shí)現(xiàn)方法


閱讀排行
本欄相關(guān)
- 04-02c語言函數(shù)調(diào)用后清空內(nèi)存 c語言調(diào)用
- 04-02func函數(shù)+在C語言 func函數(shù)在c語言中
- 04-02c語言的正則匹配函數(shù) c語言正則表達(dá)
- 04-02c語言用函數(shù)寫分段 用c語言表示分段
- 04-02c語言中對(duì)數(shù)函數(shù)的表達(dá)式 c語言中對(duì)
- 04-02c語言編寫函數(shù)冒泡排序 c語言冒泡排
- 04-02c語言沒有round函數(shù) round c語言
- 04-02c語言分段函數(shù)怎么求 用c語言求分段
- 04-02C語言中怎么打出三角函數(shù) c語言中怎
- 04-02c語言調(diào)用函數(shù)求fibo C語言調(diào)用函數(shù)求
隨機(jī)閱讀
- 08-05dedecms(織夢(mèng))副欄目數(shù)量限制代碼修改
- 01-10C#中split用法實(shí)例總結(jié)
- 08-05織夢(mèng)dedecms什么時(shí)候用欄目交叉功能?
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 01-10delphi制作wav文件的方法
- 01-10SublimeText編譯C開發(fā)環(huán)境設(shè)置
- 01-10使用C語言求解撲克牌的順子及n個(gè)骰子
- 01-11ajax實(shí)現(xiàn)頁面的局部加載
- 08-05DEDE織夢(mèng)data目錄下的sessions文件夾有什
- 04-02jquery與jsp,用jquery


