聲明:本文為CSDN原創(chuàng)投稿文章,未經(jīng)許可,禁止任何形式的轉(zhuǎn)載。
作者:周立偉,京東商城高級(jí)工程師,關(guān)注分布式、高并發(fā)和Java中間件的研究。
責(zé)編:錢(qián)曙光,關(guān)注架構(gòu)和算法領(lǐng)域,尋求報(bào)道或者投稿請(qǐng)發(fā)郵件qianshg@csdn.net,另有「CSDN 高級(jí)架構(gòu)師群」,內(nèi)有諸多知名互聯(lián)網(wǎng)公司的大牛架構(gòu)師,歡迎架構(gòu)師加微信qshuguang2008申請(qǐng)入群,備注姓名+公司+職位。
【編者按】 TBSchedule是一款非常優(yōu)秀的高性能分布式調(diào)度框架,本文是作者結(jié)合多年使用TBSchedule的經(jīng)驗(yàn),在研讀三遍源碼的基礎(chǔ)上完成。期間作者也與阿里空玄有過(guò)不少技術(shù)交流,并非常感謝空玄給予的大力支持。另外,作者寫(xiě)這篇文章的目的一是出于對(duì)TBSchedule的一種熱愛(ài),二是現(xiàn)在是一個(gè)資源共享、技術(shù)共享的時(shí)代,希望把它展現(xiàn)給大家(送人玫瑰,手留余香),能給大家的工作帶來(lái)幫助。
時(shí)下互聯(lián)網(wǎng)和電商領(lǐng)域,各個(gè)平臺(tái)都存在大數(shù)據(jù)、高并發(fā)的特點(diǎn),對(duì)數(shù)據(jù)處理的要求越來(lái)越高,既要保證高效性,又要保證安全性、準(zhǔn)確性。TBSchedule的使命就是將調(diào)度作業(yè)從業(yè)務(wù)系統(tǒng)中分離出來(lái),降低或者是消除和業(yè)務(wù)系統(tǒng)的耦合度,進(jìn)行高效異步任務(wù)處理。其實(shí)在互聯(lián)網(wǎng)和電商領(lǐng)域TBSchedule的使用非常廣泛,目前被應(yīng)用于阿里巴巴、淘寶、支付寶、京東、聚美、汽車(chē)之家、國(guó)美等很多互聯(lián)網(wǎng)企業(yè)的流程調(diào)度系統(tǒng)。
在深入了解TBSchedule之前我們先從內(nèi)部和外部形態(tài)對(duì)它有個(gè)初步認(rèn)識(shí),如圖1.1、圖1.2。
從TBSchedule的內(nèi)部形態(tài)來(lái)說(shuō),與他有關(guān)的關(guān)鍵詞包括批量任務(wù)、動(dòng)態(tài)擴(kuò)展、多主機(jī)、多線程、并發(fā)、分片……,這些詞看起來(lái)非常的高大上,都是時(shí)下互聯(lián)網(wǎng)技術(shù)比較流行的詞匯。從TBSchedule的外部架構(gòu)來(lái)看,一目了然,宿主在調(diào)度應(yīng)用中與ZooKeeper進(jìn)行通信。一個(gè)框架結(jié)構(gòu)是否是優(yōu)秀的,從美感的角度就可以看出來(lái),一個(gè)好的架構(gòu)一定是隱藏了內(nèi)部復(fù)雜的原理,外部視覺(jué)上美好的,讓用戶(hù)使用起來(lái)簡(jiǎn)單易懂。
為什么TBSchedule值得推廣呢?
TBSchedule到底有多強(qiáng)大呢?我對(duì)TBSchedule的優(yōu)勢(shì)特點(diǎn)進(jìn)行了如下總結(jié):
TBSchedule支持Cluster,可以宿主在多臺(tái)服務(wù)器多個(gè)線程組并行進(jìn)行任務(wù)調(diào)度,或者說(shuō)可以將一個(gè)大的任務(wù)拆成多個(gè)小任務(wù)分配到不同的服務(wù)器。
TBSchedule的分布式機(jī)制是通過(guò)靈活的Sharding方式實(shí)現(xiàn)的,比如可以按所有數(shù)據(jù)的ID按10取模分片(分片規(guī)則如圖2.1)、按月份分片等等,根據(jù)不同的需求,不同的場(chǎng)景由客戶(hù)端配置分片規(guī)則。然后就是TBSchedule的宿主服務(wù)器可以進(jìn)行動(dòng)態(tài)擴(kuò)容和資源回收,這個(gè)特點(diǎn)主要是因?yàn)樗蠖艘蕾?lài)的ZooKeeper,這里的ZooKeeper對(duì)于TBSchedule來(lái)說(shuō)是一個(gè)NoSQL,用于存儲(chǔ)策略、任務(wù)、心跳信息數(shù)據(jù),它的數(shù)據(jù)結(jié)構(gòu)類(lèi)似文件系統(tǒng)的目錄結(jié)構(gòu),它的節(jié)點(diǎn)有臨時(shí)節(jié)點(diǎn)、持久節(jié)點(diǎn)之分。調(diào)度引擎上線后,隨著業(yè)務(wù)量數(shù)據(jù)量的增多,當(dāng)前Cluster可能不能滿(mǎn)足目前的處理需求,那么就需要增加服務(wù)器數(shù)量,一個(gè)新的服務(wù)器上線后會(huì)在ZooKeeper中創(chuàng)建一個(gè)代表當(dāng)前服務(wù)器的一個(gè)唯一性路徑(臨時(shí)節(jié)點(diǎn)),并且新上線的服務(wù)器會(huì)和ZooKeeper保持長(zhǎng)連接,當(dāng)通信斷開(kāi)后,節(jié)點(diǎn)會(huì)自動(dòng)摘除。
TBSchedule會(huì)定時(shí)掃描當(dāng)前服務(wù)器的數(shù)量,重新進(jìn)行任務(wù)分配。TBSchedule不僅提供了服務(wù)端的高性能調(diào)度服務(wù),還提供了一個(gè)scheduleConsole war隨著宿主應(yīng)用的部署直接部署到服務(wù)器,可以通過(guò)web的方式對(duì)調(diào)度的任務(wù)、策略進(jìn)行監(jiān)控管理,以及實(shí)時(shí)更新調(diào)整。
是不是已經(jīng)對(duì)TBSchedule稍微了有些好感呢?我們接著往下看。
TBSchedule提供了兩個(gè)核心組件ScheduleServer、TBScheduleManagerFactory和兩類(lèi)核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,這兩部分是客戶(hù)端研發(fā)的關(guān)鍵部分,是使用TBSchedule必須要了解的。
ScheduleServer即任務(wù)處理器,的主要作用是任務(wù)和策略的管理、任務(wù)采集和執(zhí)行,由一組工作線程組成,這組工作線程是基于隊(duì)列實(shí)現(xiàn)的,進(jìn)行任務(wù)抓取和任務(wù)處理(有兩種處理模式,下面會(huì)講)。每個(gè)任務(wù)處理器和ZooKeeper有一個(gè)心跳通信連接,用于檢測(cè)Server的狀態(tài)和進(jìn)行任務(wù)動(dòng)態(tài)分配。舉個(gè)例子,比如3臺(tái)服務(wù)器的worker集群執(zhí)行出票消息生成任務(wù),對(duì)于這個(gè)任務(wù)類(lèi)型每臺(tái)服務(wù)器可以配置一個(gè)ScheduleSever(即一個(gè)線程組),也可以配置兩個(gè)線程組,那么就相當(dāng)于6臺(tái)服務(wù)器在并行執(zhí)行此任務(wù)類(lèi)型。當(dāng)某臺(tái)服務(wù)器宕機(jī)或者其他原因與ZooKeeper通信斷開(kāi)時(shí),它的任務(wù)將被其他服務(wù)器接管。ScheduleServer參數(shù)定義如圖2.2
在這些參數(shù)中taskItems是一個(gè)非常重要的屬性,是客戶(hù)單可以自由發(fā)揮的地方,是任務(wù)分片的基礎(chǔ),比如我們處理一個(gè)任務(wù)可以根據(jù)ID按10取模,那么任務(wù)項(xiàng)就是0-9,3臺(tái)服務(wù)器分別拿到4、 3、 3個(gè)任務(wù)項(xiàng),服務(wù)器的上下線都會(huì)對(duì)任務(wù)項(xiàng)進(jìn)行重新分配。任務(wù)項(xiàng)是進(jìn)行任務(wù)分配的最小單位。一個(gè)任務(wù)項(xiàng)只能由一個(gè)ScheduleServer來(lái)進(jìn)行處理,但一個(gè)Server可以處理任意數(shù)量的任務(wù)項(xiàng)。這就是剛才我們說(shuō)的分片特性。
調(diào)度服務(wù)器TBScheduleManagerFactory的主要工作ZooKeeper連接參數(shù)配置和ZooKeeper的初始化、調(diào)度管理。
兩類(lèi)核心接口是需要被我們定義的目標(biāo)任務(wù)實(shí)現(xiàn)的,根據(jù)自己的需要進(jìn)行任務(wù)采集(重寫(xiě)selectTasks方法)和任務(wù)執(zhí)行(重寫(xiě)execute方法),這兩類(lèi)接口也是客戶(hù)端研發(fā)根據(jù)需求自由發(fā)揮的地方。
接下來(lái)我們深入了解下TBSchedule,看看它的內(nèi)部是如何實(shí)現(xiàn)的。圖2.3流程圖是我花了很多心血通過(guò)一周時(shí)間畫(huà)出來(lái)的,基本是清晰的展現(xiàn)了TBSchedule內(nèi)部的執(zhí)行流程以及每個(gè)步驟ZooKeeper節(jié)點(diǎn)路徑和數(shù)據(jù)的變化。因?yàn)閳D中的注釋已經(jīng)描述的很詳細(xì)了,每個(gè)節(jié)點(diǎn)右側(cè)是ZooKeeper的信息(數(shù)據(jù)結(jié)構(gòu)見(jiàn)圖2.4),這里就不再做過(guò)多的文字描述了,有任何建議或者不明白的地方可以找我交流。
TBSchedule還有個(gè)強(qiáng)大之處是它提供了兩種處理器模式模式:
1. SLEEP模式
當(dāng)某一個(gè)線程任務(wù)處理完畢,從任務(wù)池中取不到任務(wù)的時(shí)候,檢查其它線程是否處于活動(dòng)狀態(tài)。如果是,則自己休眠;如果其它線程都已經(jīng)因?yàn)闆](méi)有任務(wù)進(jìn)入休眠,當(dāng)前線程是最后一個(gè)活動(dòng)線程的時(shí)候,就調(diào)用業(yè)務(wù)接口,獲取需要處理的任務(wù),放入任務(wù)池中,同時(shí)喚醒其它休眠線程開(kāi)始工作。
2. NOTSLEEP模式
當(dāng)一個(gè)線程任務(wù)處理完畢,從任務(wù)池中取不到任務(wù)的時(shí)候,立即調(diào)用業(yè)務(wù)接口獲取需要處理的任務(wù),放入任務(wù)池中。
SLEEP模式內(nèi)部邏輯相對(duì)較簡(jiǎn)單,如果遇到大任務(wù)需要處理較長(zhǎng)時(shí)間,可能會(huì)造成其他線程被動(dòng)阻塞的情況。但其實(shí)生產(chǎn)環(huán)境一般都是小而快的任務(wù),即使出現(xiàn)阻塞的情況ScheduleConsole也會(huì)及時(shí)的監(jiān)控到。NOTSLEEP模式減少了線程休眠的時(shí)間,避免了因大任務(wù)造成阻塞的情況,但為了避免數(shù)據(jù)被重復(fù)處理,增加了CPU在數(shù)據(jù)比較上的開(kāi)銷(xiāo)。TBSchedule默認(rèn)是SLEEP模式。
到目前為止我相信大家對(duì)TBSchedule有了一個(gè)深刻的了解,心中的疑霧逐漸散開(kāi)了。理論是實(shí)踐的基礎(chǔ),實(shí)踐才是最終的目的,下一節(jié)我們將結(jié)合理論知識(shí)進(jìn)行TBSchedule實(shí)戰(zhàn)。
在項(xiàng)目中使用TBSchedule需要依賴(lài)ZooKeeper、TBSchedule。
ZooKeeper依賴(lài):
<dependency> <groupId>org.apache.ZooKeeper</groupId> <artifactId>ZooKeeper</artifactId> <version>3.4.6</version> </dependency>
TBSchedule依賴(lài):
<dependency> <groupId>com.taobao.pamirs.schedule</groupId> <artifactId>TBSchedule</artifactId> <version>3.3.3.2</version> </dependency>
TBSchedule有三種引入方式:
TBSchedule隨著宿主調(diào)度應(yīng)用部署到服務(wù)器后,可以通過(guò)Web瀏覽器的方式訪問(wèn)其提供監(jiān)控平臺(tái)。
第一步,初始化ZooKeeper
第二步,創(chuàng)建調(diào)度策略
第三步,創(chuàng)建調(diào)度任務(wù)
第四步,監(jiān)控調(diào)度任務(wù)
2、通過(guò)原生Java引入
// 初始化Spring ApplicationContext ctx = new FileSystemXmlApplicationContext( "spring-config.xml"); // 初始化調(diào)度工廠 TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory(); Properties p = new Properties(); p.put("zkConnectString", "127.0.0.1:2181"); p.put("rootPath", "/taobao-schedule/train_worker"); p.put("zkSessionTimeout", "60000"); p.put("userName", "train_dev"); p.put("password", " train_dev "); p.put("isCheckParentPath", "true"); scheduleManagerFactory.setApplicationContext(ctx); scheduleManagerFactory.init(p); // 創(chuàng)建任務(wù)調(diào)度任務(wù)的基本信息String baseTaskTypeName = "DemoTask"; ScheduleTaskType baseTaskType = new ScheduleTaskType(); baseTaskType.setBaseTaskType(baseTaskTypeName); baseTaskType.setDealBeanName("demoTaskBean"); baseTaskType.setHeartBeatRate(10000); baseTaskType.setJudgeDeadInterval(100000); baseTaskType.setTaskParameter("AREA=BJ,YEAR>30"); baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem( "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," + "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," + "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}")); baseTaskType.setFetchDataNumber(500); baseTaskType.setThreadNumber(5); this.scheduleManagerFactory.getScheduleDataManager() .createBaseTaskType(baseTaskType); log.info("創(chuàng)建調(diào)度任務(wù)成功:" + baseTaskType.toString()); // 創(chuàng)建任務(wù)的調(diào)度策略 String taskName = baseTaskTypeName; String strategyName =taskName +"-Strategy"; try { this.scheduleManagerFactory.getScheduleStrategyManager() .deleteMachineStrategy(strategyName,true); } catch (Exception e) { e.printStackTrace(); } ScheduleStrategy strategy = new ScheduleStrategy(); strategy.setStrategyName(strategyName); strategy.setKind(ScheduleStrategy.Kind.Schedule); strategy.setTaskName(taskName); strategy.setTaskParameter("china"); strategy.setNumOfSingleServer(1); strategy.setAssignNum(10); strategy.setIPList("127.0.0.1".split(",")); this.scheduleManagerFactory.getScheduleStrategyManager() .createScheduleStrategy(strategy); log.info("創(chuàng)建調(diào)度策略成功:" + strategy.toString());
3、通過(guò)Spring容器引入
<!-- 初始化ZooKeeper --> <bean id="scheduleManagerFactory" class="xx.xx.TBScheduleManagerFactory"><property name="zkConfig"><map> <entry key="zkConnectString" value="127.0.0.1:2181" /> <entry key="rootPath" value="/taobao-schedule/train_worker" /> <entry key="zkSessionTimeout" value="60000" /> <entry key="userName" value="train_dev" /> <entry key="password" value="train_dev" /> <entry key="isCheckParentPath" value="true" /></map></property> </bean><!-- 配置調(diào)度策略 凌晨1點(diǎn)到3點(diǎn)執(zhí)行 --><bean id="abstractDemoScheduleTask" class="com.xx.core.TBSchedule.InitScheduleTask" abstract="true"><property name="scheduleTaskType.heartBeatRate" value="10000" /><property name="scheduleTaskType.judgeDeadInterval" value="100000" /><property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/> <property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/> <property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" /><property name="scheduleTaskType.sleepTimeNoData" value="60000"/><property name="scheduleTaskType.sleepTimeInterval" value="60000"/><property name="scheduleTaskType.fetchDataNumber" value="500" /><property name="scheduleTaskType.executeNumber" value="1" /><property name="scheduleTaskType.threadNumber" value="5" /><property name="scheduleTaskType.taskItems"> <list> <value>0:{TYPE=A,KIND=1}</value> <value>1:{TYPE=A,KIND=2}</value> <value>2:{TYPE=A,KIND=3}</value> <value>3:{TYPE=A,KIND=4}</value> <value>4:{TYPE=A,KIND=5}</value> <value>5:{TYPE=A,KIND=6}</value> <value>6:{TYPE=A,KIND=7}</value> <value>7:{TYPE=A,KIND=8}</value> <value>8:{TYPE=A,KIND=9}</value> <value>9:{TYPE=A,KIND=10}</value> </list></property><property name="scheduleStrategy.kind" value="Schedule" /><property name="scheduleStrategy.numOfSingleServer" value="1" /><property name="scheduleStrategy.assignNum" value="10" /> <property name="scheduleStrategy.iPList"> <list> <value>127.0.0.1</value> </list> </property> </bean> <!-- 配置調(diào)度任務(wù) --><bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask"><property name="scheduleTaskType.baseTaskType" value="demoTask" /><property name="scheduleTaskType.dealBeanName" value="demoTaskBean" /><property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" /><property name="scheduleStrategy.taskName" value="demoTaskBean" /></bean> 調(diào)度任務(wù)具體實(shí)現(xiàn) DemoTask.java /** * DemoTask任務(wù)類(lèi) */public class DemoTask mplements IScheduleTaskDealSingle,TScheduleTaskDeal { /** * 數(shù)據(jù)采集 * @param taskItemNum--分配的任務(wù)項(xiàng) taskItemList--總?cè)蝿?wù)項(xiàng) * eachFetchDataNum--采集任務(wù)數(shù)量 */ @Override public List<DemoTask> selectTasks(String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception { List<DemoTask> taskList = new LinkedList<DemoTask>(); //客戶(hù)端根據(jù)條件進(jìn)行數(shù)據(jù)采集start //客戶(hù)端根據(jù)條件進(jìn)行數(shù)據(jù)采集end return rt; }/** * 數(shù)據(jù)處理 */ @Override public boolean execute(DemoTask task, String ownSign) throws Exception { //客戶(hù)端pop任務(wù)進(jìn)行處理start //客戶(hù)端pop任務(wù)進(jìn)行處理end return true; }}
其實(shí)我們看對(duì)于TBSchedule客戶(hù)端的使用非常簡(jiǎn)單,初始化ZooKeeper、配置調(diào)度策略、配置調(diào)度任務(wù),對(duì)調(diào)度任務(wù)進(jìn)行具體實(shí)現(xiàn),就這幾個(gè)步驟?,F(xiàn)在可以慶祝下了,你又掌握了一個(gè)優(yōu)秀開(kāi)源框架的設(shè)計(jì)思想和使用方式。
任何事物都是沒(méi)有最好只有更好,TBSchedule也一樣,雖然它現(xiàn)在已經(jīng)很完美了,我們不能放棄對(duì)更完美的追求。阿里團(tuán)隊(duì)可以在下面幾個(gè)方面進(jìn)行優(yōu)化。
至此,我們已經(jīng)完成了對(duì)TBSchedule的全部介紹,盡快使用起來(lái)吧!
TBSchedule技術(shù)交流掃描下方二維碼:
若群滿(mǎn),請(qǐng)?zhí)砑尤褐鱭shuguang2008為好友,備注:姓名+公司+職位+TBSchedule申請(qǐng)入群。
聯(lián)系客服