Sync包
阅读原文时间:2023年07月10日阅读:1

Mutex互斥锁:

能够保证在同一时间段内仅有一个goroutine持有锁,有且仅有一个goroutine访问共享资源,其他申请锁的goroutine将会被阻塞直到锁被释放。然后重新争抢锁的持有权。

结构体和方法:

type Locker interface {
    Lock()
    UnLocker
}
func (m *Mutex) Lock()
func (m *Mutex) UnLock()


package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    //互斥锁
    var lock sync.Mutex
    go func() {
        //加锁
        lock.Lock()
        //释放锁
        defer lock.Unlock()
        fmt.Println("func1 get lock at " + time.Now().String())
        time.Sleep(time.Second)
        fmt.Println("func1 release lock " + time.Now().String())
    }()

    time.Sleep(time.Second / 10)

    go func() {
        lock.Lock()
        defer lock.Unlock()
        fmt.Println("func2 get lock at " + time.Now().String())
        time.Sleep(time.Second)
        fmt.Println("func1 release lock " + time.Now().String())
    }()

    //等待所有goroutine执行完毕
    time.Sleep(time.Second * 4)
}

输出结果:

RWMutex读写锁:

将读锁和写锁分离开来满足以下条件

  • 在同一时间段只能有一个gorountine获取到写锁
  • 在同一时间段可以有任意多个gorountine获取到读锁
  • 在同一时间段只能存在读锁和写锁

结构体和方法:

type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}
func (rw *RWMutex) Lock()        //写加锁
func (rw *RWMutex) UnLock()        //写解锁
func (rw *RWMutex) RLock()        //读加锁
func (rw *RWMutex) RUnLock()    //读解锁


package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

var rwLock sync.RWMutex

func main() {
    //获取读锁
    for i := 0; i < 5; i++ {
        go func(i int) {
            rwLock.RLocker()
            defer rwLock.RLocker()
            fmt.Println("read func " + strconv.Itoa(i) + " get lock at " + time.Now().String())
            time.Sleep(time.Second)
        }(i)
    }

    time.Sleep(time.Second / 10)

    //获取写锁
    for i := 0; i < 5; i++ {
        go func(i int) {
            rwLock.Lock()
            defer rwLock.Unlock()
            fmt.Println("write func " + strconv.Itoa(i) + " get lock at " + time.Now().String())
            time.Sleep(time.Second)
        }(i)
    }
    //保证所有的goroutine执行结束
    time.Sleep(time.Second * 4)
}

输出结果:

WaitGroup并发等待数组:

sync.WaitGroup的goroutine会等待预设好的数量的goroutine都提交执行结束后,才会继续往下执行代码,调用Wait方法之前,必须先执行Add方法,还需要保证Done方法和Add添加的等待数量一致,过少会导致等待goroutine死锁,过多会导致程序panic,适用于执行批量操作,等待所有goroutine执行结束后统一返回结果。

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

func main() {
    var waitGroup sync.WaitGroup
    //添加等待goroutine数量为5
    waitGroup.Add(5)

    for i := 0; i < 5; i++ {
        go func(i int) {
            fmt.Println("work " + strconv.Itoa(i) + " is done at " + time.Now().String())
            //等待1s后减少等待数1
            time.Sleep(time.Second)
            waitGroup.Done()
        }(i)
    }
    waitGroup.Wait()
    fmt.Println("all works are done at " + time.Now().String())
}

输出结果:

Map并发安全字典:

go中的原生map并不是并发安全的,Go语言1.9之后有sync.Map

package main

import (
    "fmt"
    "strconv"
    "sync"
)

var syncMap sync.Map
var waitGroup sync.WaitGroup

func main() {
    routineSize := 5
    //让主线程等待数据添加完毕
    waitGroup.Add(routineSize)
    //并发添加数据
    for i := 0; i < routineSize; i++ {
        go addNumber(i * 10)
    }
    waitGroup.Wait()
    var size int
    //统计数量
    syncMap.Range(func(key, value interface{}) bool {
        size++
        // fmt.Println("key-value pair is ", key, value, " ")
        return true
    })
    fmt.Println("syncMap current size is " + strconv.Itoa(size))
    //获取键为0的值
    value, ok := syncMap.Load(0)
    if ok {
        fmt.Println("key 0 has value", value, " ")
    }
}

func addNumber(begin int) {
    //往syncMap中放入数据
    for i := begin; i < begin+3; i++ {
        syncMap.Store(i, 1)
    }
    //通知数据已添加完毕
    waitGroup.Done()
}

输出结果:

Once只执行一次

提供了初始化延迟功能,done用来记录执行的次数,用m来保证只有一个goroutine在执行Do方法

package main

import (
    "fmt"
    "sync"
)

var once sync.Once
var waitGroup sync.WaitGroup

func main() {
    for i := 0; i < 10; i++ {
        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            once.Do(OnlyOnce)
        }()
    }
    waitGroup.Wait()
}

func OnlyOnce() {
    fmt.Println("only once")
}

输出结果:

Cond同步等待条件:

通过弄个条件控制多个goroutine,不满足条件进行等待,进入等待后即使后续满足条件需要通过Broadcast()或者Signal()来唤醒notifyList内的goroutine

结构体和方法:

type Cond struct {
    noCopy noCopy
    //L用来读写Cond时加锁
    L Locker
    //以下是包外不可见变量
    notify notifyList    //通知列表
    checker copyChecker
}
func NewCond(l Locker) *Cond
//BroadCast用于向所有等待的goroutine发送通知,通知条件已经满足
func (c *Cond) BroadCast()
//Singnal方法用于向特定的单个goroutine发送通知
func (c *Cond) Singnal()
func (c *Cond) Wait()


package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    ready     = false
    singerNum = 3
)

func Sing(singerId int, c *sync.Cond) {
    fmt.Printf("Singer (%d) is ready\n", singerId)
    c.L.Lock()
    for !ready {
        fmt.Printf("Singer (%d) is waiting\n", singerId)
        c.Wait()
    }
    fmt.Printf("Singer (%d) sing a song\n", singerId)
    ready = false
    c.L.Unlock()
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})
    for i := 0; i < singerNum; i++ {
        go Sing(i, cond)
    }
    time.Sleep(3 * time.Second)

    for i := 0; i < singerNum; i++ {
        ready = true
        cond.Broadcast()
        // cond.Signal()
        time.Sleep(3 * time.Second)
    }
}

Broadcast方法测试:

Signal方法测试:

Pool对象池:

并发安全的,大小可伸缩,仅受限于内存。存入Pool的对象可能会在不通知的情况下被释放,比如一些socket长连接就不适合放入Pool内

结构体和方法:

type Pool struct {
    noCopy noCopy
    local unsafe.Pointer    //本地缓冲池指针,每个处理器分配一个,其类型是一个{p}poolLocal的数组
    lcoalSize uintptr        //数组大小

    New func() interface {}    //缓存池中没有对象时,调用此方法创建一个
}

//从池中获取对象,如果没有对象调用New创建一个,未设置New返回nil
func (p *Pool) Get() interface{}
//向池中添加对象
func (p *Pool) Put(interface{})

Pool在运行时为每个操作Pool的goroutine所关联的P(GMP模型中的P)都创建一个本地池。在执行Get方法的时候,会先从本地池中获取,如果本地池没有则从其他P的本地池获取。这种特性让Pool的存储压力基于P进行了分摊。

package main

import (
    "fmt"
    "sync"
    "time"
)

var byteSlicePool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 1024)
        return &b
    },
}

func main() {
    t1 := time.Now().Unix()
    //不使用Pool
    for i := 0; i < 10000000000; i++ {
        bytes := make([]byte, 1024)
        _ = bytes
    }
    t2 := time.Now().Unix()
    //使用Pool
    for i := 0; i < 10000000000; i++ {
        bytes := byteSlicePool.Get().(*[]byte)
        _ = bytes
        byteSlicePool.Put(bytes)
    }
    t3 := time.Now().Unix()
    fmt.Printf("不使用Pool:%d s\n", t2-t1)
    fmt.Printf("使用Pool:%d s\n", t3-t2)
}

输出结果: