Go語言中文網(wǎng) 昨天
以下文章來源于吳親強(qiáng)的深夜食堂 ,作者吳親庫里
業(yè)務(wù)場景
在做任務(wù)開發(fā)的時候,你們一定會碰到以下場景:
針對以上兩種場景,假設(shè)在沒有強(qiáng)依賴關(guān)系下,選擇串行調(diào)用,那么總耗時即:
time=s1+s2+....sn
按照當(dāng)代秒入百萬的有為青年,這么長時間早就把你祖宗十八代問候了一遍。
為了偉大的KPI,我們往往會選擇并發(fā)地調(diào)用這些依賴接口。那么總耗時就是:
time=max(s1,s2,s3.....,sn)
當(dāng)然開始堆業(yè)務(wù)的時候可以先串行化,等到上面的人著急的時候,亮出絕招。
這樣,年底 PPT
就可以加上濃重的一筆流水賬:為業(yè)務(wù)某個接口提高百分之XXX性能,間接產(chǎn)生XXX價值。
當(dāng)然這一切的前提是,做老板不懂技術(shù),做技術(shù)”懂”你。
言歸正傳,如果修改成并發(fā)調(diào)用,你可能會這么寫,
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var userInfo *User
var productList []Product
go func() {
defer wg.Done()
userInfo, _ = getUser()
}()
go func() {
defer wg.Done()
productList, _ = getProductList()
}()
wg.Wait()
fmt.Printf("用戶信息:%+v\n", userInfo)
fmt.Printf("商品信息:%+v\n", productList)
}
/********用戶服務(wù)**********/
type User struct {
Name string
Age uint8
}
func getUser() (*User, error) {
time.Sleep(500 * time.Millisecond)
var u User
u.Name = "wuqinqiang"
u.Age = 18
return &u, nil
}
/********商品服務(wù)**********/
type Product struct {
Title string
Price uint32
}
func getProductList() ([]Product, error) {
time.Sleep(400 * time.Millisecond)
var list []Product
list = append(list, Product{
Title: "SHib",
Price: 10,
})
return list, nil
}
從實現(xiàn)上來說,需要多少服務(wù),會開多少個 G
,利用 sync.WaitGroup
的特性,
實現(xiàn)并發(fā)編排任務(wù)的效果。
好像,問題不大。
但是隨著代號 996
業(yè)務(wù)場景的增加,你會發(fā)現(xiàn),好多模塊都有相似的功能,只是對應(yīng)的業(yè)務(wù)場景不同而已。
那么我們能不能抽像出一套針對此業(yè)務(wù)場景的工具,而把具體業(yè)務(wù)實現(xiàn)交給業(yè)務(wù)方。
本著不重復(fù)造輪子的原則,去搜了下開源項目,最終看上了 go-zero
里面的一個工具 mapreduce
。
可以自行 Google
這個名詞。
使用很簡單。我們通過它改造一下上面的代碼:
package main
import (
"fmt"
"github.com/tal-tech/go-zero/core/mr"
"time"
)
func main() {
var userInfo *User
var productList []Product
_ = mr.Finish(func() (err error) {
userInfo, err = getUser()
return err
}, func() (err error) {
productList, err = getProductList()
return err
})
fmt.Printf("用戶信息:%+v\n", userInfo)
fmt.Printf("商品信息:%+v\n", productList)}
//打印
用戶信息:&{Name:wuqinqiang Age:18}
商品信息:[{Title:SHib Price:10}]
是不是舒服多了。
但是這里還需要注意一點(diǎn),假設(shè)你調(diào)用的其中一個服務(wù)錯誤,并且你 return err
對應(yīng)的錯誤,那么其他調(diào)用的服務(wù)會被取消。
比如我們修改 getProductList 直接響應(yīng)錯誤。
func getProductList() ([]Product, error) {
return nil, errors.New("test error")
}
//打印
// 用戶信息:<nil>
// 商品信息:[]
那么最終打印的時候連用戶信息都會為空,因為出現(xiàn)一個服務(wù)錯誤,用戶服務(wù)請求被取消了。
一般情況下,在請求服務(wù)錯誤的時候我們會有保底操作,一個服務(wù)錯誤不能影響其他請求的結(jié)果。
所以在使用的時候具體處理取決于業(yè)務(wù)場景。
既然用了,那么就追下源碼吧。
func Finish(fns ...func() error) error {
if len(fns) == 0 {
return nil
}
return MapReduceVoid(func(source chan<- interface{}) {
for _, fn := range fns {
source <- fn
}
}, func(item interface{}, writer Writer, cancel func(error)) {
fn := item.(func() error)
if err := fn(); err != nil {
cancel(err)
}
}, func(pipe <-chan interface{}, cancel func(error)) {
drain(pipe)
}, WithWorkers(len(fns)))
}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
drain(input)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
對于 MapReduceVoid
函數(shù),主要查看三個閉包參數(shù)。
第一個 GenerateFunc
用于生產(chǎn)數(shù)據(jù)。
MapperFunc
讀取生產(chǎn)出的數(shù)據(jù),進(jìn)行處理。
VoidReducerFunc
這里表示不對 mapper
后的數(shù)據(jù)做聚合返回。所以這個閉包在此操作幾乎0作用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
source := buildSource(generate)
return MapReduceWithSource(source, mapper, reducer, opts...)
}
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})// 創(chuàng)建無緩沖通道
threading.GoSafe(func() {
defer close(source)
generate(source) //開始生產(chǎn)數(shù)據(jù)
})
return source //返回?zé)o緩沖通道
}
buildSource
函數(shù)中,返回一個無緩沖的通道。并開啟一個 G
運(yùn)行 generate(source)
,往無緩沖通道塞數(shù)據(jù)。這個generate(source)
不就是一開始 Finish
傳遞的第一個閉包參數(shù)。
return MapReduceVoid(func(source chan<- interface{}) {
// 就這個
for _, fn := range fns {
source <- fn
}
})
然后查看 MapReduceWithSource
函數(shù),
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
//任務(wù)執(zhí)行結(jié)束通知信號
output := make(chan interface{})
//將mapper處理完的數(shù)據(jù)寫入collector
collector := make(chan interface{}, options.workers)
// 取消操作信號
done := syncx.NewDoneChan()
writer := newGuardedWriter(output, done.Done())
var closeOnce sync.Once
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
done.Close()
close(output)
})
}
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
drain(source)
finish()
})
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
drain(collector)
}()
// 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper
go executeMappers(func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
value, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
}
這段代碼挺長的,我們說下核心的點(diǎn)。這里使用一個G
調(diào)用 executeMappers
方法。
go executeMappers(func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan lang.PlaceholderType, workers int) {
var wg sync.WaitGroup
defer func() {
// 等待所有任務(wù)全部執(zhí)行完畢
wg.Wait()
// 關(guān)閉通道
close(collector)
}()
//根據(jù)指定數(shù)量創(chuàng)建 worker池
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done)
for {
select {
case <-done:
return
case pool <- lang.Placeholder:
// 從buildSource() 返回的無緩沖通道取數(shù)據(jù)
item, ok := <-input
// 當(dāng)通道關(guān)閉,結(jié)束
if !ok {
<-pool
return
}
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {
defer func() {
wg.Done()
<-pool
}()
//真正運(yùn)行閉包函數(shù)的地方
// func(item interface{}, w Writer) {
// mapper(item, w, cancel)
// }
mapper(item, writer)
})
}
}
}
具體的邏輯已備注,代碼很容易懂。
一旦 executeMappers
函數(shù)返回,關(guān)閉 collector
通道,那么執(zhí)行 reducer
不再阻塞。
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
//這里
drain(collector)
}()
這里的 reducer(collector, writer, cancel)
其實就是從 MapReduceVoid
傳遞的第三個閉包函數(shù)。
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
//這里
drain(input)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
然后這個閉包函數(shù)又執(zhí)行了 reducer(input, cancel)
,這里的 reducer
就是我們一開始解釋過的 VoidReducerFunc
,從 Finish() 而來
。
等等,看到上面三個地方的 drain(input)
了嗎?
// drain drains the channel.
func drain(channel <-chan interface{}) {
// drain the channel
for range channel {
}
}
其實就是一個排空 channel
的操作,但是三個地方都對同一個 channel
做同樣的操作,也是讓我費(fèi)解。
還有更重要的一點(diǎn)。
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
drain(collector)
}()
上面的代碼,假如執(zhí)行 reducer
,writer
寫入引發(fā) panic
,那么drain(collector)
將沒有機(jī)會執(zhí)行。
不過作者已經(jīng)修復(fù)了這個問題,直接把 drain(collector)
放入到 defer
。
具體 issues[1]。
到這里,關(guān)于 Finish
的源碼也就結(jié)束了。感興趣的可以看看其他源碼。
很喜歡 go-zero 里的一些工具,但是工具往往并不獨(dú)立,依賴于其他文件包,導(dǎo)致明明只想使用其中一個工具卻需要安裝整個包。
所以最終的結(jié)果就是扒源碼,創(chuàng)建無依賴庫工具集,遵循 MIT
即可。
[1]https://github.com/tal-tech/go-zero/issues/676
聯(lián)系客服