Linux 之C++ 線程池
我想做的就是對每個線程進行有針對性的控制,也即可以對線程進行暫停、恢復、退出等精細控制。對於此項要求,我的想法是聲明一個類,該類中包含一些精細的操作,其中包括該線程的狀態、對線程進行控制的互斥變量(pthread_mutex_t),以及用於喚醒的條件變量(pthread_cond_t)。要實現這些功能,其核心在於每個線程擁有自己的控制變量,也即線程執行函數是線程類的靜態函數,該線程函數的參數是該線程類對象的 this 指針!!!
class ServantPool; // owning pool, declared in Servant.h

/// One worker thread together with the primitives used to control it.
///
/// Every Servant owns its own mutex and condition variable, so a single
/// thread can be paused, resumed or told to exit without touching the others.
/// The thread entry point is the static ThreadProc(); it receives the
/// Servant's `this` pointer as its argument.
class Servant
{
private:
    pthread_mutex_t mutex_x; // paired with cond_t
    pthread_cond_t cond_t;   // signalled by notify() and exit()
    int state;               // WORKING / STOP / EXIT / IDLE (defined in Servant.cpp)
    int thread_id;           // sequence number assigned by the constructor
    int m_socket;            // socket served by DoJob()
    ServantPool* m_pool;     // pool whose idle/busy/stop lists track this servant

    // pthread_mutex_t / pthread_cond_t must not be copied, so copying a
    // Servant is disabled (declared private, intentionally not defined).
    Servant(const Servant&);
    Servant& operator=(const Servant&);

public:
    pthread_t m_tid; // handle of the worker thread, filled in by start()

    /// @param ma pool that owns this servant.
    explicit Servant(ServantPool* ma);

    /// Releases the condition variable and the mutex. The worker thread must
    /// have finished (see join()) before the object is destroyed.
    ~Servant()
    {
        pthread_cond_destroy(&cond_t);
        pthread_mutex_destroy(&mutex_x);
    }

    void start();                        // create the worker thread
    static void* ThreadProc(void* data); // thread entry point; data is the Servant*
    void setSocket(int m);               // choose the socket for the next job
    void DoJob();                        // perform one job on m_socket
    void Complete();                     // called by the worker after DoJob()
    void notify();                       // hand a job to an idle servant
    void stop();                         // IDLE -> STOP
    void wakeup();                       // STOP -> IDLE
    void exit();                         // ask the worker thread to return
    void join();                         // wait for the worker thread
};
這是該線程類的聲明,最主要的是查看 ThreadProc 靜態函數:
void*?Servant::ThreadProc(void*?data) {???? ????Servant*?ser=(Servant*)data; ????int?result=0; ????while(ser->state!=EXIT) ????{ ????????while(ser->state==IDLE) ????????{ ????????????result=pthread_mutex_lock(&(ser->mutex_x)); ????????????if(0==result) ????????????{ ????????????????printf("waiting?for?the?mutex?n"); ????????????}???????????? ????????????result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x));???? ????????????if(ser->state!=IDLE) ????????????????goto?End; ???????????????? ????????????printf("the?conditions?has?been?notifiedn"); ????????????//we?take?this?thread?to?busy?list ????????????ser->m_pool->AddtoIdle(false,ser); ????????????ser->m_pool->Addtobusy(true,ser); ????????????//?really???work ???????????? ????????????DoSomething: ????????????ser->state=WORKING;???????????? ????????????printf("Do?Something...n"); ????????????ser->DoJob(); ????????????ser->Complete();//this?function?change?state ????????????End: ????????????pthread_mutex_unlock(&(ser->mutex_x));???????????????????? ????????} ????} ????return?NULL; }
這個函數主要控制的有兩項。其一是 state:每次對線程狀態的改變,都是建立在線程的原子操作已經執行完畢的基礎上的,也即核心操作不會被狀態的改變所打斷。每次被喚醒時,都需要檢測 state 的值:如果 state 還是 IDLE,則表明有新的任務要處理;否則表明喚醒只是為了修改當前線程的狀態!!!現在看一下一個 API:
pthread_cond_wait:調用該函數之前,必須先由本線程對互斥變量加鎖。這個函數需要兩個變量,一個是 mutex_x(加鎖的互斥變量),另一個是 cond_t(條件變量)。在更新條件以前,mutex_x 一直保持鎖定狀態;線程掛起進入等待時,互斥變量會被自動解鎖。當其他線程執行 pthread_cond_signal 時,等待的線程被喚醒;在離開 pthread_cond_wait 之前,它會重新對 mutex_x 加鎖,然後向下執行。由於可能出現虛假喚醒,線程被喚醒後必須重新檢查條件。
現 Servant.h 文件內容如下:
#ifndef SERVANT_H
#define SERVANT_H

#include <pthread.h>
#include <vector>

// Kept only because existing code that includes this header relies on it;
// the declarations below spell out std:: explicitly.
using namespace std;

class ServantPool;

/// One worker thread together with the primitives used to control it.
///
/// Every Servant owns its own mutex and condition variable, so a single
/// thread can be paused, resumed or told to exit without touching the others.
/// The thread entry point is the static ThreadProc(); it receives the
/// Servant's `this` pointer as its argument.
class Servant
{
private:
    pthread_mutex_t mutex_x; // paired with cond_t
    pthread_cond_t cond_t;   // signalled by notify() and exit()
    int state;               // WORKING / STOP / EXIT / IDLE (defined in Servant.cpp)
    int thread_id;           // sequence number assigned by the constructor
    int m_socket;            // socket served by DoJob()
    ServantPool* m_pool;     // pool whose idle/busy/stop lists track this servant

    // pthread_mutex_t / pthread_cond_t must not be copied, so copying a
    // Servant is disabled (declared private, intentionally not defined).
    Servant(const Servant&);
    Servant& operator=(const Servant&);

public:
    pthread_t m_tid; // handle of the worker thread, filled in by start()

    /// @param ma pool that owns this servant.
    explicit Servant(ServantPool* ma);

    /// Releases the condition variable and the mutex. The worker thread must
    /// have finished (see join()) before the object is destroyed.
    ~Servant()
    {
        pthread_cond_destroy(&cond_t);
        pthread_mutex_destroy(&mutex_x);
    }

    void start();                        // create the worker thread
    static void* ThreadProc(void* data); // thread entry point; data is the Servant*
    void setSocket(int m);               // choose the socket for the next job
    void DoJob();                        // perform one job on m_socket
    void Complete();                     // called by the worker after DoJob()
    void notify();                       // hand a job to an idle servant
    void stop();                         // IDLE -> STOP
    void wakeup();                       // STOP -> IDLE
    void exit();                         // ask the worker thread to return
    void join();                         // wait for the worker thread
};

/// Pool of Servant worker threads. ServantPool is supposed to be a singleton:
/// obtain it through getInstance().
class ServantPool
{
private:
    std::vector<Servant*> m_idle_list; // servants waiting for a job
    std::vector<Servant*> m_busy_list; // servants running a job
    std::vector<Servant*> m_stop_list; // servants paused by stop()

    ServantPool() {}
    ServantPool(const ServantPool&);            // not copyable (not defined)
    ServantPool& operator=(const ServantPool&); // not copyable (not defined)

    static ServantPool* hInstance;

public:
    /// Creates `num` servants and places them on the idle list.
    void create(int num);

    /// Starts the worker thread of every idle servant.
    void startAll();

    /// Same as startAll(); kept because main.c calls start().
    void start() { startAll(); }

    /// Calls stop() on the idle and busy servants.
    void StopAll();

    // NOTE(review): declared by the original header, but Servant.cpp shows no
    // definition for it - confirm before calling.
    void TerminateAll();

    /// Stops `id` if it is on the idle list.
    void stop(Servant* id);

    /// Wakes `id` up through Servant::wakeup().
    void wakeup(Servant* id);

    /// Adds (add == true) or removes (add == false) `ser` on the given list.
    void AddtoIdle(bool add, Servant* ser);
    void Addtobusy(bool add, Servant* ser);
    void Addtostop(bool add, Servant* ser);

    /// Returns the first idle servant, or NULL when none is idle.
    Servant* Accepting();

    /// Joins the worker threads.
    void waitforAll();

    ~ServantPool();

    static ServantPool* getInstance();
};

#endif // SERVANT_H
Servant.cpp 文件內容如下:
#include#include#include#include#include#include#include#include"Servant.h" #define?WORKING?1 #define?STOP?2 #define?EXIT?3 #define?IDLE?4 static?int?num=0; static?????pthread_mutex_t?vec_mutex;? void?Servant::start() { ????state=IDLE; ????int?result=pthread_create(&m_tid,NULL,ThreadProc,this); ????if(0!=result) ????{ ????????printf("thread?create?result?:%sn",strerror(errno)); ????} } Servant::Servant(ServantPool*?ma) {???? ????thread_id=num++; ????m_pool=ma;???? ????pthread_mutex_init(&mutex_x,NULL); ????pthread_cond_init(&cond_t,NULL);???? } void?Servant::setSocket(int?m) { ????m_socket=m; } void?Servant::DoJob() { ????char?buf[100]={0}; ????recv(m_socket,buf,100,0); ????printf("we?haved?the?%d?recv:%sn",num++,buf); ????close(m_socket); } void*?Servant::ThreadProc(void*?data) {???? ????Servant*?ser=(Servant*)data; ????int?result=0; ????while(ser->state!=EXIT) ????{ ????????while(ser->state==IDLE) ????????{ ????????????result=pthread_mutex_lock(&(ser->mutex_x)); ????????????if(0==result) ????????????{ ????????????????printf("waiting?for?the?mutex?n"); ????????????}???????????? ????????????result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x));???? ????????????if(ser->state!=IDLE) ????????????????goto?End; ???????????????? ????????????printf("the?conditions?has?been?notifiedn"); ????????????//we?take?this?thread?to?busy?list ????????????ser->m_pool->AddtoIdle(false,ser); ????????????ser->m_pool->Addtobusy(true,ser); ????????????//?really???work ???????????? ????????????DoSomething: ????????????ser->state=WORKING;???????????? ????????????printf("Do?Something...n"); ????????????ser->DoJob(); ????????????ser->Complete();//this?function?change?state ????????????End: ????????????pthread_mutex_unlock(&(ser->mutex_x));???????????????????? ????????} ????} ????return?NULL; } void?Servant::stop() { ????if(state==IDLE)???????? ????{ ????????m_pool->AddtoIdle(false,this); ????????m_pool->Addtostop(true,this); ????????state=STOP; ????????printf("thread?stop!n");???????? 
????} ????else?if(state==WORKING) ????{ ????????printf("current?state?is?WORKING?stop?failed!n");???????????????? ????} ????else?if(state==STOP) ????{ ????????printf("thread?already?stopped!n");???????????????? ????} ????else? ????{ ????????printf("sorry?unknown?state!n"); ????????state=STOP; ????} } void?Servant::wakeup() { ????if(state==STOP)???????? ????{ ????????m_pool->Addtostop(false,this); ????????m_pool->AddtoIdle(true,this); ????????state=IDLE;???? ????????printf("thread?wakeup!n");???????????? ????} ????else?if(state==WORKING) ????{ ????????printf("current?state??is?WORKING?stop?failed!n"); ????} ????else?if(state==IDLE) ????{ ????????printf("current?state??is?idle?never?need?wakeup!n"); ????} ????else ????{ ????????printf("sorry?unknown?state..n"); ????????state=IDLE; ????} } void?Servant::Complete()//完成操作 { ????//完成任務(wù),該線程變?yōu)閕dle???????? ????m_pool->Addtobusy(false,this); ????m_pool->AddtoIdle(true,this);???? ????state=IDLE; } void?Servant::join() { ????pthread_join(m_tid,NULL); } void?Servant::notify() { ????if(state==IDLE) ????{???????? ????????printf("we?have?notified?thread?runningn"); ????????pthread_cond_signal(&cond_t);???? ????} ????else ????{ ????????printf("sorry?,the?signal?is?not?correctn"); ????} } void?Servant::exit() { ????state=EXIT; ????pthread_cond_signal(&cond_t);???? } void?ServantPool::StopAll() {???? ????vector::iterator?itr=m_idle_list.begin();???? ????for(;itr!=m_idle_list.end();) ????{ ????????(*itr)->stop();???????? ????} ????itr=m_busy_list.begin(); ????for(;itr!=m_busy_list.end();) ????{ ????????(*itr)->stop();???????? ????}???? 
} void?ServantPool::create(int?num) { ????int?i=0; ????for(;i<num;i++) ????{ ????????Servant*?tmp=new?Servant(this); ????????m_idle_list.push_back(tmp); ????} } void?ServantPool::AddtoIdle(bool?add,Servant*?ser) { ????if(add) ????{ ????????//?add?ser?to?idle?list ????????pthread_mutex_lock(&vec_mutex); ????????m_idle_list.push_back(ser); ????????pthread_mutex_unlock(&vec_mutex); ????} ????else ????{ ????????//?del?ser?from?idle?list ????????pthread_mutex_lock(&vec_mutex); ????????vector::iterator?itr=m_idle_list.begin(); ????????for(;itr!=m_idle_list.end();itr++) ????????{ ????????????if(*itr==ser) ????????????{ ????????????????m_idle_list.erase(itr); ????????????????break; ????????????} ????????} ????????pthread_mutex_unlock(&vec_mutex); ????} } void?ServantPool::Addtobusy(bool?add,Servant*?ser) { ????if(add) ????{ ????????//?add?ser?to?idle?list ????????pthread_mutex_lock(&vec_mutex); ????????m_busy_list.push_back(ser); ????????pthread_mutex_unlock(&vec_mutex); ????} ????else ????{ ????????//?del?ser?from?idle?list ????????pthread_mutex_lock(&vec_mutex); ????????vector::iterator?itr=m_busy_list.begin(); ????????for(;itr!=m_busy_list.end();itr++) ????????{ ????????????if(*itr==ser) ????????????{ ????????????????m_busy_list.erase(itr); ????????????????break; ????????????} ????????} ????????pthread_mutex_unlock(&vec_mutex); ????} } void?ServantPool::Addtostop(bool?add,Servant*?ser) { ????if(add) ????{ ????????//?add?ser?to?idle?list ????????pthread_mutex_lock(&vec_mutex); ????????m_stop_list.push_back(ser); ????????pthread_mutex_unlock(&vec_mutex); ????} ????else ????{ ????????//?del?ser?from?idle?list ????????pthread_mutex_lock(&vec_mutex); ????????vector::iterator?itr=m_stop_list.begin(); ????????for(;itr!=m_stop_list.end();itr++) ????????{ ????????????if(*itr==ser) ????????????{ ????????????????m_stop_list.erase(itr); ????????????????break; ????????????} ????????} ????????pthread_mutex_unlock(&vec_mutex); ????} } void?ServantPool::startAll() { ????int?i=0; 
????for(;istart();//create?the?thread ????} } void?ServantPool::stop(Servant*?id) {???? ????vector::iterator?itr=m_idle_list.begin();???? ????for(;itr!=m_idle_list.end();itr++) ????{ ????????if((*itr)==id) ????????{???? ????????????(*itr)->stop();???????????? ????????????return; ????????} ????} } void?ServantPool::waitforAll() { ????int?i=0; ????int?nums=m_busy_list.size(); ????for(;ijoin(); ????} ????nums=m_idle_list.size(); ????i=0; ????for(;ijoin(); ????}???? } void?ServantPool::wakeup(Servant*?id) {???? ????vector::iterator?itr=m_busy_list.begin();???? ????for(;itr!=m_busy_list.end();itr++) ????{ ????????if((*itr)==id) ????????{???? ????????????(*itr)->wakeup();???????????? ????????????return; ????????} ????} } ServantPool?*?ServantPool::hInstance=NULL; ServantPool?*?ServantPool::getInstance() {???? ????if(NULL==hInstance) ????{ ????????hInstance=new?ServantPool(); ????????pthread_mutex_init(&vec_mutex,NULL); ????} ????return?hInstance;???????? } ServantPool::~ServantPool() { ????vector::iterator?itr=m_idle_list.begin(); ????for(;itr!=m_idle_list.end();) ????{???????? ????????(*itr)->exit();???????????????? ????????delete?*itr;???????? ????????itr=m_idle_list.erase(itr);???????? ????} ???? ????itr=m_busy_list.begin(); ????for(;itr!=m_busy_list.end();) ????{???????? ????????(*itr)->exit();???????????????? ????????delete?*itr;???????? ????????itr=m_busy_list.erase(itr);???????? ????}???? ???? ????itr=m_stop_list.begin(); ????for(;itr!=m_stop_list.end();) ????{???????? ????????(*itr)->exit();???????????????? ????????delete?*itr;???????? ????????itr=m_stop_list.erase(itr);???????? ????}???? } Servant*?ServantPool::Accepting() { ????if(m_idle_list.size()>0) ????{ ????????return?m_idle_list[0]; ????} ????return?NULL; }
main.c文件如下:
#include#include#include#include#include"Servant.h" //create?100?servants?thread #define?SERVANT_NUM?1 int?main() { ServantPool?*mm=ServantPool::getInstance(); printf("we?begin?create?%d?Servant?threads?n",SERVANT_NUM); mm->create(SERVANT_NUM); mm->start(); mm->waitforAll(); return?0; }
makefile 文件內容如下:
# Builds the `test` executable from Servant.cpp and main.c.
# -pthread is passed both when compiling and when linking; -lpthread on a
# compile-only (-c) step has no effect.
.PHONY: clean

test: Servant.o main.o
	g++ -pthread -o test Servant.o main.o

Servant.o: Servant.cpp Servant.h
	g++ -g -pthread -c Servant.cpp

# g++ compiles main.c as C++ even though it has a .c suffix.
main.o: main.c Servant.h
	g++ -g -pthread -c main.c

# -f keeps `make clean` from failing when there is nothing to remove.
clean:
	rm -f *.o test
這裡面需要注意一點,就是在修改線程狀態時,首先應該修改的是線程狀態值,然後才進行信號通知。這樣可以避免信號通知時狀態值還沒有發生變化,導致信號修改失敗!!!
好了,這就是暫時要表達的線程池的內容了,如有錯誤,還請大神指正。