C++基于消息隊(duì)列的多線程實(shí)現(xiàn)示例代碼
前言
實(shí)現(xiàn)消息隊(duì)列的關(guān)鍵因素是考量不同線程訪問(wèn)消息隊(duì)列的同步問(wèn)題。本實(shí)現(xiàn)涉及到幾個(gè)知識(shí)點(diǎn)
std::lock_guard 介紹
std::lock_gurad 是 C++11 中定義的模板類。定義如下:
template <class Mutex> class lock_guard;
lock_guard 對(duì)象通常用于管理某個(gè)鎖(Lock)對(duì)象,因此與 Mutex RAII 相關(guān),方便線程對(duì)互斥量上鎖,即在某個(gè) lock_guard 對(duì)象的聲明周期內(nèi),它所管理的鎖對(duì)象會(huì)一直保持上鎖狀態(tài);而 lock_guard 的生命周期結(jié)束之后,它所管理的鎖對(duì)象會(huì)被解鎖(注:類似 shared_ptr 等智能指針管理動(dòng)態(tài)分配的內(nèi)存資源 )。
模板參數(shù) Mutex 代表互斥量類型,例如 std::mutex 類型,它應(yīng)該是一個(gè)基本的 BasicLockable 類型,標(biāo)準(zhǔn)庫(kù)中定義幾種基本的 BasicLockable 類型,分別 std::mutex, std::recursive_mutex, std::timed_mutex,std::recursive_timed_mutex 以及 std::unique_lock
std::unique_lock 介紹
lock_guard 最大的缺點(diǎn)也是簡(jiǎn)單,沒(méi)有給程序員提供足夠的靈活度,因此,C++11 標(biāo)準(zhǔn)中定義了另外一個(gè)與 Mutex RAII 相關(guān)類 unique_lock,該類與 lock_guard 類相似,也很方便線程對(duì)互斥量上鎖,但它提供了更好的上鎖和解鎖控制。
顧名思義,unique_lock 對(duì)象以獨(dú)占所有權(quán)的方式( unique owership)管理 mutex 對(duì)象的上鎖和解鎖操作,所謂獨(dú)占所有權(quán),就是沒(méi)有其他的 unique_lock 對(duì)象同時(shí)擁有某個(gè) mutex 對(duì)象的所有權(quán)。
新創(chuàng)建的 unique_lock 對(duì)象管理 Mutex 對(duì)象 m,并嘗試調(diào)用 m.lock() 對(duì) Mutex 對(duì)象進(jìn)行上鎖,如果此時(shí)另外某個(gè) unique_lock 對(duì)象已經(jīng)管理了該 Mutex 對(duì)象 m,則當(dāng)前線程將會(huì)被阻塞。
std::condition介紹
當(dāng) std::condition_variable 對(duì)象的某個(gè) wait 函數(shù)被調(diào)用的時(shí)候,它使用 std::unique_lock(通過(guò) std::mutex) 來(lái)鎖住當(dāng)前線程。當(dāng)前線程會(huì)一直被阻塞,直到另外一個(gè)線程在相同的 std::condition_variable 對(duì)象上調(diào)用了 notification 函數(shù)來(lái)喚醒當(dāng)前線程。
std::condition_variable 提供了兩種 wait() 函數(shù)。當(dāng)前線程調(diào)用 wait() 后將被阻塞(此時(shí)當(dāng)前線程應(yīng)該獲得了鎖(mutex),不妨設(shè)獲得鎖 lck),直到另外某個(gè)線程調(diào)用 notify_* 喚醒了當(dāng)前線程。
在線程被阻塞時(shí),該函數(shù)會(huì)自動(dòng)調(diào)用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競(jìng)爭(zhēng)上的線程得以繼續(xù)執(zhí)行。另外,一旦當(dāng)前線程獲得通知(notified,通常是另外某個(gè)線程調(diào)用 notify_* 喚醒了當(dāng)前線程),wait() 函數(shù)也是自動(dòng)調(diào)用 lck.lock(),使得 lck 的狀態(tài)和 wait 函數(shù)被調(diào)用時(shí)相同。
在第二種情況下(即設(shè)置了 Predicate),只有當(dāng) pred 條件為 false 時(shí)調(diào)用 wait() 才會(huì)阻塞當(dāng)前線程,并且在收到其他線程的通知后只有當(dāng) pred 為 true 時(shí)才會(huì)被解除阻塞。因此第二種情況類似以下代碼:
while (!pred()) wait(lck);
std::function介紹
使用std::function可以將普通函數(shù),lambda表達(dá)式和函數(shù)對(duì)象類統(tǒng)一起來(lái)。它們并不是相同的類型,然而通過(guò)function模板類,可以轉(zhuǎn)化為相同類型的對(duì)象(function對(duì)象),從而放入一個(gè)vector或其他容器里,方便回調(diào)。
代碼實(shí)現(xiàn):
#pragma once
#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H
#include <queue>
#include <mutex>
#include <condition_variable>
template<class Type>
class CMessageQueue
{
public:
 CMessageQueue& operator = (const CMessageQueue&) = delete;
 CMessageQueue(const CMessageQueue& mq) = delete;
 CMessageQueue() :_queue(), _mutex(), _condition(){}
 virtual ~CMessageQueue(){}
 void Push(Type msg){
  std::lock_guard <std::mutex> lock(_mutex);
  _queue.push(msg);
   //當(dāng)使用阻塞模式從消息隊(duì)列中獲取消息時(shí),由condition在新消息到達(dá)時(shí)提醒等待線程
  _condition.notify_one();
 }
  //blocked定義訪問(wèn)方式是同步阻塞或者非阻塞模式
 bool Pop(Type& msg, bool isBlocked = true){
  if (isBlocked)
  {
   std::unique_lock <std::mutex> lock(_mutex);
   while (_queue.empty())
   {
    _condition.wait(lock);
    
   }
   //注意這一段必須放在if語(yǔ)句中,因?yàn)閘ock的生命域僅僅在if大括號(hào)內(nèi)
   msg = std::move(_queue.front());
   _queue.pop();
   return true;
   
  }
  else
  {
   std::lock_guard<std::mutex> lock(_mutex);
   if (_queue.empty())
    return false;
   msg = std::move(_queue.front());
   _queue.pop();
   return true;
  }
 }
 int32_t Size(){
  std::lock_guard<std::mutex> lock(_mutex);
  return _queue.size();
 }
 bool Empty(){
  std::lock_guard<std::mutex> lock(_mutex);
  return _queue.empty();
 }
private:
 std::queue<Type> _queue;//存儲(chǔ)消息的隊(duì)列
 mutable std::mutex _mutex;//同步鎖
 std::condition_variable _condition;//實(shí)現(xiàn)同步式獲取消息
};
#endif//MESSAGE_QUEUE_H
線程池可以直接在構(gòu)造函數(shù)中構(gòu)造線程,并傳入回調(diào)函數(shù),也可以寫一個(gè)Run函數(shù)顯示調(diào)用。這里我們選擇了第二種,對(duì)比:
- 在handler函數(shù)外部做循環(huán)接受消息,當(dāng)消息到達(dá)后調(diào)用hanlder處理。這種實(shí)現(xiàn)在上層做封裝,但是會(huì)在線程中頻繁的切換調(diào)用函數(shù)。這種設(shè)計(jì)無(wú)法復(fù)用一些資源,如當(dāng)在handler中做數(shù)據(jù)庫(kù)操作時(shí),需要頻繁的連接和斷開連接,可以通過(guò)定義兩個(gè)虛函數(shù)Prehandler和AfterHandler來(lái)實(shí)現(xiàn)。
?。?!構(gòu)造函數(shù)中調(diào)用虛函數(shù)并不會(huì)能真正的調(diào)用子類的實(shí)現(xiàn)?。?!
雖然可以對(duì)虛函數(shù)進(jìn)行實(shí)調(diào)用,但程序員編寫虛函數(shù)的本意應(yīng)該是實(shí)現(xiàn)動(dòng)態(tài)聯(lián)編。在構(gòu)造函數(shù)中調(diào)用虛函數(shù),函數(shù)的入口地址是在編譯時(shí)靜態(tài)確定的,并未實(shí)現(xiàn)虛調(diào)用 - 寫一個(gè)Run函數(shù),將這一部分實(shí)現(xiàn)放在run函數(shù)中,顯示調(diào)用。
《Effective C++ 》條款9:永遠(yuǎn)不要在構(gòu)造函數(shù)或析構(gòu)函數(shù)中調(diào)用虛函數(shù) 
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <functional>
#include <vector>
#include <thread>
#include "MessageQueue.h"
#define MIN_THREADS 1
template<class Type>
class CThreadPool
{
 CThreadPool& operator = (const CThreadPool&) = delete;
 CThreadPool(const CThreadPool& other) = delete;
public:
 CThreadPool(int32_t threads, 
  std::function<void(Type& record, CThreadPool<Type>* pSub)> handler);
 virtual ~CThreadPool();
 void Run();
 virtual void PreHandler(){}
 virtual void AfterHandler(){}
 void Submit(Type record);
private:
 bool _shutdown;
 int32_t _threads;
 std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler;
 std::vector<std::thread> _workers;
 CMessageQueue<Type> _tasks;
};
template<class Type>
CThreadPool<Type>::CThreadPool(int32_t threads, 
 std::function<void(Type& record, CThreadPool<Type>* pSub)> handler)
 :_shutdown(false),
 _threads(threads),
 _handler(handler),
 _workers(),
 _tasks()
{
 //第一種實(shí)現(xiàn)方案,注意這里的虛函數(shù)調(diào)用不正確
 /*if (_threads < MIN_THREADS)
  _threads = MIN_THREADS;
 for (int32_t i = 0; i < _threads; i++)
 {
  
  _workers.emplace_back(
   [this]{
   PreHandler();
   while (!_shutdown){
    Type record;
    _tasks.Pop(record, true);
    _handler(record, this);
   }
   AfterHandler();
  }
  );
 }*/
}
//第二種實(shí)現(xiàn)方案
template<class Type>
void CThreadPool<Type>::Run()
{
 if (_threads < MIN_THREADS)
  _threads = MIN_THREADS;
 for (int32_t i = 0; i < _threads; i++)
 {
  _workers.emplace_back(
   [this]{
   PreHandler();
   while (!_shutdown){
    Type record;
    _tasks.Pop(record, true);
    _handler(record, this);
   }
   AfterHandler();
  }
  );
 }
}
template<class Type>
CThreadPool<Type>::~CThreadPool()
{
 for (std::thread& worker : _workers)
  worker.join();
}
template<class Type>
void CThreadPool<Type>::Submit(Type record)
{
 _tasks.Push(record);
}
#endif // !THREAD_POOL_H
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)我們的支持。
上一篇:詳解C++中虛析構(gòu)函數(shù)的作用及其原理分析
欄 目:C語(yǔ)言
下一篇:VC++文件監(jiān)控之ReadDirectoryChangesW
本文標(biāo)題:C++基于消息隊(duì)列的多線程實(shí)現(xiàn)示例代碼
本文地址:http://www.jygsgssxh.com/a1/Cyuyan/369.html
您可能感興趣的文章
- 04-02c語(yǔ)言沒(méi)有round函數(shù) round c語(yǔ)言
 - 01-10深入理解C++中常見(jiàn)的關(guān)鍵字含義
 - 01-10使用C++實(shí)現(xiàn)全排列算法的方法詳解
 - 01-10c++中inline的用法分析
 - 01-10用C++實(shí)現(xiàn)DBSCAN聚類算法
 - 01-10全排列算法的非遞歸實(shí)現(xiàn)與遞歸實(shí)現(xiàn)的方法(C++)
 - 01-10C++大數(shù)模板(推薦)
 - 01-10淺談C/C++中的static與extern關(guān)鍵字的使用詳解
 - 01-10深入C/C++浮點(diǎn)數(shù)在內(nèi)存中的存儲(chǔ)方式詳解
 - 01-10基于atoi()與itoa()函數(shù)的內(nèi)部實(shí)現(xiàn)方法詳解
 


閱讀排行
- 1C語(yǔ)言 while語(yǔ)句的用法詳解
 - 2java 實(shí)現(xiàn)簡(jiǎn)單圣誕樹的示例代碼(圣誕
 - 3利用C語(yǔ)言實(shí)現(xiàn)“百馬百擔(dān)”問(wèn)題方法
 - 4C語(yǔ)言中計(jì)算正弦的相關(guān)函數(shù)總結(jié)
 - 5c語(yǔ)言計(jì)算三角形面積代碼
 - 6什么是 WSH(腳本宿主)的詳細(xì)解釋
 - 7C++ 中隨機(jī)函數(shù)random函數(shù)的使用方法
 - 8正則表達(dá)式匹配各種特殊字符
 - 9C語(yǔ)言十進(jìn)制轉(zhuǎn)二進(jìn)制代碼實(shí)例
 - 10C語(yǔ)言查找數(shù)組里數(shù)字重復(fù)次數(shù)的方法
 
本欄相關(guān)
- 04-02c語(yǔ)言函數(shù)調(diào)用后清空內(nèi)存 c語(yǔ)言調(diào)用
 - 04-02func函數(shù)+在C語(yǔ)言 func函數(shù)在c語(yǔ)言中
 - 04-02c語(yǔ)言的正則匹配函數(shù) c語(yǔ)言正則表達(dá)
 - 04-02c語(yǔ)言用函數(shù)寫分段 用c語(yǔ)言表示分段
 - 04-02c語(yǔ)言中對(duì)數(shù)函數(shù)的表達(dá)式 c語(yǔ)言中對(duì)
 - 04-02c語(yǔ)言編寫函數(shù)冒泡排序 c語(yǔ)言冒泡排
 - 04-02c語(yǔ)言沒(méi)有round函數(shù) round c語(yǔ)言
 - 04-02c語(yǔ)言分段函數(shù)怎么求 用c語(yǔ)言求分段
 - 04-02C語(yǔ)言中怎么打出三角函數(shù) c語(yǔ)言中怎
 - 04-02c語(yǔ)言調(diào)用函數(shù)求fibo C語(yǔ)言調(diào)用函數(shù)求
 
隨機(jī)閱讀
- 01-10C#中split用法實(shí)例總結(jié)
 - 08-05dedecms(織夢(mèng))副欄目數(shù)量限制代碼修改
 - 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
 - 04-02jquery與jsp,用jquery
 - 01-10delphi制作wav文件的方法
 - 01-10使用C語(yǔ)言求解撲克牌的順子及n個(gè)骰子
 - 01-11ajax實(shí)現(xiàn)頁(yè)面的局部加載
 - 01-10SublimeText編譯C開發(fā)環(huán)境設(shè)置
 - 08-05織夢(mèng)dedecms什么時(shí)候用欄目交叉功能?
 - 08-05DEDE織夢(mèng)data目錄下的sessions文件夾有什
 


