1、為什么需要消息隊列?
當(dāng)系統(tǒng)中出現(xiàn)“生產(chǎn)“和“消費“的速度或穩(wěn)定性等因素不一致的時候,就需要消息隊列,作為抽象層,彌合雙方的差異。
舉個例子:業(yè)務(wù)系統(tǒng)觸發(fā)短信發(fā)送申請,但短信發(fā)送模塊速度跟不上,需要將來不及處理的消息暫存一下,緩沖壓力。
再舉個例子:調(diào)遠程系統(tǒng)下訂單成本較高,且因為網(wǎng)絡(luò)等因素,不穩(wěn)定,攢一批一起發(fā)送。
再舉個栗子,交互模塊5:00到24:00和電商系統(tǒng)聯(lián)通,和內(nèi)部ERP斷開。1:00到4:00和ERP聯(lián)通,和電商系統(tǒng)斷開。
再舉個例子,服務(wù)員點菜快,廚師做菜慢。
再舉個例子,到銀行辦事的人多,提供服務(wù)的窗口少。
乖乖排隊吧。
2、使用消息隊列有什么好處?
2.1、提高系統(tǒng)響應(yīng)速度
使用了消息隊列,生產(chǎn)者一方,把消息往隊列里一扔,就可以立馬返回,響應(yīng)用戶了。無需等待處理結(jié)果。
處理結(jié)果可以讓用戶稍后自己來取,如醫(yī)院取化驗單。也可以讓生產(chǎn)者訂閱(如:留下手機號碼或讓生產(chǎn)者實現(xiàn)listener接口、加入監(jiān)聽隊列),有結(jié)果了通知。獲得約定將結(jié)果放在某處,無需通知。
2.2、提高系統(tǒng)穩(wěn)定性
考慮電商系統(tǒng)下訂單,發(fā)送數(shù)據(jù)給生產(chǎn)系統(tǒng)的情況。
電商系統(tǒng)和生產(chǎn)系統(tǒng)之間的網(wǎng)絡(luò)有可能掉線,生產(chǎn)系統(tǒng)可能會因維護等原因暫停服務(wù)。
如果不使用消息隊列,電商系統(tǒng)數(shù)據(jù)發(fā)布出去,顧客無法下單,影響業(yè)務(wù)開展。
兩個系統(tǒng)間不應(yīng)該如此緊密耦合。應(yīng)該通過消息隊列解耦。同時讓系統(tǒng)更健壯、穩(wěn)定。
3、為什么需要分布式?
3.1、多系統(tǒng)協(xié)作需要分布式
消息隊列中的數(shù)據(jù)需要在多個系統(tǒng)間共享數(shù)據(jù)才能發(fā)揮價值。
所以必須提供分布式通信機制、協(xié)同機制。
3.2、單系統(tǒng)內(nèi)部署環(huán)境需要分布式
單系統(tǒng)內(nèi)部,為了更好的性能、為了避免單點故障,多為集群環(huán)境。
集群環(huán)境中,應(yīng)用運行在多臺服務(wù)器的多個JVM中;數(shù)據(jù)也保存在各種類型的數(shù)據(jù)庫或非數(shù)據(jù)庫的多個節(jié)點上。
為了滿足多節(jié)點協(xié)作需要,需要提供分布式的解決方案。
4、分布式環(huán)境下需要解決哪些問題
4.1、并發(fā)問題
需進行良好的并發(fā)控制。確?!熬€程安全“。
不要出現(xiàn)一個訂單被出貨兩次。不要出現(xiàn)顧客A下的單,發(fā)貨發(fā)給了顧客B等情況。
4.2、簡單的、統(tǒng)一的操作機制
需定義簡單的,語義明確的,業(yè)務(wù)無關(guān)的,恰當(dāng)穩(wěn)妥的統(tǒng)一的訪問方式。
4.3、容錯
控制好單點故障,確保數(shù)據(jù)安全。
4.4、可橫向擴展
可便捷擴容。
5、如何實現(xiàn)?
成熟的消息隊列中間件產(chǎn)品太多了,族繁不及備載。
成熟產(chǎn)品經(jīng)過驗證,接口規(guī)范,可擴展性強。
結(jié)合事業(yè)環(huán)境因素、組織過程遺產(chǎn)、實施運維考慮、技術(shù)路線考慮、開發(fā)人員情況等原因綜合考慮,基于Redis自己做一個是最可行的選擇。 1、消息隊列需提供哪些功能?
在功能設(shè)計上,我崇尚奧卡姆剃刀法則。
對于消息隊列,只需要兩個方法: 生產(chǎn) 和 消費。
具體的業(yè)務(wù)場景是任務(wù)隊列,代碼設(shè)計如下:
public abstract class TaskQueue{
private final String name ;
public String getName(){return this.name;}
public abstract void addTask(Serializable taskId);
public abstract Serializable popTask();
}
同時支持多個隊列,每個隊列都應(yīng)該有個名字。final確保TaskQueue是線程安全的。TaskQueue的實現(xiàn)類也應(yīng)該確保線程安全。
addTask向隊列中添加一個任務(wù)。隊列中僅保存任務(wù)的id,不存儲任務(wù)的業(yè)務(wù)數(shù)據(jù)。
popTask從隊列中取出一個任務(wù)來執(zhí)行。
這種設(shè)計不是特別友好,因為她需要調(diào)用者自行保證任務(wù)執(zhí)行成功,如果執(zhí)行失敗,自行確保重新把任務(wù)放回隊列。 無論如何,這種機制是可以工作的。想想奧卡姆剃刀法則,我們先按照這個設(shè)計實現(xiàn)出來看看。
如果調(diào)用者把業(yè)務(wù)數(shù)據(jù)存在數(shù)據(jù)庫中,業(yè)務(wù)數(shù)據(jù)中包含“狀態(tài)“列,標(biāo)識任務(wù)是否被執(zhí)行,調(diào)用者需要自行管理這個狀態(tài),并控制事務(wù)。
popTask采用阻塞方式,還是非阻塞方式呢?
如果采用阻塞方式,隊列中沒任務(wù)的時候,客戶端不會斷開連接,只是等。
一般情況下,客戶端會有多個worker搶著干活兒,幾條狼一起等一個肉包子,畫面太美。連接是重要資源,如果一直沒活兒干,先放回池里,也不錯。
先采用非阻塞的方式吧,如果隊列是空的,popTask返回null,立即返回。
2、后續(xù)可能提供的功能
2.1、引入Task生命周期概念
應(yīng)用場景不同,需求也不同。
在嚴格的應(yīng)用場景中,需要確保每個Task執(zhí)行“成功“了。
對于上面提到的popTask后不管的“模式“,這是另外一種“運行模式“,兩種模式可以并行存在。
在這種新模式下,Task狀態(tài)有3種:新創(chuàng)建(new,剛調(diào)用addTask加到隊列中)、正在執(zhí)行(in-process,調(diào)用popTask后,調(diào)用finish前)、完成(done,執(zhí)行OK了,調(diào)用finishTask后)。
調(diào)整后的代碼如下:
public abstract class TaskQueue{
private final String name ;
public String getName(){return this.name;}
public abstract int getMode();
public abstract void addTask(Serializable taskId);
public abstract Serializable popTask();
public abstract void finishTask(Serializable taskId);
}
2.2、增加批量取出任務(wù)的功能
popTask()一次取出一個任務(wù),太磨嘰了。
好比我們要買5瓶水,開車去超市買,每去一次買1瓶,有點兒啥。
我們需要一個一次取多個任務(wù)的方法。
public abstract class TaskQueue{
... ...
public abstract Serializable[] popTasks(long cnt);
}1
2.3、增加阻塞等待機制
想象一種場景:
小明同學(xué),取出一個任務(wù),發(fā)現(xiàn)干不了,放回隊列,再去取,取出來發(fā)現(xiàn)還是干不了,又放回去。反反復(fù)復(fù)。
小明童鞋腫么了?可能是他干活需要網(wǎng)絡(luò),網(wǎng)絡(luò)斷了??赡苁撬鋈蝿?wù)需要寫磁盤,磁盤滿了。
如果小明像鄰居家的孩子一樣優(yōu)秀,當(dāng)他發(fā)現(xiàn)哪里不對的時候,他應(yīng)該冷靜下來,歇會兒。
但他萬一不是呢?只有我們能幫他了。
假如隊列中有10000個待辦任務(wù)。
這時候小明來了。他失敗100次后,我們應(yīng)該攔他嗎?不應(yīng)該,除非他主動要求(在系統(tǒng)參數(shù)中配置)。5000次后呢?也不應(yīng)該,除非他主動要求。我們的原則是:我們做的所有事情,對于調(diào)用者,都是可以預(yù)期的。
我們可以在系統(tǒng)參數(shù)中要求調(diào)用者設(shè)置一個閥值N,如果不設(shè)置,默認為100。連續(xù)失敗N次后,讓調(diào)用者睡一會兒,睡多長時間,讓調(diào)用者配置。
假如我們的底層實現(xiàn)中包含待辦子隊列、重做子隊列和完成子隊列(這種設(shè)計好復(fù)雜!pop的時候先pop重做,還是先pop待辦,復(fù)雜死了!但愿不需要這樣)。
待辦子隊列中有10000個任務(wù)。
在小明失敗10000次后,所有的任務(wù)都在重做子隊列了。這時候我們應(yīng)該攔他嗎?
重做子隊列要不要設(shè)置大小,超過之后,讓下一個訪問者等。
等的話就會涉及超時,超時后,任務(wù)也不能丟棄。
太復(fù)雜 了!設(shè)置一個連續(xù)失敗次數(shù)的限制就夠了!
2.4、考慮增加Task類
不保存任務(wù)的相關(guān)數(shù)據(jù)是基本原則,不動搖。
增加Task類可以管理下生命周期,更有用的是,可以把Task本身設(shè)計成Listener,代碼大概時這樣的:
public abstract class Task{
public Serializable getId();
public int getState();
pubic void doTask();
public void whenAdded(final TaskQueue tq);
public void whenPoped(final TaskQueue tq);
// public void whenFaild(final TaskQueue tq);
public void whenFinished(final TaskQueue tq);
}
通過Task接口,我們可以對調(diào)用過程進行更強勢的管理(如進行事務(wù)控制),對調(diào)用者施加更強的控制,用戶也可以獲得更多的交互機會,同TaskQueue有更好的交互(如在whenFinished中做持久化工作)。
但這些真的有必要嗎?是不是太侵入了?注解的方式會好些嗎?
再考慮吧。
2.5、增加系統(tǒng)參數(shù)
貌似需要個Config類了,不爽!
本來想做一個很小很精致的小東西的,如果必須再加吧。
如果做的話,需要支持properties、注解設(shè)置、api方式設(shè)置、Spring注入式設(shè)置,煩。
次回預(yù)告:Redis本身機制和TaskQueue的契合。
1、Redis是什么鬼?
Redis是一個簡單的,高效的,分布式的,基于內(nèi)存的緩存工具。
假設(shè)好服務(wù)器后,通過網(wǎng)絡(luò)連接(類似數(shù)據(jù)庫),提供Key-Value式緩存服務(wù)。
簡單,是Redis突出的特色。
簡單可以保證核心功能的穩(wěn)定和優(yōu)異。
2、性能
性能方面:Redis是足夠高效的。
和Memecached對比,在數(shù)據(jù)量較小大情況下,Redis性能更優(yōu)秀。
數(shù)據(jù)量大到一定程度的時候,Memecached性能稍好。
簡單結(jié)論:但總體上講Redis性能已經(jīng)足夠好。
經(jīng)實驗得知:
List操作和字符串操作性能相當(dāng),略差,幾乎可以忽略。
使用Jedis自帶pool,“每次從pool中取用完放回“ 和 “重用單個連接“ 相比,平均用時是3倍。這部分需要繼續(xù)研究底層機制,采用更合理的實驗方法進一步獲得數(shù)據(jù)。
使用Jedis自帶pool,性能上是滿足當(dāng)前訪問量需要的,等有時間了再進一步深入。
3、數(shù)據(jù)類型
Redis支持5種數(shù)據(jù)類型:字符串、Map、List、Set、Sorted Set。
List特別適合用于實現(xiàn)隊列。提供的操作包括:
從左側(cè)(或右側(cè))放入一個元素,從右側(cè)(或左側(cè))取出一個元素,讀取某個范圍的元素,刪除某個范圍的元素。
Sorted Set中元素是的,可以通過名字找。
Map可以高效地通過key找。
假如我們需要實現(xiàn)finishTash(taskId),需要通過名字在隊列中找元素,上面兩個可能會用到。
4、原子操作
實現(xiàn)分布式隊列首要問題是:不能出現(xiàn)并發(fā)問題。
Redis是底層是單線程的,命令執(zhí)行是原子操作,支持事務(wù),契合了我們的需求。
Redis直接提供的命令都是原子操作,包括lpush、rpop、blpush、brpop等。
Redis支持事務(wù)。通過類似 begin…[cancel]…commit的語法,提供begin…commit之間的命令為原子操作的功能,之間命令對數(shù)據(jù)的改變對其他操作是不可見的。類似關(guān)系型數(shù)據(jù)庫中的存儲過程,同時提供了別的事務(wù)隔離級別。
Redis支持腳本,每個腳本的執(zhí)行是原子性的。
做了一下并發(fā)測試:
寫了個小程序,隨機對List做push或pop操作,push的比pop的稍多。
記錄每次處理的詳細信息到數(shù)據(jù)庫。
最后把List中數(shù)據(jù)都pop出來,詳細記錄每次pop詳細信息。
統(tǒng)計push和pop是否相等,統(tǒng)計針對每條數(shù)據(jù)是否都有push和pop。
500并發(fā),沒有出現(xiàn)并發(fā)問題。
5、集群
實現(xiàn)分布式隊列另一個重要問題是:不能出現(xiàn)單點故障。
Redis支持Master-Slave數(shù)據(jù)復(fù)制,從服務(wù)器設(shè)置 slave-of master-ip:port 即可。
集群功能可以由客戶端提供。
客戶端使用哨兵,可自動切換主服務(wù)器。
由于隊列操作都是寫操作,從服務(wù)器主要目的是備份數(shù)據(jù),保證數(shù)據(jù)安全。
如果想基于 sharding 做多master集群,可以結(jié)合 zookeeper 自己做。
Redis 3.0支持集群了,還沒細看,應(yīng)該是個好消息,等大家都用起來,沒什么問題的話,可以考慮試試看。
如果 master 宕掉,怎么辦?
“哨兵”會選出一個新的master來。產(chǎn)生過程中,消息隊列暫停服務(wù)。
最極端的情況,所有Redis都停了,當(dāng)消息隊列發(fā)現(xiàn)Redis停止響應(yīng)時,對業(yè)務(wù)系統(tǒng)的請求應(yīng)拋出異常,停止隊列服務(wù)。
這樣會影響業(yè)務(wù),業(yè)務(wù)系統(tǒng)下訂單、審批等操作會失敗。如果可以接受,這是一種方案。
Redis整個集群宕掉,這種情況很少發(fā)生,如果真發(fā)生了,業(yè)務(wù)系統(tǒng)停止服務(wù)也是可以理解的。
如果想要在Redis整個集群宕掉的情況下,消息隊列仍繼續(xù)提供服務(wù)。
方法是這樣的:
啟用備用存儲機制,可以是zookeeper、可以是關(guān)系型數(shù)據(jù)庫、可以是另外可用的Memecached等。
本地內(nèi)存存儲是不可取的,首先,同步多個客戶端虛擬機內(nèi)存數(shù)據(jù)太復(fù)雜,相當(dāng)于自己實現(xiàn)了一個Redis,其次,保證內(nèi)存數(shù)據(jù)存儲安全太復(fù)雜。
備用存儲機制相當(dāng)于實現(xiàn)了另外一個版本的消息隊列,邏輯一致,底層存儲不同。這個實現(xiàn)可以性能低一些,保證最基本的原則即可。
想要保證不出現(xiàn)并發(fā)問題,由于消息隊列程序同時運行在多個虛擬機中,對象鎖、方法鎖無效。需要有一個獨立于虛擬機的鎖機制,zookeeper是個好選擇。
將關(guān)系型數(shù)據(jù)庫設(shè)置為別的事務(wù)隔離級別,太傻了。除了zk有其他好辦法嗎?
Redis集群整個宕掉的同時Zookeeper也全軍覆沒怎么辦?
這個問題是沒有盡頭的,提供了第二備用存儲、第三備用存儲、第四備用存儲、…,理論上也會同時宕掉,那時候怎么辦?
有錢任性的土豪可以繼續(xù),預(yù)算有限的情況,能做到哪步就做到哪步。
6、持久化
分布式隊列的應(yīng)用場景和緩存的應(yīng)用場景是不一樣的。
如果有沒來得及持久化的數(shù)據(jù)怎么辦?
從業(yè)務(wù)系統(tǒng)的角度,已經(jīng)成功發(fā)送給消息隊列了。
消息隊列也以為Redis妥妥地收好了。
可Redis還沒寫到日記里,更沒有及時通知小伙伴,掛了。可能是斷電了,可能是進程被kill了。
后果會怎樣?
已經(jīng)執(zhí)行過的任務(wù)會再次執(zhí)行一遍。
已經(jīng)放到隊列中的任務(wù),消失了。
標(biāo)記為已經(jīng)完成的任務(wù),狀態(tài)變?yōu)椤斑M行中”了,然后又被執(zhí)行了一遍。
后果不可接受。
分布式隊列不允許丟數(shù)據(jù)。
從業(yè)務(wù)角度,哪怕丟1條數(shù)據(jù)也是無法接受的。
從運維角度,Redis丟數(shù)據(jù)后,如果可以及時發(fā)現(xiàn)并補救,也是可以接受的。
從架構(gòu)角度,隊列保存在Redis中,業(yè)務(wù)數(shù)據(jù)(包括任務(wù)狀態(tài))保存在關(guān)系型數(shù)據(jù)庫中。
任務(wù)狀態(tài)是從業(yè)務(wù)角度確定的,消息隊列不應(yīng)該干涉。如果業(yè)務(wù)狀態(tài)沒有統(tǒng)一的規(guī)范和定義,從業(yè)務(wù)數(shù)據(jù)比對任務(wù)隊列是否全面正確,就只能交給業(yè)務(wù)開發(fā)方來做。
從分工上來看,任務(wù)隊列的目的是管理任務(wù)執(zhí)行的狀態(tài),業(yè)務(wù)系統(tǒng)把這個職責(zé)交給了任務(wù)隊列,業(yè)務(wù)系統(tǒng)自身的任務(wù)狀態(tài)維護未必準(zhǔn)確。
結(jié)論:任務(wù)隊列不能推卸責(zé)任,不能丟數(shù)據(jù)是核心功能,不能打折扣。
采用 Master-Slave 數(shù)據(jù)復(fù)制模式,配置bgsave,追加存儲到aof。
在從服務(wù)器上配置bgsave,不影響master性能。
隊列操作都是寫操作,master任務(wù)繁重,能讓slave分擔(dān)的持久化工作,就不要master做。
rdb和aof兩種方法都用上,多重保險。
appendfsync設(shè)為always。// 單節(jié)點測性能,連續(xù)100000次算平均時間,和per second比對,性能損失不大。
性能會有些許損失,但任務(wù)執(zhí)行為異步操作,無需用戶同步等待,為了保證數(shù)據(jù)安全,這樣是值得的。
當(dāng)運維需要重啟Master服務(wù)器的時候,采取這樣的順序:
1. 通過 cli shutdown 停止master服務(wù)器, master交代完后事后,關(guān)掉自己。這時候“哨兵”會找一個新的master出來。
萬萬不可以直接kill或者直接打開防火墻中斷master和slave之間的連接。
master 對外防火墻,停止對外服務(wù),Master 自動切換到其他服務(wù)器上, 原 Master 繼續(xù)持久化 aof,發(fā)送到原來各從服務(wù)器。
2. 在原 master 上進行運維操作。
3. 啟動原 master,這時候它已經(jīng)是從服務(wù)器了。耐心等待它從新 master 獲取數(shù)據(jù)。觀察 redis 日志輸出,確認數(shù)據(jù)安全。
4. 對新的 master 重復(fù)1-3的操作。
5. 將以上操作寫成腳本,自動化執(zhí)行,避免人為錯誤。
1、訪問Redis的工具類
public class RedisManager {
private static Pool<Jedis> pool;
protected final static Logger logger = Logger.getLogger(RedisManager.class);
static{
try {
init();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void init() throws Exception {
Properties props = ConfigManager.getProperties("redis");
logger.debug("初始化Redis連接池。");
if(props==null){
throw new RuntimeException("沒有找到redis配置文件");
}
// 創(chuàng)建jedis池配置實例
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 設(shè)置池配置項值
int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim());
jedisPoolConfig.setMaxTotal(poolMaxTotal);
int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim());
jedisPoolConfig.setMaxIdle(poolMaxIdle);
long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim());
jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);
logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ",
poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));
// 根據(jù)配置實例化jedis池
String connectMode = props.getProperty("redis.connectMode");
String hostPortStr = props.getProperty("redis.hostPort");
logger.debug(String.format("host : %s ",hostPortStr));
logger.debug(String.format("mode : %s ",connectMode));
if(StringUtils.isEmpty(hostPortStr)){
throw new OptimusException("redis配置文件未配置主機-端口集");
}
String[] hostPortSet = hostPortStr.split(",");
if("single".equals(connectMode)){
String[] hostPort = hostPortSet[0].split(":");
pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
}else if("sentinel".equals(connectMode)){
Set<String> sentinels = new HashSet<String>();
for(String hostPort : hostPortSet){
sentinels.add(hostPort);
}
pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);
}
}
/**
* 使用完成后,必須調(diào)用 returnResource 還回。
* @return 獲取Jedis對象
*/
public static Jedis getResource(){
Jedis jedis = pool.getResource();
if(logger.isDebugEnabled()){
logger.debug("獲得鏈接:" + jedis);
}
return jedis;
}
/**
* 獲取Jedis對象。
*
* 用完后,需要調(diào)用returnResource放回連接池。
*
* @param db 數(shù)據(jù)庫序號
* @return
*/
public static Jedis getResource(int db){
Jedis jedis = pool.getResource();
jedis.select(db);
if(logger.isDebugEnabled()){
logger.debug("獲得鏈接:" + jedis);
}
return jedis;
}
/**
* @param jedis
*/
public static void returnResource(Jedis jedis){
if(jedis!=null){
pool.returnResource(jedis);
if(logger.isDebugEnabled()){
logger.debug("放回鏈接:" + jedis);
}
}
}
/**
* 需要通過Spring確認這個方法被調(diào)用。
* @throws Exception
*/
public static void destroy() throws Exception {
pool.destroy();
}
}
這個類沒有通過技術(shù)手段強制調(diào)用returnResource和destroy,需要想想辦法。
2、隊列接口
public interface TaskQueue {
/**
* 獲取隊列名
* @return
*/
String getName();
/**
* 往隊列中添加任務(wù)
* @param task
*/
void pushTask(String task);
/**
* 從隊列中取出一個任務(wù)
* @return
*/
String popTask();
}
用String類型描述任務(wù),也可以考慮byte[],要求對每個任務(wù)描述的數(shù)據(jù)盡可能短。
3、隊列的Redis實現(xiàn)類
/**
* 任務(wù)隊列Redis實現(xiàn)。
*
* 采用每次獲取Jedis并放回pool的方式。
* 如果獲得Jedis后一直不放手,反復(fù)重用,兩個操作耗時可以降低1/3。
* 暫時先忍受這種低性能,不明確Jedis是否線程安全。
*
*/
public class TaskQueueRedisImpl implements TaskQueue {
private final static int REDIS_DB_IDX = 9;
private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);
private final String name;
/**
* 構(gòu)造函數(shù)。
*
* @param name
*/
public TaskQueueRedisImpl(String name) {
this.name = name;
}
/* (non-Javadoc)
* @see com.gwssi.common.mq.TaskQueue#getName()
*/
public String getName() {
return this.name;
}
/* (non-Javadoc)
* @see com.gwssi.common.mq.TaskQueue#pushTask(String)
*/
public void pushTask(String task) {
Jedis jedis = null;
try{
jedis = RedisManager.getResource(REDIS_DB_IDX);
jedis.lpush(this.name, task);
}catch(Throwable e){
logger.error(e.getMessage(),e);
}finally{
if(jedis!=null){
RedisManager.returnResource(jedis);
}
}
} /* (non-Javadoc)
* @see com.gwssi.common.mq.TaskQueue#popTask()
*/
public String popTask() {
Jedis jedis = null;
String task = null;
try{
jedis = RedisManager.getResource(REDIS_DB_IDX);
task = jedis.rpop(this.name);
}catch(Throwable e){
logger.error(e.getMessage(),e);
}finally{
if(jedis!=null){
RedisManager.returnResource(jedis);
}
}
return task;
}
}
4、獲取隊列實例的工具類
/**
* <pre>
* // 獲得隊列
* TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
*
* // 添加任務(wù)到隊列
* String task = "task id";
* tq.pushTask(task);
*
* // 從隊列中取出任務(wù)執(zhí)行
* String taskToDo = tq.popTask();
* </pre>
* @author liuhailong
*/
public class TaskQueueManager {
protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);
private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();
/**
* 短信隊列名。
*/
public static final String SMS_QUEUE = "SMS_QUEUE";
/**
* 規(guī)則隊列名。
*/
public static final String RULE_QUEUE = "RULE_QUEUE";
private static void initQueneMap() {
logger.debug("初始化任務(wù)隊列...");
queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
logger.debug("建立隊列:"+RULE_QUEUE);
queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
logger.debug("建立隊列:"+SMS_QUEUE);
}
static {
initQueneMap();
}
public static TaskQueue get(String name){
return getRedisTaskQueue(name);
}
public static TaskQueue getRedisTaskQueue(String name){
return queneMap.get(name);
}
}
和具體的隊列過于緊耦合,但簡單好用。
先跑起來再說。
5、向隊列中添加任務(wù)的代碼
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);1
6、從隊列中取出任務(wù)執(zhí)行的代碼
public class SmsSendTask{
protected final static Logger logger = Logger.getLogger(SmsSendTask.class);
protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
/**
* 入口方法。
*/
public void execute() {
TaskQueue taskQueue = null;
String task = null;
try {
taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
// 非線程安全
Set<Serializable> executedTaskSet = new HashSet<Serializable>();
task = taskQueue.popTask();
while(task!=null){
// 判斷是否把所有任務(wù)都執(zhí)行一遍了,避免死循環(huán)
if(executedTaskSet.contains(task)){
taskQueue.pushTask(task);
break;
}
executeSingleTask(taskQueue,task);
task = taskQueue.popTask();
}
}catch(Throwable e){
logger.error(e.getMessage(),e);
e.printStackTrace();
}
}
/**
* 發(fā)送單條短信。
*
* 取出任務(wù)并執(zhí)行,如果失敗,放回任務(wù)列表。
*
* @param taskQueue
* @param task
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private void executeSingleTask(TaskQueue taskQueue, String task) {
try {
// do the job
String smsId = task;
Map<String,String> sms = smsSendService.getSmsList(smsId);
smsSendService.send(sms);
smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);
String opType = "2";
TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
} catch (Throwable e) {
if(task!=null){
taskQueue.pushTask(task);
smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
if(logger.isDebugEnabled()){
logger.error(String.format("任務(wù)%s執(zhí)行失?。?s,重新放回隊列", task, e.getMessage()));
}
}else {
e.printStackTrace();
}
}
}
}
這部分代碼是固定模式,而且不這樣做存在重大缺陷,會有任務(wù)執(zhí)行失敗,被丟棄,這部分代碼應(yīng)該寫到隊列實現(xiàn)中。
有空再改。