go ants源码分析
阅读原文时间:2022年05月06日阅读:1

golang ants 源码分析

结构图

poolwithfunc与pool相差不大,这里我们只分析ants默认pool的流程

文件

作用

ants.go

定义常量、errors显示、默认建一个大小为2147483647的goroutine池、封装一些方便用户操作查看goroutine池的函数

options.go

goroutine池的相关配置

pool.go

普通pool(不绑定特定函数)的创建以及对pool相关的操作

pool_func.go

创建绑定某个特定函数的pool以及对pool相关的操作

worker.go

goworker的struct(其他语言中的类)、run(其他语言中的方法)

worker_array.go

一个worker_array的接口和一个能返回实现该接口的函数

worker_func.go

worker_loop_queue.go

worker_stack.go

workerStack(struct)实现worker_array中的所有接口

spinlock.go

锁相关

关键结构

type Pool struct

type Pool struct {
    capacity int32       // 容量
    running  int32       // 正在运行的数量
    lock     sync.Locker //定义一个锁 用以支持 Pool 的同步操作
    workers  workerArray // workers 一个接口 存放可循环利用的Work(goroutine)的相关信息
    //  type workerArray interface {
    //  len() int
    //  isEmpty() bool
    //  insert(worker *goWorker) error
    //  detach() *goWorker
    //  retrieveExpiry(duration time.Duration) []*goWorker
    //  reset()
    //  }
    state         int32         //记录池子的状态(关闭,开启)
    cond          *sync.Cond    // 条件变量
    workerCache   sync.Pool     // golang原始池子 使用sync.Pool对象池管理和创建worker对象,提升性能
    blockingNum   int           // 阻塞等待的任务数量;
    stopHeartbeat chan struct{} //一个空结构体的通道,仅用于接收标志
    options       *Options      // 用于配置pool的options指针
}
  • func (p *Pool) purgePeriodically() //定期清理过期worker任务
  • func (p *Pool) Submit(task func()) error //提交func任务与worker绑定进行运行
  • func (p *Pool) Running() int //有多少个运行的worker
  • func (p *Pool) Free() int //返回空闲的worker数量
  • func (p *Pool) Cap() int // 返回pool的容量
  • ……
  • func (p *Pool) retrieveWorker() (w *goWorker) //返回一个worker

workerArray

type workerArray interface {
   len() int                                          // worker的数量
   isEmpty() bool                                     // worker是否为0
   insert(worker *goWorker) error                     //将执行完的worker(goroutine)放回
   detach() *goWorker                                 // 获取worker
   retrieveExpiry(duration time.Duration) []*goWorker //取出所有的过期 worker;
   reset()                                            // 重置
}

workerStack

type workerStack struct {
   items  []*goWorker //空闲的worker
   expiry []*goWorker //过期的worker
   size   int
}

下面是对接口workerArray的实现

func (wq *workerStack) len() int {
   return len(wq.items)
}

func (wq *workerStack) isEmpty() bool {
   return len(wq.items) == 0
}

func (wq *workerStack) insert(worker *goWorker) error {
   wq.items = append(wq.items, worker)
   return nil
}

//返回items中最后一个worker
func (wq *workerStack) detach() *goWorker {
   l := wq.len()
   if l == 0 {
      return nil
   }
   w := wq.items[l-1]
   wq.items[l-1] = nil // avoid memory leaks
   wq.items = wq.items[:l-1]

   return w
}

func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
   n := wq.len()
   if n == 0 {
      return nil
   }

   expiryTime := time.Now().Add(-duration) //过期时间=现在的时间-1s
   index := wq.binarySearch(0, n-1, expiryTime)

   wq.expiry = wq.expiry[:0]
   if index != -1 {
      wq.expiry = append(wq.expiry, wq.items[:index+1]...) //因为以后进先出的模式去worker 所有过期的woker这样wq.items[:index+1]取
      m := copy(wq.items, wq.items[index+1:])
      for i := m; i < n; i++ { //m是存活的数量 下标为m之后的元素全部置为nil
         wq.items[i] = nil
      }
      wq.items = wq.items[:m] //抹除后面多余的内容
   }
   return wq.expiry
}

// 二分法查询过期的worker
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
   var mid int
   for l <= r {
      mid = (l + r) / 2
      if expiryTime.Before(wq.items[mid].recycleTime) {
         r = mid - 1
      } else {
         l = mid + 1
      }
   }
   return r
}

func (wq *workerStack) reset() {
   for i := 0; i < wq.len(); i++ {
      wq.items[i].task <- nil //worker的任务置为nil
      wq.items[i] = nil       //worker置为nil
   }
   wq.items = wq.items[:0] //items置0
}

流程分析

创建pool

func NewPool(size int, options ...Option) (*Pool, error) {
    opts := loadOptions(options...) // 导入配置
    根据不同项进行配置此处省略

    p := &Pool{
        capacity:      int32(size),
        lock:          internal.NewSpinLock(),
        stopHeartbeat: make(chan struct{}, 1), //开一个通道用于接收一个停止标志
        options:       opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }

    p.workers = newWorkerArray(stackType, 0)

    p.cond = sync.NewCond(p.lock)
    go p.purgePeriodically()
    return p, nil
}

提交任务(将worker于func绑定)

func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

    w = p.workers.detach() // 获取列表中最后一个worker
    if w != nil {          // 取出来的话直接解锁
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { //没取到但是容量为无限大或者容量未满
        p.lock.Unlock()
        spawnWorker() //开一个新的worker
    } else { // 没取到 而且容量已经满了
        if p.options.Nonblocking {  //默认为False
            p.lock.Unlock()
            return
        }
    retry:
        xxxx
            goto retry
        xxxx    

        p.lock.Unlock()
    }
    return
}

goworker的运行

func (w *goWorker) run() {
    w.pool.incRunning()  //增加正在运行的worker数量
    go func() {
        defer func() {
            w.pool.decRunning()
            w.pool.workerCache.Put(w)
            if p := recover(); p != nil {
                if ph := w.pool.options.PanicHandler; ph != nil {
                    ph(p)
                } else {
                    w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
                    var buf [4096]byte
                    n := runtime.Stack(buf[:], false)
                    w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
                }
            }
            // Call Signal() here in case there are goroutines waiting for available workers.
            w.pool.cond.Signal()
        }()

        for f := range w.task {  //阻塞接受task
            if f == nil {
                return
            }
            f()  //执行函数
            if ok := w.pool.revertWorker(w); !ok { // 将goworker放回items中
                return
            }
        }
    }()
}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章