singleflight 使用记录以及源码阅读
阅读原文时间:2023年07月15日阅读:1

1、简介

安装方式:

go get -u golang.org/x/sync/singleflight

singleflight 是Go官方扩展同步包的一个库。通过给每次函数调用分配一个key,相同key的函数并发调用时,在函数执行期间,相同函数的调用,只会被执行一次,返回相同的结果。其本质是对函数调用的结果进行复用

2、使用方法

2.1 使用Do获取函数执行结果

Do方法是同步返回函数执行结果

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "runtime"
    "sync"
    "time"
)

func main()  {
    var sg singleflight.Group
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func(j int) {
            defer wg.Done()
            v, err, shared := sg.Do("testDo", testDo)

            fmt.Printf("i: %v, v:%v, err:%v, shared:%v\n", j, v, err, shared)
        }(i)
    }

    wg.Wait()
}

func testDo() (interface{}, error) {
    // 模拟函数执行需要的时间
    time.Sleep(time.Millisecond)

    return "testDo", nil
}

2.2 使用DoChan获取函数执行结果

DoChan返回一个 channel,函数执行的结果通过 channel 来进行传递。

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "runtime"
    "sync"
    "time"
)

func main()  {
    var sg singleflight.Group
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func(j int) {
            defer wg.Done()
            ch := sg.DoChan("testDoChan", testDoChan)
            select {
            case ret := <- ch:
                fmt.Printf("i: %v, v:%v, err:%v, shared:%v\n", j, ret.Val, ret.Err, ret.Shared)

            }

        }(i)
    }

    wg.Wait()
}

func testDoChan() (interface{}, error) {
    // 模拟函数执行需要的时间
    time.Sleep(time.Millisecond)

    return "testDoChan", nil
}

3、源码解读

3.1 Group

//Group 整个库的核心结构体
type Group struct {
    mu sync.Mutex       // 并发时,保护 m
    m  map[string]*call // 使用 懒加载 方式进行初始化
}

3.2 call

//call m中的value
type call struct {
    wg sync.WaitGroup

    //相同key,fn执行的返回结果
    val interface{}
    err error

    //fn执行期间,相同 key 添加的次数,第一次添加不算
    dups  int
    chans []chan<- Result // DoChan 返回fn执行的结果
}

3.3 Group.Do

//Do 执行函数的地方,key: 给函数自定义的标识
//fn: 需要执行的函数,fn开始运行后,未运行结果前,这个期间对相同key的调用,都会返回第一次执行fn返回的结果
//v:fn执行返回的结果,err:fn执行返回的err
//shared:fn执行结果是否会共享,fn运行期间,是否有相同的key被调用,有则返回true,反之返回false
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {//懒加载
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {//fn执行期间,又有相同的key添加进来执行
        c.dups++ //fn执行期间,有相同的key添加进来
        g.mu.Unlock()
        c.wg.Wait() //等待fn执行结果(fn函数里面,会调用c.wg.Done)

        //-------
        //判断fn执行过程中,是否有 panic 或者 runtime.Goexit()
        //感觉主要是为了 DoChan 函数,DoChan 返回的是channel,防止fn函数执行期间出现问题,导致无法往 chan 里面写入结果。
        //从而导致 外面需要获取 fn 执行结果的协程一直在等待
        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        return c.val, c.err, true
    }

    //---- 以下是key 第一次添加到 m 中时,执行的代码---
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    // 执行 fn 的地方
    g.doCall(c, key, fn) // 没有新开一个协程,和DoChan不同。
    return c.val, c.err, c.dups > 0
}

3.4 Group.DoChan

//DoChan 和Do 十分类似,只不过返回的结果通过 chan 来传递
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {//懒加载
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {//fn执行期间,又有相同的key添加进来执行
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }

    //---- 以下是key 第一次添加到 m 中时,执行的代码---
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn) // 新开启了一个协程,和Do不同

    return ch
}

3.5 Group.doCall

  • 双defer+normalReturn+recovered 判断fn执行是panic还是runtime.Goexit

    //doCall 真正运行fn的地方,需要重点理解
    func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    normalReturn := false //是否正常返回,默认false
    recovered := false //是否recover,默认false

    // use double-defer to distinguish panic from runtime.Goexit,
    //使用双 defer 来区分 panic和runtime.Goexit
    //是需要结合 normalReturn 和 recovere 的值来进行判断,从而区分是panic还是runtime.Goexit
    defer func() {
        // the given function invoked runtime.Goexit
        if !normalReturn && !recovered {
            //既没有正常返回,又没有被 recover,所以是fn执行期间,调用了 runtime.Goexit()
            c.err = errGoexit
        }
    g.mu.Lock()
    defer g.mu.Unlock()
    c.wg.Done()
    // 走到这里,fn函数已经执行过了
    if g.m[key] == c {
        delete(g.m, key) //fn函数执行完毕,好让后续的key可以继续进来执行fn函数
    }
    
    if e, ok := c.err.(*panicError); ok { // recover住的错误
        // In order to prevent the waiting channels from being blocked forever,
        // needs to ensure that this panic cannot be recovered.
        if len(c.chans) &gt; 0 { //通过使用DoChan来执行 fn,发生的错误
            go panic(e) // recover只能够 recover住同一个协程里的panic,不是同一个协程的无法捕获。
            select {} // 保证协程不退出,错误会直接暴露出去
        } else {  //通过使用 Do来执行fn,发生的错误
            panic(e)
        }
    } else if c.err == errGoexit {
        // Already in the process of goexit, no need to call again
        //第一个调用的fn函数的协程已经退出,相同key的函数因为 chan 接收不到数据,会发生死锁()
        //fatal error: all goroutines are asleep - deadlock!
    } else {
        // Normal return
        for _, ch := range c.chans {
            ch &lt;- Result{c.val, c.err, c.dups &gt; 0}
        }
    }
    }() func() { defer func() { if !normalReturn {//fn执行期间,发生了panic if r := recover(); r != nil { c.err = newPanicError(r) // 标识为panic错误,Do函数中判断时,好做区分 e, ok := c.err.(*panicError) } } }()
    c.val, c.err = fn()
    normalReturn = true  //fn执行期间,没有panic
    }() if !normalReturn { recovered = true //fn执行期间,发生了panic,并且被 recover住了,注意:调用runtime.Goexit()时,是无法recover的 }

    }

3.6 Group.Forget

//Forget 使用Do执行fn时,可以手动删除 g.m 中的key
func (g *Group) Forget(key string){
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
}

4、执行流程

菜鸟一枚,文中难免有错误的地方,如有,恳请大佬指出。

5、参考资料

绝对详尽的singleflight讲解