二. 分布式計(jì)算(Map/Reduce)
分布式式計(jì)算,同樣是一個(gè)寬泛的概念,在這里,它狹義的指代,按GoogleMap/Reduce框架所設(shè)計(jì)的分布式框架。在Hadoop中,分布式文件系統(tǒng),很大程度上,是為各種分布式計(jì)算需求所服務(wù)的。我們說分布式文件系統(tǒng)就是加了分布式的文件系統(tǒng),類似的定義推廣到分布式計(jì)算上,我們可以將其視為增加了分布式支持的計(jì)算函數(shù)。從計(jì)算的角度上看,Map/Reduce框架接受各種格式的鍵值對(duì)文件作為輸入,讀取計(jì)算后,最終生成自定義格式的輸出文件。而從分布式的角度上看,分布式計(jì)算的輸入文件往往規(guī)模巨大,且分布在多個(gè)機(jī)器上,單機(jī)計(jì)算完全不可支撐且效率低下,因此Map/Reduce框架需要提供一套機(jī)制,將此計(jì)算擴(kuò)展到無限規(guī)模的機(jī)器集群上進(jìn)行。依照這樣的定義,我們對(duì)整個(gè)Map/Reduce的理解,也可以分別沿著這兩個(gè)流程去看。。。
在Map/Reduce框架中,每一次計(jì)算請(qǐng)求,被稱為作業(yè)。在分布式計(jì)算Map/Reduce框架中,為了完成這個(gè)作業(yè),它進(jìn)行兩步走的戰(zhàn)略,首先是將其拆分成若干個(gè)Map任務(wù),分配到不同的機(jī)器上去執(zhí)行,每一個(gè)Map任務(wù)拿輸入文件的一部分作為自己的輸入,經(jīng)過一些計(jì)算,生成某種格式的中間文件,這種格式,與最終所需的文件格式完全一致,但是僅僅包含一部分?jǐn)?shù)據(jù)。因此,等到所有Map任務(wù)完成后,它會(huì)進(jìn)入下一個(gè)步驟,用以合并這些中間文件獲得最后的輸出文件。此時(shí),系統(tǒng)會(huì)生成若干個(gè)Reduce任務(wù),同樣也是分配到不同的機(jī)器去執(zhí)行,它的目標(biāo),就是將若干個(gè)Map任務(wù)生成的中間文件為匯總到最后的輸出文件中去。當(dāng)然,這個(gè)匯總不總會(huì)像1+ 1 =2那么直接了當(dāng),這也就是Reduce任務(wù)的價(jià)值所在。經(jīng)過如上步驟,最終,作業(yè)完成,所需的目標(biāo)文件生成。整個(gè)算法的關(guān)鍵,就在于增加了一個(gè)中間文件生成的流程,大大提高了靈活性,使其分布式擴(kuò)展性得到了保證。。。
I. 術(shù)語對(duì)照
和分布式文件系統(tǒng)一樣,Google、Hadoop和....我,各執(zhí)一種方式表述統(tǒng)一概念,為了保證其統(tǒng)一性,特有下表。。。
文中翻譯 | Hadoop術(shù)語 | Google術(shù)語 | 相關(guān)解釋 |
作業(yè) | Job | Job | 用戶的每一個(gè)計(jì)算請(qǐng)求,就稱為一個(gè)作業(yè)。 |
作業(yè)服務(wù)器 | JobTracker | Master | 用戶提交作業(yè)的服務(wù)器,同時(shí),它還負(fù)責(zé)各個(gè)作業(yè)任務(wù)的分配,管理所有的任務(wù)服務(wù)器。 |
任務(wù)服務(wù)器 | TaskTracker | Worker | 任勞任怨的工蜂,負(fù)責(zé)執(zhí)行具體的任務(wù)。 |
任務(wù) | Task | Task | 每一個(gè)作業(yè),都需要拆分開了,交由多個(gè)服務(wù)器來完成,拆分出來的執(zhí)行單位,就稱為任務(wù)。 |
備份任務(wù) | Speculative Task | Buckup Task | 每一個(gè)任務(wù),都有可能執(zhí)行失敗或者緩慢,為了降低為此付出的代價(jià),系統(tǒng)會(huì)未雨綢繆的實(shí)現(xiàn)在另外的任務(wù)服務(wù)器上執(zhí)行同樣一個(gè)任務(wù),這就是備份任務(wù)。 |
II. 基本架構(gòu)
與分布式文件系統(tǒng)類似,Map/Reduce的集群,也由三類服務(wù)器構(gòu)成。其中作業(yè)服務(wù)器,在Hadoop中稱為JobTracker,在Google論文中稱為Master。前者告訴我們,作業(yè)服務(wù)器是負(fù)責(zé)管理運(yùn)行在此框架下所有作業(yè)的,后者告訴我們,它也是為各個(gè)作業(yè)分配任務(wù)的核心。與HDFS的主控服務(wù)器類似,它也是作為單點(diǎn)存在的,簡(jiǎn)化了負(fù)責(zé)的同步流程。具體的負(fù)責(zé)執(zhí)行用戶定義操作的,是任務(wù)服務(wù)器,每一個(gè)作業(yè)被拆分成很多的任務(wù),包括Map任務(wù)和Reduce任務(wù)等,任務(wù)是具體執(zhí)行的基本單元,它們都需要分配到合適任務(wù)服務(wù)器上去執(zhí)行,任務(wù)服務(wù)器一邊執(zhí)行一邊向作業(yè)服務(wù)器匯報(bào)各個(gè)任務(wù)的狀態(tài),以此來幫助作業(yè)服務(wù)器了解作業(yè)執(zhí)行的整體情況,分配新的任務(wù)等等。。。
除了作業(yè)的管理者執(zhí)行者,還需要有一個(gè)任務(wù)的提交者,這就是客戶端。與分布式文件系統(tǒng)一樣,客戶端也不是一個(gè)單獨(dú)的進(jìn)程,而是一組API,用戶需要自定義好自己需要的內(nèi)容,經(jīng)由客戶端相關(guān)的代碼,將作業(yè)及其相關(guān)內(nèi)容和配置,提交到作業(yè)服務(wù)器去,并時(shí)刻監(jiān)控執(zhí)行的狀況。。。
同作為Hadoop的實(shí)現(xiàn),與HDFS的通信機(jī)制相同,HadoopMap/Reduce也是用了協(xié)議接口來進(jìn)行服務(wù)器間的交流。實(shí)現(xiàn)者作為RPC服務(wù)器,調(diào)用者經(jīng)由RPC的代理進(jìn)行調(diào)用,如此,完成大部分的通信,具體服務(wù)器的架構(gòu),和其中運(yùn)行的各個(gè)協(xié)議狀況,參見下圖。從圖中可以看到,與HDFS相比,相關(guān)的協(xié)議少了幾個(gè),客戶端與任務(wù)服務(wù)器,任務(wù)服務(wù)器之間,都不再有直接通信關(guān)系。這并不意味著客戶端就不需要了解具體任務(wù)的執(zhí)行狀況,也不意味著,任務(wù)服務(wù)器之間不需要了解別家任務(wù)執(zhí)行的情形,只不過,由于整個(gè)集群各機(jī)器的聯(lián)系比HDFS復(fù)雜的多,直接通信過于的難以維系,所以,都統(tǒng)一由作業(yè)服務(wù)器整理轉(zhuǎn)發(fā)。另外,從這幅圖可以看到,任務(wù)服務(wù)器不是一個(gè)人在戰(zhàn)斗,它會(huì)像孫悟空一樣招出一群寶寶幫助其具體執(zhí)行任務(wù)。這樣做的好處,個(gè)人覺得,應(yīng)該有安全性方面的考慮,畢竟,任務(wù)的代碼是用戶提交的,數(shù)據(jù)也是用戶指定的,這質(zhì)量自然良莠不齊,萬一碰上個(gè)搞破壞的,把整個(gè)任務(wù)服務(wù)器進(jìn)程搞死了,就因小失大了。因此,放在單獨(dú)的地盤進(jìn)行,愛咋咋地,也算是權(quán)責(zé)明確了。。。
與分布式文件系統(tǒng)相比,Map/Reduce框架的還有一個(gè)特點(diǎn),就是可定制性強(qiáng)。文件系統(tǒng)中很多的算法,都是很固定和直觀的,不會(huì)由于所存儲(chǔ)的內(nèi)容不同而有太多的變化。而作為通用的計(jì)算框架,需要面對(duì)的問題則要復(fù)雜很多,在各種不同的問題、不同的輸入、不同的需求之間,很難有一種包治百病的藥能夠一招鮮吃遍天。作為Map/Reduce框架而言,一方面要盡可能的抽取出公共的一些需求,實(shí)現(xiàn)出來。更重要的,是需要提供良好的可擴(kuò)展機(jī)制,滿足用戶自定義各種算法的需求。Hadoop是由Java來實(shí)現(xiàn)的,因此通過反射來實(shí)現(xiàn)自定義的擴(kuò)展,顯得比較小菜一碟了。在JobConf類中,定義了大量的接口,這基本上是HadoopMap/Reduce框架所有可定制內(nèi)容的一次集中展示。在JobConf中,有大量set接口接受一個(gè)Class<? extendsxxx>的參數(shù),通常它都有一個(gè)默認(rèn)實(shí)現(xiàn)的類,用戶如果不滿意,則可自定義實(shí)現(xiàn)。。。
III. 計(jì)算流程
如果一切都按部就班的進(jìn)行,那么整個(gè)作業(yè)的計(jì)算流程,應(yīng)該是作業(yè)的提交 -> Map任務(wù)的分配和執(zhí)行 -> Reduce任務(wù)的分配和執(zhí)行-> 作業(yè)的完成。而在每個(gè)任務(wù)的執(zhí)行中,又包含輸入的準(zhǔn)備 -> 算法的執(zhí)行 ->輸出的生成,三個(gè)子步驟。沿著這個(gè)流程,我們可以很快的整理清晰整個(gè)Map/Reduce框架下作業(yè)的執(zhí)行。。。
1、作業(yè)的提交
一個(gè)作業(yè),在提交之前,需要把所有應(yīng)該配置的東西都配置好,因?yàn)橐坏┨峤坏搅俗鳂I(yè)服務(wù)器上,就陷入了完全自動(dòng)化的流程,用戶除了觀望,最多也就能起一個(gè)監(jiān)督作用,懲治一些不好好工作的任務(wù)。。。
基本上,用戶在提交代碼階段,需要做的工作主要是這樣的:
首先,書寫好所有自定的代碼,最起碼,需要有Map和Reduce的執(zhí)行代碼。在Hadoop中,Map需要派生自Mapper<K1, V1,K2, V2>接口,Reduce需要派生自Reducer<K2, V2, K3,V3>接口。這里都是用的泛型,用以支持不同的鍵值類型。這兩個(gè)接口都僅有一個(gè)方法,一個(gè)是map,一個(gè)是reduce,這兩個(gè)方法都直接受四個(gè)參數(shù),前兩個(gè)是輸入的鍵和值相關(guān)的數(shù)據(jù)結(jié)構(gòu),第三個(gè)是作為輸出相關(guān)的數(shù)據(jù)結(jié)構(gòu),最后一個(gè),是一個(gè)Reporter類的實(shí)例,實(shí)現(xiàn)的時(shí)候可以利用它來統(tǒng)計(jì)一些計(jì)數(shù)。除了這兩個(gè)接口,還有大量可以派生的接口,比如分割的Partitioner<K2,V2>接口。。。
然后,需要書寫好主函數(shù)的代碼,其中最主要的內(nèi)容就是實(shí)例化一個(gè)JobConf類的對(duì)象,然后調(diào)用其豐富的setXXX接口,設(shè)定好所需的內(nèi)容,包括輸入輸出的文件路徑,Map和Reduce的類,甚至包括讀取寫入文件所需的格式支持類,等等。。。
最后,調(diào)用JobClient的runJob方法,提交此JobConf對(duì)象。runJob方法會(huì)先行調(diào)用到JobSubmissionProtocol接口所定義的submitJob方法,將此作業(yè),提交給作業(yè)服務(wù)器。接著,runJob開始循環(huán),不停的調(diào)用JobSubmissionProtocol的getTaskCompletionEvents方法,獲得TaskCompletionEvent類的對(duì)象實(shí)例,了解此作業(yè)各任務(wù)的執(zhí)行狀況。。。
2、Map任務(wù)的分配
當(dāng)一個(gè)作業(yè)提交到了作業(yè)服務(wù)器上,作業(yè)服務(wù)器會(huì)生成若干個(gè)Map任務(wù),每一個(gè)Map任務(wù),負(fù)責(zé)將一部分的輸入轉(zhuǎn)換成格式與最終格式相同的中間文件。通常一個(gè)作業(yè)的輸入都是基于分布式文件系統(tǒng)的文件(當(dāng)然在單機(jī)環(huán)境下,文件系統(tǒng)單機(jī)的也可以...),因?yàn)椋梢院芴烊坏暮头植际降挠?jì)算產(chǎn)生聯(lián)系。而對(duì)于一個(gè)Map任務(wù)而言,它的輸入往往是輸入文件的一個(gè)數(shù)據(jù)塊,或者是數(shù)據(jù)塊的一部分,但通常,不跨數(shù)據(jù)塊。因?yàn)?,一旦跨了?shù)據(jù)塊,就可能涉及到多個(gè)服務(wù)器,帶來了不必要的復(fù)雜性。。。
當(dāng)一個(gè)作業(yè),從客戶端提交到了作業(yè)服務(wù)器上,作業(yè)服務(wù)器會(huì)生成一個(gè)JobInProgress對(duì)象,作為與之對(duì)應(yīng)的標(biāo)識(shí),用于管理。作業(yè)被拆分成若干個(gè)Map任務(wù)后,會(huì)預(yù)先掛在作業(yè)服務(wù)器上的任務(wù)服務(wù)器拓?fù)錁洹_@是依照分布式文件數(shù)據(jù)塊的位置來劃分的,比如一個(gè)Map任務(wù)需要用某個(gè)數(shù)據(jù)塊,這個(gè)數(shù)據(jù)塊有三份備份,那么,在這三臺(tái)服務(wù)器上都會(huì)掛上此任務(wù),可以視為是一個(gè)預(yù)分配。。。
關(guān)于任務(wù)管理和分配的大部分的真實(shí)功能和邏輯的實(shí)現(xiàn),JobInProgress則依托JobInProgressListener和TaskScheduler的子類。TaskScheduler,顧名思義是用于任務(wù)分配的策略類(為了簡(jiǎn)化描述,用它代指所有TaskScheduler的子類...)。它會(huì)掌握好所有作業(yè)的任務(wù)信息,其assignTasks函數(shù),接受一個(gè)TaskTrackerStatus作為參數(shù),依照此任務(wù)服務(wù)器的狀態(tài)和現(xiàn)有的任務(wù)狀況,為其分配新的任務(wù)。而為了掌握所有作業(yè)相關(guān)任務(wù)的狀況,TaskScheduler會(huì)將若干個(gè)JobInProgressListener注冊(cè)到JobTracker中去,當(dāng)有新的作業(yè)到達(dá)、移除或更新的時(shí)候,JobTracker會(huì)告知給所有的JobInProgressListener,以便它們做出相應(yīng)的處理。。。
任務(wù)分配是一個(gè)重要的環(huán)節(jié),所謂任務(wù)分配,就是將合適作業(yè)的合適任務(wù)分配到合適的服務(wù)器上。不難看出,里面蘊(yùn)含了兩個(gè)步驟,先是選擇作業(yè),然后是在此作業(yè)中選擇任務(wù)。和所有分配工作一樣,任務(wù)分配也是一個(gè)復(fù)雜的活。不良好的任務(wù)分配,可能會(huì)導(dǎo)致網(wǎng)絡(luò)流量增加、某些任務(wù)服務(wù)器負(fù)載過重效率下降,等等。不僅如此,任務(wù)分配還是一個(gè)無一致模式的問題,不同的業(yè)務(wù)背景,可能需要不同的算法才能滿足需求。因此,在Hadoop中,有很多TaskScheduler的子類,像Facebook,Yahoo,都為其貢獻(xiàn)出了自家用的算法。在Hadoop中,默認(rèn)的任務(wù)分配器,是JobQueueTaskScheduler類。它選擇作業(yè)的基本次序是:MapClean Up Task(Map任務(wù)服務(wù)器的清理任務(wù),用于清理相關(guān)的過期的文件和環(huán)境...) -> Map SetupTask(Map任務(wù)服務(wù)器的安裝任務(wù),負(fù)責(zé)配置好相關(guān)的環(huán)境...) -> Map Tasks -> Reduce Clean Up Task-> Reduce Setup Task -> ReduceTasks。在這個(gè)前提下,具體到Map任務(wù)的分配上來。當(dāng)一個(gè)任務(wù)服務(wù)器工作的游刃有余,期待獲得新的任務(wù)的時(shí)候,JobQueueTaskScheduler會(huì)按照各個(gè)作業(yè)的優(yōu)先級(jí),從最高優(yōu)先級(jí)的作業(yè)開始分配。每分配一個(gè),還會(huì)為其留出余量,已被不時(shí)之需。舉一個(gè)例子:系統(tǒng)目前有優(yōu)先級(jí)3、2、1的三個(gè)作業(yè),每個(gè)作業(yè)都有一個(gè)可分配的Map任務(wù),一個(gè)任務(wù)服務(wù)器來申請(qǐng)新的任務(wù),它還有能力承載3個(gè)任務(wù)的執(zhí)行,JobQueueTaskScheduler會(huì)先從優(yōu)先級(jí)3的作業(yè)上取一個(gè)任務(wù)分配給它,然后再留出一個(gè)1任務(wù)的余量。此時(shí),系統(tǒng)只能在將優(yōu)先級(jí)2作業(yè)的任務(wù)分配給此服務(wù)器,而不能分配優(yōu)先級(jí)1的任務(wù)。這樣的策略,基本思路就是一切為高優(yōu)先級(jí)的作業(yè)服務(wù),優(yōu)先分配不說,分配了好保留有余力以備不時(shí)之需,如此優(yōu)待,足以讓高優(yōu)先級(jí)的作業(yè)喜極而泣,讓低優(yōu)先級(jí)的作業(yè)感慨既生瑜何生亮,甚至是活活餓死。。。
確定了從哪個(gè)作業(yè)提取任務(wù)后,具體的分配算法,經(jīng)過一系列的調(diào)用,最后實(shí)際是由JobInProgress的findNewMapTask函數(shù)完成的。它的算法很簡(jiǎn)單,就是盡全力為此服務(wù)器非配且盡可能好的分配任務(wù),也就是說,只要還有可分配的任務(wù),就一定會(huì)分給它,而不考慮后來者。作業(yè)服務(wù)器會(huì)從離它最近的服務(wù)器開始,看上面是否還掛著未分配的任務(wù)(預(yù)分配上的),從近到遠(yuǎn),如果所有的任務(wù)都分配了,那么看有沒有開啟多次執(zhí)行,如果開啟,考慮把未完成的任務(wù)再分配一次(后面有地方詳述...)。。。
對(duì)于作業(yè)服務(wù)器來說,把一個(gè)任務(wù)分配出去了,并不意味著它就徹底解放,可以對(duì)此任務(wù)可以不管不顧了。因?yàn)槿蝿?wù)可以在任務(wù)服務(wù)器上執(zhí)行失敗,可能執(zhí)行緩慢,這都需要作業(yè)服務(wù)器幫助它們?cè)賮硪淮?。因此在Task中,記錄有一個(gè)TaskAttemptID,對(duì)于任務(wù)服務(wù)器而言,它們每次跑的,其實(shí)都只是一個(gè)Attempt而已,Reduce任務(wù)只需要采信一個(gè)的輸出,其他都算白忙乎了。。。
3、Map任務(wù)的執(zhí)行
與HDFS類似,任務(wù)服務(wù)器是通過心跳消息,向作業(yè)服務(wù)器匯報(bào)此時(shí)此刻其上各個(gè)任務(wù)執(zhí)行的狀況,并向作業(yè)服務(wù)器申請(qǐng)新的任務(wù)的。具體實(shí)現(xiàn),是TaskTracker調(diào)用InterTrackerProtocol協(xié)議的heartbeat方法來做的。這個(gè)方法接受一個(gè)TaskTrackerStatus對(duì)象作為參數(shù),它描述了此時(shí)此任務(wù)服務(wù)器的狀態(tài)。當(dāng)其有余力接受新的任務(wù)的時(shí)候,它還會(huì)傳入acceptNewTasks為true的參數(shù),表示希望作業(yè)服務(wù)器委以重任。JobTracker接收到相關(guān)的參數(shù)后,經(jīng)過處理,會(huì)返回一個(gè)HeartbeatResponse對(duì)象。這個(gè)對(duì)象中,定義了一組TaskTrackerAction,用于指導(dǎo)任務(wù)服務(wù)器進(jìn)行下一步的工作。系統(tǒng)中已定義的了一堆其TaskTrackerAction的子類,有的對(duì)攜帶的參數(shù)進(jìn)行了擴(kuò)充,有的只是標(biāo)明了下ID,具體不詳寫了,一看便知。。。
當(dāng)TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它會(huì)開始執(zhí)行所分配的新的任務(wù)。在TaskTracker中,有一個(gè)TaskTracker.TaskLauncher線程(確切的說是兩個(gè),一個(gè)等Map任務(wù),一個(gè)等Reduce任務(wù)),它們?cè)诎V癡的守候著新任務(wù)的來到。一旦等到了,會(huì)最終調(diào)用到Task的createRunner方法,構(gòu)造出一個(gè)TaskRunner對(duì)象,新建一個(gè)線程來執(zhí)行。對(duì)于一個(gè)Map任務(wù),它對(duì)應(yīng)的Runner是TaskRunner的子類MapTaskRunner,不過,核心部分都在TaskRunner的實(shí)現(xiàn)內(nèi)。TaskRunner會(huì)先將所需的文件全部下載并拆包好,并記錄到一個(gè)全局緩存中,這是一個(gè)全局的目錄,可以供所有此作業(yè)的所有任務(wù)使用。它會(huì)用一些軟鏈接,將一些文件名鏈接到這個(gè)緩存中來。然后,根據(jù)不同的參數(shù),配置出一個(gè)JVM執(zhí)行的環(huán)境,這個(gè)環(huán)境與JvmEnv類的對(duì)象對(duì)應(yīng)。
接著,TaskRunner會(huì)調(diào)用JvmManager的launchJvm方法,提交給JvmManager處理。JvmManager用于管理該TaskTracker上所有運(yùn)行的Task子進(jìn)程。在目前的實(shí)現(xiàn)中,嘗試的是池化的方式。有若干個(gè)固定的槽,如果槽沒有滿,那么就啟動(dòng)新的子進(jìn)程,否則,就尋找idle的進(jìn)程,如果是同Job的直接放進(jìn)去,否則殺死這個(gè)進(jìn)程,用一個(gè)新的進(jìn)程代替。每一個(gè)進(jìn)程都是由JvmRunner來管理的,它也是位于單獨(dú)線程中的。但是從實(shí)現(xiàn)上看,這個(gè)機(jī)制好像沒有部署開,子進(jìn)程是死循環(huán)等待,而不會(huì)阻塞在父進(jìn)程的相關(guān)線程上,父線程的變量一直都沒有個(gè)調(diào)整,一旦分配,始終都處在繁忙的狀況了。
真實(shí)的執(zhí)行載體,是Child,它包含一個(gè)main函數(shù),進(jìn)程執(zhí)行,會(huì)將相關(guān)參數(shù)傳進(jìn)來,它會(huì)拆解這些參數(shù),并且構(gòu)造出相關(guān)的Task實(shí)例,調(diào)用其run函數(shù)進(jìn)行執(zhí)行。每一個(gè)子進(jìn)程,可以執(zhí)行指定個(gè)數(shù)量的Task,這就是上面所說的池化的配置。但是,這套機(jī)制在我看來,并沒有運(yùn)行起來,每個(gè)進(jìn)程其實(shí)都沒有機(jī)會(huì)不死而執(zhí)行新的任務(wù),只是傻傻的等待進(jìn)程池滿,而被一刀斃命。也許是我老眼昏花,沒看出其中實(shí)現(xiàn)的端倪。。。
4、Reduce任務(wù)的分配與執(zhí)行
比之Map任務(wù),Reduce的分配及其簡(jiǎn)單,基本上是所有Map任務(wù)完成了,有空閑的任務(wù)服務(wù)器,來了就給分配一個(gè)Job任務(wù)。因?yàn)镸ap任務(wù)的結(jié)果星羅棋布,且變化多端,真要搞一個(gè)全局優(yōu)化的算法,絕對(duì)是得不償失。而Reduce任務(wù)的執(zhí)行進(jìn)程的構(gòu)造和分配流程,與Map基本完全的一致,沒有啥可說的了。。。
但其實(shí),Reduce任務(wù)與Map任務(wù)的最大不同,是Map任務(wù)的文件都在本地隔著,而Reduce任務(wù)需要到處采集。這個(gè)流程是作業(yè)服務(wù)器經(jīng)由此Reduce任務(wù)所處的任務(wù)服務(wù)器,告訴Reduce任務(wù)正在執(zhí)行的進(jìn)程,它需要的Map任務(wù)執(zhí)行過的服務(wù)器地址,此Reduce任務(wù)服務(wù)器會(huì)于原Map任務(wù)服務(wù)器聯(lián)系(當(dāng)然本地就免了...),通過FTP服務(wù),下載過來。這個(gè)隱含的直接數(shù)據(jù)聯(lián)系,就是執(zhí)行Reduce任務(wù)與執(zhí)行Map任務(wù)最大的不同了。。。
5、作業(yè)的完成
當(dāng)所有Reduce任務(wù)都完成了,所需數(shù)據(jù)都寫到了分布式文件系統(tǒng)上,整個(gè)作業(yè)才正式完成了。此中,涉及到很多的類,很多的文件,很多的服務(wù)器,所以說起來很費(fèi)勁,話說,一圖解千語,說了那么多,我還是畫兩幅圖,徹底表達(dá)一下吧。。。
首先,是一個(gè)時(shí)序圖。它模擬了一個(gè)由3個(gè)Map任務(wù)和1個(gè)Reduce任務(wù)構(gòu)成的作業(yè)執(zhí)行流程。我們可以看到,在執(zhí)行的過程中,只要有人太慢,或者失敗,就會(huì)增加一次嘗試,以此換取最快的執(zhí)行總時(shí)間。一旦所有Map任務(wù)完成,Reduce開始運(yùn)作(其實(shí),不一定要這樣的...),對(duì)于每一個(gè)Map任務(wù)來說,只有執(zhí)行到Reduce任務(wù)把它上面的數(shù)據(jù)下載完成,才算成功,否則,都是失敗,需要重新進(jìn)行嘗試。。。
而第二副圖,不是我畫的,就不轉(zhuǎn)載了,參見
這里,它描述了整個(gè)Map/Reduce的服務(wù)器狀況圖,包括整體流程、所處服務(wù)器進(jìn)程、輸入輸出等,看清楚這幅圖,對(duì)Map/Reduce的基本流程應(yīng)該能完全跑通了。有這幾點(diǎn),可能圖中描述的不夠清晰需要提及一下,一個(gè)是在HDFS中,其實(shí)還有日志文件,圖中沒有標(biāo)明;另一個(gè)是步驟5,其實(shí)是由TaskTracker主動(dòng)去拉取而不是JobTracker推送過來的;還有步驟8和步驟11,創(chuàng)建出來的MapTask和ReduceTask,在Hadoop中都是運(yùn)行在獨(dú)立的進(jìn)程上的。。。
IV. Map任務(wù)詳請(qǐng)
從上面,可以了解到整個(gè)Map和Reduce任務(wù)的整體流程,而后面要啰嗦的,是具體執(zhí)行中的細(xì)節(jié)。Map任務(wù)的輸入,是分布式文件系統(tǒng)上的,包含鍵值對(duì)信息的文件。為了給每一個(gè)Map任務(wù)指定輸入,我們需要掌握文件格式把它分切成塊,并從每一塊中分離出鍵值信息。在HDFS中,輸入的文件格式,是由InputFormat<K,V>類來表示的,在JobConf中,它的默認(rèn)值是TextInputFormat類(見getInputFormat),此類是特化的FileInputFormat<LongWritable,Text>子類,而FileInputFormat<K, V>正是InputFormat<K,V>的子類。通過這樣的關(guān)系我們可以很容易的理解,默認(rèn)的文件格式是文本文件,且鍵是LongWritable類型(整形數(shù)),值是Text類型(字符串)。僅僅知道文件類型是不夠的,我們還需要將文件中的每一條數(shù)據(jù),分離成鍵值對(duì),這個(gè)工作,是RecordReader<K,V>來做的。在TextInputFormat的getRecordReader方法中我們可以看到,與TextInputFormat默認(rèn)配套使用的,是LineRecordReader類,是特化的RecordReader<LongWritable,Text>的子類,它將每一行作為一個(gè)記錄,起始的位置作為鍵,整行的字符串作為值。有了格式,分出了鍵值,還需要切開分給每一個(gè)Map任務(wù)。每一個(gè)Map任務(wù)的輸入用InputSplit接口表示,對(duì)于一個(gè)文件輸入而言,其實(shí)現(xiàn)是FileSplit,它包含著文件名、起始位置、長(zhǎng)度和存儲(chǔ)它的一組服務(wù)器地址。。。
當(dāng)Map任務(wù)拿到所屬的InputSplit后,就開始一條條讀取記錄,并調(diào)用用于定義的Mapper,進(jìn)行計(jì)算(參見MapRunner<K1,V1, K2, V2>和MapTask的run方法),然后,輸出。MapTask會(huì)傳遞給Mapper一個(gè)OutputCollector<K,V>對(duì)象,作為輸出的數(shù)據(jù)結(jié)構(gòu)。它定義了一個(gè)collect的函數(shù),接受一個(gè)鍵值對(duì)。在MapTask中,定義了兩個(gè)OutputCollector的子類,一個(gè)是MapTask.DirectMapOutputCollector<K,V>,人如其名,它的實(shí)現(xiàn)確實(shí)很Direct,直截了當(dāng)。它會(huì)利用一個(gè)RecordWriter<K,V>對(duì)象,collect一調(diào)用,就直接調(diào)用RecordWriter<K,V>的write方法,寫入本地的文件中去。如果覺著RecordWriter<K,V>出現(xiàn)的很突兀,那么看看上一段提到的RecordReader<K,V>,基本上,數(shù)據(jù)結(jié)構(gòu)都是對(duì)應(yīng)著的,一個(gè)是輸入一個(gè)是輸出。輸出很對(duì)稱也是由RecordWriter<K,V>和OutputFormat<K, V>來協(xié)同完成的,其默認(rèn)實(shí)現(xiàn)是LineRecordWriter<K,V>和TextOutputFormat<K, V>,多么的眼熟啊。。。
除了這個(gè)非常直接的實(shí)現(xiàn)之外,MapTask中還有一個(gè)復(fù)雜的多的實(shí)現(xiàn),是MapTask.MapOutputBuffer<K extendsObject, V extendsObject>。有道是簡(jiǎn)單壓倒一切,那為什么有很簡(jiǎn)單的實(shí)現(xiàn),要琢磨一個(gè)復(fù)雜的呢。原因在于,看上去很美的往往帶著刺,簡(jiǎn)單的輸出實(shí)現(xiàn),每調(diào)用一次collect就寫一次文件,頻繁的硬盤操作很有可能導(dǎo)致此方案的低效。為了解決這個(gè)問題,這就有了這個(gè)復(fù)雜版本,它先開好一段內(nèi)存做緩存,然后制定一個(gè)比例做閾值,開一個(gè)線程監(jiān)控此緩存。collect來的內(nèi)容,先寫到緩存中,當(dāng)監(jiān)控線程發(fā)現(xiàn)緩存的內(nèi)容比例超過閾值,掛起所有寫入操作,建一個(gè)新的文件,把緩存的內(nèi)容批量刷到此文件中去,清空緩存,重新開放,接受繼續(xù)collect。。。
為什么說是刷到文件中去呢。因?yàn)檫@不是一個(gè)簡(jiǎn)單的照本宣科簡(jiǎn)單復(fù)制的過程,在寫入之前,會(huì)先將緩存中的內(nèi)存,經(jīng)過排序、合并器(Combiner)統(tǒng)計(jì)之后,才會(huì)寫入。如果你覺得Combiner這個(gè)名詞聽著太陌生,那么考慮一下Reducer,Combiner也就是一個(gè)Reducer類,通過JobConf的setCombinerClass進(jìn)行設(shè)置,在常用的配置中,Combiner往往就是用用戶為Reduce任務(wù)定義的那個(gè)Reducer子類。只不過,Combiner只是服務(wù)的范圍更小一些而已,它在Map任務(wù)執(zhí)行的服務(wù)器本地,依照Map處理過的那一小部分?jǐn)?shù)據(jù),先做一次Reduce操作,這樣,可以壓縮需要傳輸內(nèi)容的大小,提高速度。每一次刷緩存,都會(huì)開一個(gè)新的文件,等此任務(wù)所有的輸入都處理完成后,就有了若干個(gè)有序的、經(jīng)過合并的輸出文件。系統(tǒng)會(huì)將這些文件搞在一起,再做一個(gè)多路的歸并外排,同時(shí)使用合并器進(jìn)行合并,最終,得到了唯一的、有序的、經(jīng)過合并的中間文件(注:文件數(shù)量等同于分類數(shù)量,在不考慮分類的時(shí)候,簡(jiǎn)單的視為一個(gè)...)。它,就是Reduce任務(wù)夢(mèng)寐以求的輸入文件。。。
除了做合并,復(fù)雜版本的OutputCollector,還具有分類的功能。分類,是通過Partitioner<K2,V2>來定義的,默認(rèn)實(shí)現(xiàn)是HashPartitioner<K2,V2>,作業(yè)提交者可以通過JobConf的setPartitionerClass來自定義。分類的含義是什么呢,簡(jiǎn)單的說,就是將Map任務(wù)的輸出,劃分到若干個(gè)文件中(通常與Reduce任務(wù)數(shù)目相等),使得每一個(gè)Reduce任務(wù),可以處理某一類文件。這樣的好處是大大的,舉一個(gè)例子說明一下。比如有一個(gè)作業(yè)是進(jìn)行單詞統(tǒng)計(jì)的,其Map任務(wù)的中間結(jié)果應(yīng)該是以單詞為鍵,以單詞數(shù)量為值的文件。如果這時(shí)候只有一個(gè)Reduce任務(wù),那還好說,從全部的Map任務(wù)那里收集文件過來,分別統(tǒng)計(jì)得到最后的輸出文件就好。但是,如果單Reduce任務(wù)無法承載此負(fù)載或效率太低,就需要多個(gè)Reduce任務(wù)并行執(zhí)行。此時(shí),再沿用之前的模式就有了問題。每個(gè)Reduce任務(wù)從一部分Map任務(wù)那里獲得輸入文件,但最終的輸出結(jié)果并不正確,因?yàn)橥粋€(gè)單詞可能在不同的Reduce任務(wù)那里都有統(tǒng)計(jì),需要想方法把它們統(tǒng)計(jì)在一起才能獲得最后結(jié)果,這樣就沒有將Map/Reduce的作用完全發(fā)揮出來。這時(shí)候,就需要用到分類。如果此時(shí)有兩個(gè)Reduce任務(wù),那么將輸出分成兩類,一類存放字母表排序較高的單詞,一類存放字母表排序低的單詞,每一個(gè)Reduce任務(wù)從所有的Map任務(wù)那里獲取一類的中間文件,得到自己的輸出結(jié)果。最終的結(jié)果,只需要把各個(gè)Reduce任務(wù)輸出的,拼接在一起就可以了。本質(zhì)上,這就是將Reduce任務(wù)的輸入,由垂直分割,變成了水平分割。Partitioner的作用,正是接受一個(gè)鍵值,返回一個(gè)分類的序號(hào)。它會(huì)在從緩存刷到文件之前做這個(gè)工作,其實(shí)只是多了一個(gè)文件名的選擇而已,別的邏輯都不需要變化。。。
除了緩存、合并、分類等附加工作之外,復(fù)雜版本的OutputCollector還支持錯(cuò)誤數(shù)據(jù)的跳過功能,在后面分布式將排錯(cuò)的時(shí)候,還會(huì)提及,標(biāo)記一下,按下不表。。。
V. Reduce任務(wù)詳情
理論上看,Reduce任務(wù)的整個(gè)執(zhí)行流程要比Map任務(wù)更為的羅嗦一些,因?yàn)?,它需要收集輸入文件,然后才能進(jìn)行處理。Reduce任務(wù),主要有這么三個(gè)步驟:Copy、Sort、Reduce(參見ReduceTask的run方法)。所謂Copy,就是從執(zhí)行各個(gè)Map任務(wù)的服務(wù)器那里,收羅到本地來??截惖娜蝿?wù),是由ReduceTask.ReduceCopier類來負(fù)責(zé),它有一個(gè)內(nèi)嵌類,叫MapOutputCopier,它會(huì)在一個(gè)單獨(dú)的線程內(nèi),負(fù)責(zé)某個(gè)Map任務(wù)服務(wù)器上文件的拷貝工作。遠(yuǎn)程拷貝過來的內(nèi)容(當(dāng)然也可以是本地了...),作為MapOutput對(duì)象存在,它可以在內(nèi)存中也可以序列化在磁盤上,這個(gè)根據(jù)內(nèi)存使用狀況來自動(dòng)調(diào)節(jié)。整個(gè)拷貝過程是一個(gè)動(dòng)態(tài)的過程,也就是說它不是一次給好所有輸入信息就不再變化了。它會(huì)不停的調(diào)用TaskUmbilicalProtocol協(xié)議的getMapCompletionEvents方法,向其父TaskTracker詢問此作業(yè)個(gè)Map任務(wù)的完成狀況(TaskTracker要向JobTracker詢問后再轉(zhuǎn)告給它...)。當(dāng)獲取到相關(guān)Map任務(wù)執(zhí)行服務(wù)器的信息后,都會(huì)有一個(gè)線程開啟,做具體的拷貝工作。同時(shí),還有一個(gè)內(nèi)存Merger線程和一個(gè)文件Merger線程在同步工作,它們將新鮮下載過來的文件(可能在內(nèi)存中,簡(jiǎn)單的統(tǒng)稱為文件...),做著歸并排序,以此,節(jié)約時(shí)間,降低輸入文件的數(shù)量,為后續(xù)的排序工作減負(fù)。。。
Sort,排序工作,就相當(dāng)于上述排序工作的一個(gè)延續(xù)。它會(huì)在所有的文件都拷貝完畢后進(jìn)行,因?yàn)殡m然同步有做著歸并的工作,但可能留著尾巴,沒做徹底。經(jīng)過這一個(gè)流程,該徹底的都徹底了,一個(gè)嶄新的、合并了所有所需Map任務(wù)輸出文件的新文件,誕生了。而那些千行萬苦從其他各個(gè)服務(wù)器網(wǎng)羅過來的Map任務(wù)輸出文件,很快的結(jié)束了它們的歷史使命,被掃地出門一掃而光,全部刪除了。。。
所謂好戲在后頭,Reduce任務(wù)的最后一個(gè)階段,正是Reduce本身。它也會(huì)準(zhǔn)備一個(gè)OutputCollector收集輸出,與MapTask不同,這個(gè)OutputCollector更為簡(jiǎn)單,僅僅是打開一個(gè)RecordWriter,collect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統(tǒng),基本都是分布式文件系統(tǒng),或者說是HDFS。而在輸入方面,ReduceTask會(huì)從JobConf那里調(diào)用一堆getMapOutputKeyClass、getMapOutputValueClass、getOutputKeyComparator等等之類的自定義類,構(gòu)造出Reducer所需的鍵類型,和值的迭代類型Iterator(一個(gè)鍵到了這里一般是對(duì)應(yīng)一組值)。具體實(shí)現(xiàn)頗為拐彎抹角,建議看一下Merger.MergeQueue,RawKeyValueIterator,ReduceTask.ReduceValuesIterator等等之類的實(shí)現(xiàn)。有了輸入,有了輸出,不斷循環(huán)調(diào)用自定義的Reducer,最終,Reduce階段完成。。。
VI. 分布式支持
1、服務(wù)器正確性保證
HadoopMap/Reduce服務(wù)器狀況和HDFS很類似,由此可知,救死扶傷的方法也是大同小異。廢話不多說了,直接切正題。同作為客戶端,Map/Reduce的客戶端只是將作業(yè)提交,就開始搬個(gè)板凳看戲,沒有占茅坑的行動(dòng)。因此,一旦它掛了,也就掛了,不傷大雅。而任務(wù)服務(wù)器,也需要隨時(shí)與作業(yè)服務(wù)器保持心跳聯(lián)系,一旦有了問題,作業(yè)服務(wù)器可以將其上運(yùn)行的任務(wù),移交給它人完成。作業(yè)服務(wù)器,作為一個(gè)單點(diǎn),非常類似的是利用還原點(diǎn)(等同于HDFS的鏡像)和歷史記錄(等同于HDFS的日志),來進(jìn)行恢復(fù)。其上,需要持久化用于恢復(fù)的內(nèi)容,包含作業(yè)狀況、任務(wù)狀況、各個(gè)任務(wù)嘗試的工作狀況等。有了這些內(nèi)容,再加上任務(wù)服務(wù)器的動(dòng)態(tài)注冊(cè),就算挪了個(gè)窩,還是很容易恢復(fù)的。JobHistory是歷史記錄相關(guān)的一個(gè)靜態(tài)類,本來,它也就是一個(gè)干寫日志活的,只是在Hadoop的實(shí)現(xiàn)中,對(duì)日志的寫入做了面向?qū)ο蟮姆庋b,同時(shí)又大量用到觀察者模式做了些嵌入,使得看起來不是那么直觀。本質(zhì)上,它就是打開若干個(gè)日志文件,利用各類接口來往里面寫內(nèi)容。只不過,這些日志,會(huì)放在分布式文件系統(tǒng)中,就不需要像HDFS那樣,來一個(gè)SecondXXX隨時(shí)候命,由此可見,有巨人在腳下踩著,真好。JobTracker.RecoveryManager類是作業(yè)服務(wù)器中用于進(jìn)行恢復(fù)相關(guān)的事情,當(dāng)作業(yè)服務(wù)器啟動(dòng)的時(shí)候,會(huì)調(diào)用其recover方法,恢復(fù)日志文件中的內(nèi)容。其中步驟,注釋中寫的很清楚,請(qǐng)自行查看。。。
2、任務(wù)執(zhí)行的正確和速度
整個(gè)作業(yè)流程的執(zhí)行,秉承著木桶原理。執(zhí)行的最慢的Map任務(wù)和Reduce任務(wù),決定了系統(tǒng)整體執(zhí)行時(shí)間(當(dāng)然,如果執(zhí)行時(shí)間在整個(gè)流程中占比例很小的話,也許就微不足道了...)。因此,盡量加快最慢的任務(wù)執(zhí)行速度,成為提高整體速度關(guān)鍵。所使用的策略,簡(jiǎn)約而不簡(jiǎn)單,就是一個(gè)任務(wù)多次執(zhí)行。當(dāng)所有未執(zhí)行的任務(wù)都分配出去了,并且先富起來的那部分任務(wù)已經(jīng)完成了,并還有任務(wù)服務(wù)器孜孜不倦的索取任務(wù)的時(shí)候,作業(yè)服務(wù)器會(huì)開始炒剩飯,把那些正在吭哧吭哧在某個(gè)服務(wù)器上慢慢執(zhí)行的任務(wù),再把此任務(wù)分配到一個(gè)新的任務(wù)服務(wù)器上,同時(shí)執(zhí)行。兩個(gè)服務(wù)器各盡其力,成王敗寇,先結(jié)束者的結(jié)果將被采納。這樣的策略,隱含著一個(gè)假設(shè),就是我們相信,輸入文件的分割算法是公平的,某個(gè)任務(wù)執(zhí)行慢,并不是由于這個(gè)任務(wù)本身負(fù)擔(dān)太重,而是由于服務(wù)器不爭(zhēng)氣負(fù)擔(dān)太重能力有限或者是即將撒手西去,給它換個(gè)新環(huán)境,人挪死樹挪活事半功倍。。。
當(dāng)然,肯定有哽咽的任務(wù),不論是在哪個(gè)服務(wù)器上,都無法順利完成。這就說明,此問題不在于服務(wù)器上,而是任務(wù)本身天資有缺憾。缺憾在何處?每個(gè)作業(yè),功能代碼都是一樣的,別的任務(wù)成功了,就是這個(gè)任務(wù)不成功,很顯然,問題出在輸入那里。輸入中有非法的輸入條目,導(dǎo)致程序無法辨識(shí),只能揮淚惜別。說到這里,解決策略也浮出水面了,三十六計(jì)走位上,惹不起,還是躲得起的。在MapTask中的MapTask.SkippingRecordReader<K,V>和ReduceTask里的ReduceTask.SkippingReduceValuesIterator<KEY,VALUE>,都是用于干這個(gè)事情的。它們的原理很簡(jiǎn)單,就是在讀一條記錄前,把當(dāng)前的位置信息,封裝成SortedRanges.Range對(duì)象,經(jīng)由Task的reportNextRecordRange方法提交到TaskTracker上去。TaskTracker會(huì)把這些內(nèi)容,擱在TaskStatus對(duì)象中,隨著心跳消息,匯報(bào)到JobTracker上面。這樣,作業(yè)服務(wù)器就可以隨時(shí)隨刻了解清楚,每個(gè)任務(wù)正讀取在那個(gè)位置,一旦出錯(cuò),再次執(zhí)行的時(shí)候,就在分配的任務(wù)信息里面添加一組SortedRanges信息。MapTask或ReduceTask讀取的時(shí)候,會(huì)看一下這些區(qū)域,如果當(dāng)前區(qū)域正好處于上述雷區(qū),跳過不讀。如此反復(fù),正可謂,道路曲折,前途光明啊。。。
VII. 總結(jié)
對(duì)于Map/Reduce而言,真正的困難,在于提高其適應(yīng)能力,打造一款能夠包治百病的執(zhí)行框架。Hadoop已經(jīng)做得很好了,但只有真正搞清楚了整個(gè)流程,你才能幫助它做的更好。。。