很多場合之所以使用C++,一方面是由于C++編譯后的native code的高效性能,另一方面是由于C++優(yōu)秀的并發(fā)能力。并行方式有多進(jìn)程 和多線程之分,本章暫且只討論多線程,多進(jìn)程方面的知識會在其他章節(jié)具體討論。多線程是開發(fā)C++服務(wù)器程序非常重要的基礎(chǔ),如何根據(jù)需求具體的設(shè)計、分配線程以及線程間的通信,也是服務(wù)器程序非常重要的部分,除了能夠帶來程序的性能提高外,若設(shè)計失誤,則可能導(dǎo)致程序復(fù)雜而又混亂,變成bug滋生的溫床。所以設(shè)計、開發(fā)優(yōu)秀的線程組件以供重用,無論如何都是值得的。
線程相關(guān)的api并不復(fù)雜,然而無論是linux還是windows系統(tǒng),都是c風(fēng)格的接口,我們只需簡單的封裝成對象,方便易用即可。任務(wù)隊列是設(shè)計成用來進(jìn)行線程間通信,使用任務(wù)隊列進(jìn)行線程間通信設(shè)計到一些模式,原理并不難理解,我們需要做到是弄清楚,在什么場景下選用什么樣的模式即可。
任務(wù)隊列對線程間通信進(jìn)行了抽象,限定了線程間只能通過傳遞任務(wù),而相關(guān)的數(shù)據(jù)及操作則被任務(wù)保存。任務(wù)隊列這個名詞可能在其他場景定義過其他意義,這里討論的任務(wù)隊列定義為:能夠把封裝了數(shù)據(jù)和操作的任務(wù)在多線程間傳遞的線程安全的先入先出的隊列。其與線程關(guān)系示意圖如下:
注:兩個虛線框分別表示線程A和線程B恩能夠訪問的數(shù)據(jù)邊界,由此可見 任務(wù)隊列是線程間通信的媒介。
生產(chǎn)者消費者模型在軟件設(shè)計中是極其常見的模型,常常被用來實現(xiàn)對各個組件或系統(tǒng)解耦合。大到分布式的系統(tǒng)交互,小到網(wǎng)絡(luò)層對象和應(yīng)用層對象的通訊,都會應(yīng)用到生產(chǎn)者消費者模型,在任務(wù)隊列中,生產(chǎn)和消費的對象為“任務(wù)”。這里把任務(wù)定義為組合了數(shù)據(jù)和操作的對象,或者簡單理解成包含了void (void*) 類型的函數(shù)指針和void* 數(shù)據(jù)指針的結(jié)構(gòu)。我們把任務(wù)定義成類task_t,下面來分析一下task_t的實現(xiàn)。
插入代碼:
class task_impl_i{public: virtual ~task_impl_i(){} virtual void run() = 0; virtual task_impl_i* fork() = 0;};class task_impl_t: public task_impl_i{public: task_impl_t(task_func_t func_, void* arg_): m_func(func_), m_arg(arg_) {} virtual void run() { m_func(m_arg); } virtual task_impl_i* fork() { return new task_impl_t(m_func, m_arg); }protected: task_func_t m_func; void* m_arg;};struct task_t{ static void dumy(void*){} task_t(task_func_t f_, void* d_): task_impl(new task_impl_t(f_, d_)) { } task_t(task_impl_i* task_imp_): task_impl(task_imp_) { } task_t(const task_t& src_): task_impl(src_.task_impl->fork()) { } task_t() { task_impl = new task_impl_t(&task_t::dumy, NULL); } ~task_t() { delete task_impl; } task_t& operator=(const task_t& src_) { delete task_impl; task_impl = src_.task_impl->fork(); return *this; } void run() { task_impl->run(); } task_impl_i* task_impl;};
Task最重要的接口是run,簡單的執(zhí)行保存的操作,具體的操作保存在task_impl_i的基類中,由于對象本身就是數(shù)據(jù)加操作的集合,所以構(gòu)造task_impl_i的子類對象時,為其賦予不同的數(shù)據(jù)和操作即可。這里使用了組合的方式實現(xiàn)了接口和實現(xiàn)的分離。這么做的優(yōu)點是應(yīng)用層只需知道task的概念即可,對應(yīng)task_impl_i不需要了解。由于不同的操作和數(shù)據(jù)可能需要構(gòu)造不同task_impl_i子類,我們需要提供一些泛型函數(shù),能夠?qū)⒂脩舻乃胁僮骱蛿?shù)據(jù)都能輕易的轉(zhuǎn)換成task對象。task_binder_t 提供一系列的gen函數(shù),能夠轉(zhuǎn)換用戶的普通函數(shù)和數(shù)據(jù)為task_t對象。
struct task_binder_t{ //! C function static task_t gen(void (*func_)(void*), void* p_) { return task_t(func_, p_); } template<typename RET> static task_t gen(RET (*func_)(void)) { struct lambda_t { static void task_func(void* p_) { (*(RET(*)(void))p_)(); }; }; return task_t(lambda_t::task_func, (void*)func_); } template<typename FUNCT, typename ARG1> static task_t gen(FUNCT func_, ARG1 arg1_) { struct lambda_t: public task_impl_i { FUNCT dest_func; ARG1 arg1; lambda_t(FUNCT func_, const ARG1& arg1_): dest_func(func_), arg1(arg1_) {} virtual void run() { (*dest_func)(arg1); } virtual task_impl_i* fork() { return new lambda_t(dest_func, arg1); } }; return task_t(new lambda_t(func_, arg1_));
函數(shù)封裝了用戶的操作邏輯,需要在某線程執(zhí)行特定操作時,需要將操作對應(yīng)的函數(shù)轉(zhuǎn)換成task_t,投遞到目的線程對應(yīng)的任務(wù)隊列。任務(wù)隊列使用起來雖然像是在互相投遞消息,但是根本上仍然是共享數(shù)據(jù)式的數(shù)據(jù)交換方式。主要步驟如下:
l 用戶函數(shù)轉(zhuǎn)換成task_t對象
l 鎖定目的線程的任務(wù)隊列,將task_t 放到任務(wù)隊列尾,當(dāng)隊列為空時,目的線程會wait在條件變量上,此時需要signal喚醒目的線程
實現(xiàn)的關(guān)鍵代碼如下:
void produce(const task_t& task_) { lock_guard_t lock(m_mutex); bool need_sig = m_tasklist.empty(); m_tasklist.push_back(task_); if (need_sig) { m_cond.signal(); } }
消費任務(wù)的線程會變成完全的任務(wù)驅(qū)動,該線程只有一個職責(zé),執(zhí)行任務(wù)隊列的所有任務(wù),若當(dāng)前任務(wù)隊列為空時,線程會阻塞在條件變量上,重新有新任務(wù)到來時,線程會被再次喚醒。實現(xiàn)代碼如下:
int consume(task_t& task_) { lock_guard_t lock(m_mutex); while (m_tasklist.empty()) { if (false == m_flag) { return -1; } m_cond.wait(); } task_ = m_tasklist.front(); m_tasklist.pop_front(); return 0;} int run() { task_t t; while (0 == consume(t)) { t.run(); } return 0; }
任務(wù)隊列已經(jīng)提供了run接口,綁定任務(wù)隊列的線程只需執(zhí)行此函數(shù)即可,此函數(shù)除非用戶顯示的調(diào)用任務(wù)隊列的close接口,否則run函數(shù)永不返回。任務(wù)隊列的close接口是專門用來停止任務(wù)隊列的工作的,代碼如下:
void close() { lock_guard_t lock(m_mutex); m_flag = false; m_cond.broadcast();}
首先設(shè)置了關(guān)閉標(biāo)記,然后在條件變量上執(zhí)行broadcast, 任務(wù)隊列的run函數(shù)也會由此退出。在回頭看一下run接口的代碼你會發(fā)現(xiàn),檢查任務(wù)隊列是否關(guān)閉(m_flag 變量)的代碼是在任務(wù)隊列為空的時候才檢測的,這樣能夠保證任務(wù)隊列被全部執(zhí)行后,run函數(shù)才返回。
下面是一個使用任務(wù)隊列的helloworld的示例:
class foo_t{public: void print(int data) { cout << "helloworld, data:" <<data << " thread id:"<< ::pthread_self() << endl; } void print_callback(int data, void (*callback_)(int)) { callback_(data); } static void check(int data) { cout << "helloworld, data:" <<data << " thread id:"<< ::pthread_self() << endl; }};// 單線程單任務(wù)隊列void test_1(){ thread_t thread; task_queue_t tq; thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 1); foo_t foo; for (int i = 0; i < 100; ++i) { cout << "helloworld, thread id:"<< ::pthread_self() << endl; tq.produce(task_binder_t::gen(&foo_t::print, &foo, i)); sleep(1); } thread.join();}int main(int argc, char* argv[]){ test_1(); return 0;}
本例使用單線程單任務(wù)隊列的方式,由于只有一個線程綁定在任務(wù)隊列上,所以任務(wù)的執(zhí)行會嚴(yán)格按照先入先出的方式執(zhí)行。優(yōu)點是能夠保證邏輯操作的有序性,所以最為常用。
如果想利用更多線程,那么創(chuàng)建更多線程的同時,仍然保證每個任務(wù)隊列綁定在單線程上。讓不同的任務(wù)隊列并行執(zhí)行就可以了。
下面幾種情況適用此模式:
l 比如網(wǎng)游中數(shù)據(jù)庫一般會創(chuàng)建連接池,用戶的操作數(shù)據(jù)庫都是有數(shù)據(jù)庫線程池完成,在將結(jié)果投遞給邏輯層。對每個用戶的數(shù)據(jù)增刪改查操作都必須是有序的,所以每個用戶綁定一個固定的任務(wù)隊列。而不同的用戶的數(shù)據(jù)修改互不干擾,不同的用戶分配不同的任務(wù)隊列即可。
l 比如網(wǎng)絡(luò)層中的多個socket的讀寫是互不干擾的,可以創(chuàng)建兩個或更多線程,每個對應(yīng)一個任務(wù)隊列,不同的socket的操作可以隨機(jī)的分配一個任務(wù)隊列(注意分配是隨機(jī)的,一旦分配了,單個socket的所有操作都會由這個任務(wù)隊列完成,保證邏輯有序性)。
示例代碼:
//! 多線程多任務(wù)隊列void test_2(){ thread_t thread; task_queue_t tq[3]; for (unsigned int i = 0; i < sizeof(tq)/sizeof(task_queue_t); ++i) { thread.create_thread(task_binder_t::gen(&task_queue_t::run, &(tq[i])), 1); } foo_t foo; cout << "helloworld, thread id:"<< ::pthread_self() << endl; for (unsigned int j = 0; j < 100; ++j) { tq[j % (sizeof(tq)/sizeof(task_queue_t))].produce(task_binder_t::gen(&foo_t::print, &foo, j)); sleep(1); } thread.join();}
有時候可能并不需要邏輯操作的完全有序,而是要求操作盡可能快的執(zhí)行,只要有空閑線程,任務(wù)就投遞到空閑線程立刻執(zhí)行。如果時序不影響結(jié)果,這種模式會更有效率,下面幾種情況可能用到這種模式:
l 比如social game中的好友是從platform的api獲取的,需要http協(xié)議通訊,若采用curl等http庫同步通訊時,會阻塞線程,這是可以使用多線程單隊列方式,請求投遞到任務(wù)隊列后,只要有空閑線程立馬執(zhí)行,用戶A雖然比用戶B先到達(dá)任務(wù)隊列,但是并不能保證A比B一定先獲取到好友列表,如果A有2k好友,而B只有兩個呢,當(dāng)然有可能B請求更快。
//! 多線程單任務(wù)隊列void test_3(){ thread_t thread; task_queue_t tq; thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 3); foo_t foo; cout << "helloworld, thread id:"<< ::pthread_self() << endl; for (unsigned int j = 0; j < 100; ++j) { tq.produce(task_binder_t::gen(&foo_t::print, &foo, j)); sleep(1); } thread.join();}
任務(wù)隊列的模式中列舉的例子都是線程間單項通訊,線程A將請求投遞給了B,但B執(zhí)行完畢后A并沒有檢測結(jié)果。實際中往往都是需要將執(zhí)行結(jié)果進(jìn)行額外處理或者投遞到另外任務(wù)隊列。異步回調(diào)可以很好的解決這個問題,原理就是投遞任務(wù)時,同時包含檢查任務(wù)執(zhí)行結(jié)果的函數(shù)。示例代碼:
//! 異步回調(diào)void test_4(){ thread_t thread; task_queue_t tq; thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 1); foo_t foo; cout << "helloworld, thread id:"<< ::pthread_self() << endl; for (unsigned int j = 0; j < 100; ++j) { tq.produce(task_binder_t::gen(&foo_t::print_callback, &foo, j, &foo_t::check)); sleep(1); } thread.join();}
異步是性能優(yōu)化非常重要的手段,下面如下場合可以使用異步:
l 服務(wù)器程序要求很高的實時性,幾乎邏輯層不執(zhí)行io操作,io操作通過任務(wù)隊列被io線程執(zhí)行成功后再通過回調(diào)的方式傳回邏輯層。
l 網(wǎng)游中用戶登錄,需呀從數(shù)據(jù)庫載入用戶數(shù)據(jù),數(shù)據(jù)庫層不需要知曉邏輯層如何處理用戶數(shù)據(jù),當(dāng)接口被調(diào)用時必須傳入回調(diào)函數(shù),數(shù)據(jù)庫層載入數(shù)據(jù)后直接調(diào)用回調(diào)函數(shù),而數(shù)據(jù)作為參數(shù)。
使用任務(wù)隊列可以解耦多線程的設(shè)計。更加優(yōu)秀的使用是將其封裝在接口之后。前邊的例子中都是顯示的操作了任務(wù)隊列對象。但這就限制了用戶必須知道某個接口需要綁定哪個任務(wù)隊列上,尤其是多線程多任務(wù)隊列的例子,如果當(dāng)用戶操作socket接口時還要知道socket對應(yīng)哪個任務(wù)隊列就顯得不夠優(yōu)雅了。Socket自己本身可以保存對應(yīng)任務(wù)隊列的引用,這樣使用者只需調(diào)用socket的接口,而接口內(nèi)部再將請求投遞到爭取的任務(wù)隊列。示例代碼:
void socket_impl_t::async_send(const string& msg_){ tq.produce(task_binder_t::gen(&socket_impl_t::send, &this, msg_));}void socket_impl_t::send(const string& msg_){ //do send code}
l 設(shè)計多線程程序時,往往設(shè)計使用任務(wù)隊列是關(guān)鍵,好用、高效、靈活的任務(wù)隊列組件十分必需,本節(jié)介紹的實現(xiàn)支持多種多線程模式,易用易理解。
l 異步回調(diào)在多線程程序中非常常見,異步往往是為了提高性能和系統(tǒng)吞吐量的,但是異步其不可避免的會帶來復(fù)雜性,所以盡量保證異步相關(guān)的步驟簡單。
l 任務(wù)隊列封裝對象接口的內(nèi)部更佳,使用者直接調(diào)用接口,仿佛沒有任務(wù)隊列這回事,讓他在看不見的地方默默運(yùn)行。
l 本節(jié)設(shè)計的任務(wù)隊列是線程安全的,并且關(guān)閉時已經(jīng)投遞的任務(wù)能夠保證被 。
代碼:http://code.google.com/p/ffown/source/browse/trunk/#trunk%2Ffflib%2Finclude
聯(lián)系客服