目前業務上需要一個速度快,數據超時刪除的內存隊列,實現和使用如下:
package main

import (
	"fmt"
	"time"

	queue "github.com/fwhezfwhez/go-queue"
)

// main walks through the plain queue operations and then the time-expiring
// queue, printing the queue state after each step.
func main() {
	// Initialise an empty queue.
	q := queue.NewEmpty()

	// Push two values.
	q.Push(5)
	q.Push(4)

	// Print the current contents.
	q.Print()

	// Pop and show what Pop returned.
	fmt.Println(q.Pop())

	// Print the contents again after the pop.
	q.Print()

	// Length.
	fmt.Println(q.Length())

	// Concurrency-safe push.
	q.SafePush(6)

	// Concurrency-safe pop.
	fmt.Print(q.SafePop())
	q.Print()

	// Time queue.
	// NOTE(review): the three arguments are presumably (expiry, capacity,
	// polling step) — confirm against the go-queue package.
	tq := queue.TimeQueueWithTimeStep(10*time.Second, 50, 1*time.Nanosecond)
	tq.StartTimeSpying()
	tq.TPush(5)
	tq.SafeTPush(6)

	fmt.Println("init:")
	tq.Print()

	time.Sleep(5 * time.Second)
	fmt.Println("after 5s:")
	tq.Print()

	// 5s + 9s = 14s since the pushes, which is what the label below reports.
	time.Sleep(9 * time.Second)
	fmt.Println("after 14s")
	tq.Print()
}
// start to spy on queue's time-out data and throw itfunc (q *Queue) StartTimeSpying() { fmt.Println("time supervisor starts") go q.startTimeSpying()}// detail of StartTimeSpying functionfunc (q *Queue) startTimeSpying() error { var err = make(chan string, 0) go func(queue *Queue, er chan string) { fmt.Println("start time spying, data in the queue can stay for " q.ExpireAfter.String()) for { if queue.timeSpy == false { err <- "spying routine stops because: queue's timeSpy is false, make sure the queue is definition by q=TimeQueue(time.Duration,int)" return } select { case <-queue.flag: fmt.Println("time spy executing stops") return default: fmt.Print() } ok,er:=queue.timingRemove() if er!=nil{ err <- er.(errorx.Error).StackTrace() } if ok { time.Sleep(queue.timeStep) } } }(q, err) select { case msg := <-err: fmt.Println("time spy supervisor accidentally stops because: ",msg) return errorx.NewFromString(msg) case <-q.flag: fmt.Println("time spy supervisor stops") return nil }}// remove those time-out datafunc (q *Queue) timingRemove() (bool,error) { if len(q.Data) <1 { return true,nil } head, index, er := q.THead() if er != nil { return false, errorx.Wrap(er) } if index < 0 { return false, errorx.NewFromString("queue'length goes 0") } now := time.Now().Unix() created := time.Unix(head.CreatedAt, 0) //fmt.Println("now:",now) //fmt.Println("expire:",created.Add(q.ExpireAfter).Unix()) if created.Add(q.ExpireAfter).Unix() < now { // out of time _,_,e := q.TPop() if e!=nil { return false, errorx.Wrap(e) } if len(q.Data) >0 { return q.timingRemove() }else{ return true,nil } } else{ return true ,nil }}
來源:http://www.icode9.com/content-4-27871.html
聯繫客服