九色国产,午夜在线视频,新黄色网址,九九色综合,天天做夜夜做久久做狠狠,天天躁夜夜躁狠狠躁2021a,久久不卡一区二区三区

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項超值服

開通VIP
Go:如何優(yōu)雅地實現(xiàn)并發(fā)編排任務(wù)

Go語言中文網(wǎng) 昨天

以下文章來源于吳親強(qiáng)的深夜食堂 ,作者吳親庫里

業(yè)務(wù)場景

在做任務(wù)開發(fā)的時候,你們一定會碰到以下場景:

場景1:調(diào)用第三方接口的時候, 一個需求你需要調(diào)用不同的接口,做數(shù)據(jù)組裝。
場景2:一個應(yīng)用首頁可能依托于很多服務(wù)。那就涉及到在加載頁面時需要同時請求多個服務(wù)的接口。這一步往往是由后端統(tǒng)一調(diào)用組裝數(shù)據(jù)再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。

針對以上兩種場景,假設(shè)在沒有強(qiáng)依賴關(guān)系下,選擇串行調(diào)用,那么總耗時即:

time=s1+s2+....sn

按照當(dāng)代秒入百萬的有為青年,這么長時間早就把你祖宗十八代問候了一遍。

為了偉大的KPI,我們往往會選擇并發(fā)地調(diào)用這些依賴接口。那么總耗時就是:

time=max(s1,s2,s3.....,sn)

當(dāng)然開始堆業(yè)務(wù)的時候可以先串行化,等到上面的人著急的時候,亮出絕招。

這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業(yè)務(wù)某個接口提高百分之XXX性能,間接產(chǎn)生XXX價值。

當(dāng)然這一切的前提是,做老板不懂技術(shù),做技術(shù)”懂”你。

言歸正傳,如果修改成并發(fā)調(diào)用,你可能會這么寫,

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    var userInfo *User
    var productList []Product

    go func() {
        defer wg.Done()
        userInfo, _ = getUser()
    }()

    go func() {
        defer wg.Done()
        productList, _ = getProductList()
    }()
    wg.Wait()
    fmt.Printf("用戶信息:%+v\n", userInfo)
    fmt.Printf("商品信息:%+v\n", productList)
}


/********用戶服務(wù)**********/

type User struct {
    Name string
    Age uint8
}

func getUser() (*User, error) {
    time.Sleep(500 * time.Millisecond)
    var u User
    u.Name = "wuqinqiang"
    u.Age = 18
    return &u, nil
}

/********商品服務(wù)**********/

type Product struct {
    Title string
    Price uint32
}

func getProductList() ([]Product, error) {
    time.Sleep(400 * time.Millisecond)
    var list []Product
    list = append(list, Product{
        Title: "SHib",
        Price: 10,
    })
    return list, nil
}

從實現(xiàn)上來說,需要多少服務(wù),會開多少個 G利用 sync.WaitGroup 的特性,

實現(xiàn)并發(fā)編排任務(wù)的效果。

好像,問題不大。

但是隨著代號 996 業(yè)務(wù)場景的增加,你會發(fā)現(xiàn),好多模塊都有相似的功能,只是對應(yīng)的業(yè)務(wù)場景不同而已。

那么我們能不能抽像出一套針對此業(yè)務(wù)場景的工具,而把具體業(yè)務(wù)實現(xiàn)交給業(yè)務(wù)方。


使用

本著不重復(fù)造輪子的原則,去搜了下開源項目,最終看上了 go-zero 里面的一個工具 mapreduce。

可以自行 Google 這個名詞

使用很簡單。我們通過它改造一下上面的代碼:

package main

import (
    "fmt"
    "github.com/tal-tech/go-zero/core/mr"
    "time"
)

func main() {
    var userInfo *User
    var productList []Product
    _ = mr.Finish(func() (err error) {
        userInfo, err = getUser()
        return err
    }, func() (err error) {
        productList, err = getProductList()
        return err
    })
    fmt.Printf("用戶信息:%+v\n", userInfo)
    fmt.Printf("商品信息:%+v\n", productList)
}
//打印
用戶信息:&{Name:wuqinqiang Age:18}
商品信息:[{Title:SHib Price:10}]

是不是舒服多了。

但是這里還需要注意一點(diǎn),假設(shè)你調(diào)用的其中一個服務(wù)錯誤,并且你 return err 對應(yīng)的錯誤,那么其他調(diào)用的服務(wù)會被取消。

比如我們修改 getProductList 直接響應(yīng)錯誤

func getProductList() ([]Product, error) {
    return nil, errors.New("test error")
}
//打印
// 用戶信息:<nil>
// 商品信息:[]

那么最終打印的時候連用戶信息都會為空,因為出現(xiàn)一個服務(wù)錯誤,用戶服務(wù)請求被取消了。

一般情況下,在請求服務(wù)錯誤的時候我們會有保底操作,一個服務(wù)錯誤不能影響其他請求的結(jié)果。
所以在使用的時候具體處理取決于業(yè)務(wù)場景
。


源碼

既然用了,那么就追下源碼吧。

func Finish(fns ...func() errorerror {
    if len(fns) == 0 {
        return nil
    }

    return MapReduceVoid(func(source chan<- interface{}) {
        for _, fn := range fns {
            source <- fn
        }
    }, func(item interface{}, writer Writer, cancel func(error)) {
        fn := item.(func() error)
        if err := fn()err != nil
 {
            cancel(err)
        }
    }, func(pipe <-chan interface{}, cancel func(error)) {
        drain(pipe)
    }, WithWorkers(len(fns)))
}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
    _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
        reducer(input, cancel)
        drain(input)
        // We need to write a placeholder to let MapReduce to continue on reducer done,
        // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
        writer.Write(lang.Placeholder)
    }, opts...)
    return err
}

對于 MapReduceVoid函數(shù),主要查看三個閉包參數(shù)。

  • 第一個 GenerateFunc 用于生產(chǎn)數(shù)據(jù)。

  • MapperFunc 讀取生產(chǎn)出的數(shù)據(jù),進(jìn)行處理。

  • VoidReducerFunc 這里表示不對 mapper 后的數(shù)據(jù)做聚合返回。所以這個閉包在此操作幾乎0作用。

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
    source := buildSource(generate) 
    return MapReduceWithSource(source, mapper, reducer, opts...)
}

func buildSource(generate GenerateFunc) chan interface{} {
    source := make(chan interface{})// 創(chuàng)建無緩沖通道
    threading.GoSafe(func() {
        defer close(source)
        generate(source) //開始生產(chǎn)數(shù)據(jù)
    })

    return source //返回?zé)o緩沖通道
}

buildSource函數(shù)中,返回一個無緩沖的通道。并開啟一個 G 運(yùn)行 generate(source)往無緩沖通道塞數(shù)據(jù)。這個generate(source) 不就是一開始 Finish 傳遞的第一個閉包參數(shù)。

return MapReduceVoid(func(source chan<- interface{}) {
    // 就這個
        for _, fn := range fns {
            source <- fn
        }
    })

然后查看 MapReduceWithSource 函數(shù),

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
    opts ...Option)
 (interface{}, error)
 {
    options := buildOptions(opts...)
    //任務(wù)執(zhí)行結(jié)束通知信號
    output := make(chan interface{})
    //將mapper處理完的數(shù)據(jù)寫入collector
    collector := make(chan interface{}, options.workers)
    // 取消操作信號
    done := syncx.NewDoneChan()
    writer := newGuardedWriter(output, done.Done())
    var closeOnce sync.Once
    var retErr errorx.AtomicError
    finish := func() {
        closeOnce.Do(func() {
            done.Close()
            close(output)
        })
    }
    cancel := once(func(err error) {
        if err != nil {
            retErr.Set(err)
        } else {
            retErr.Set(ErrCancelWithNil)
        }

        drain(source)
        finish()
    })

    go func() {
        defer func() {
            if r := recover(); r != nil {
                cancel(fmt.Errorf("%v", r))
            } else {
                finish()
            }
        }()
        reducer(collector, writer, cancel)
        drain(collector)
    }()
    // 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper
    go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)

    value, ok := <-output
    if err := retErr.Load(); err != nil {
        return nil, err
    } else if ok {
        return value, nil
    } else {
        return nil, ErrReduceNoOutput
    }
}

這段代碼挺長的,我們說下核心的點(diǎn)。這里使用一個G 調(diào)用 executeMappers 方法。

go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
    done <-chan lang.PlaceholderType, workers int)
 {
    var wg sync.WaitGroup
    defer func() {
        // 等待所有任務(wù)全部執(zhí)行完畢
        wg.Wait()
        // 關(guān)閉通道
        close(collector)
    }()
   //根據(jù)指定數(shù)量創(chuàng)建 worker池
    pool := make(chan lang.PlaceholderType, workers) 
    writer := newGuardedWriter(collector, done)
    for {
        select {
        case <-done:
            return
        case pool <- lang.Placeholder:
            // 從buildSource() 返回的無緩沖通道取數(shù)據(jù)
            item, ok := <-input 
            // 當(dāng)通道關(guān)閉,結(jié)束
            if !ok {
                <-pool
                return
            }

            wg.Add(1)
            // better to safely run caller defined method
            threading.GoSafe(func() {
                defer func() {
                    wg.Done()
                    <-pool
                }()
                //真正運(yùn)行閉包函數(shù)的地方
               // func(item interface{}, w Writer) {
               // mapper(item, w, cancel)
               // }
                mapper(item, writer)
            })
        }
    }
}

具體的邏輯已備注,代碼很容易懂。

一旦 executeMappers 函數(shù)返回,關(guān)閉 collector 通道,那么執(zhí)行 reducer 不再阻塞。

go func() {
        defer func() {
            if r := recover(); r != nil {
                cancel(fmt.Errorf("%v", r))
            } else {
                finish()
            }
        }()
        reducer(collector, writer, cancel)
        //這里
        drain(collector)
    }()

這里的 reducer(collector, writer, cancel) 其實就是從 MapReduceVoid 傳遞的第三個閉包函數(shù)。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
    _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
        reducer(input, cancel)
        //這里
        drain(input)
        // We need to write a placeholder to let MapReduce to continue on reducer done,
        // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
        writer.Write(lang.Placeholder)
    }, opts...)
    return err
}

然后這個閉包函數(shù)又執(zhí)行了 reducer(input, cancel)這里的 reducer 就是我們一開始解釋過的 VoidReducerFunc, Finish() 而來。

等等,看到上面三個地方的 drain(input)了嗎?

// drain drains the channel.
func drain(channel <-chan interface{}) {
    // drain the channel
    for range channel {
    }
}

其實就是一個排空 channel 的操作,但是三個地方都對同一個 channel做同樣的操作,也是讓我費(fèi)解。

還有更重要的一點(diǎn)。

go func() {
        defer func() {
            if r := recover(); r != nil {
                cancel(fmt.Errorf("%v", r))
            } else {
                finish()
            }
        }()
        reducer(collector, writer, cancel)
        drain(collector)
    }()

上面的代碼,假如執(zhí)行 reducer,writer 寫入引發(fā) panic,那么drain(collector) 將沒有機(jī)會執(zhí)行。

不過作者已經(jīng)修復(fù)了這個問題,直接把 drain(collector) 放入到 defer。

具體 issues[1]。

到這里,關(guān)于 Finish 的源碼也就結(jié)束了。感興趣的可以看看其他源碼。

很喜歡 go-zero 里的一些工具,但是工具往往并不獨(dú)立,依賴于其他文件包,導(dǎo)致明明只想使用其中一個工具卻需要安裝整個包
所以最終的結(jié)果就是扒源碼,創(chuàng)建無依賴庫工具集,遵循 
MIT 即可。

附錄

[1]https://github.com/tal-tech/go-zero/issues/676

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
手把手教姐姐寫消息隊列
Go并發(fā)模式:管道和取消
NSQ源碼剖析之nsqd
Go語言實戰(zhàn)筆記(二十)| Go Context | 飛雪無情的博客
使用Go語言在樹莓派上編程
Go并發(fā)處理
更多類似文章 >>
生活服務(wù)
熱點(diǎn)新聞
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服