并发情况下,多个线程或协程会同时操作同一个资源,例如变量、数据结构、文件等。如果不保证并发安全,就可能导致数据竞争、脏读、脏写、死锁、活锁、饥饿等一系列并发问题,产生重大的安全隐患,比如12306抢到同一张火车票、多个用户抢到只剩一件库存的商品。而并发安全就是为了避免这些问题。Golang 中有一些原则和工具来保证并发安全,例如:
关于更为详细的并发安全性:可以参考:理解Golang 赋值的并发安全性。
所有资源竞争就是多个 goroutine 访问某个共享的资源,我们来看一个资源竞争的例子:
var wg sync.WaitGroup
func add(count *int) {
defer wg.Done()
for i := 0; i < 10000; i++ {
*count = *count + 1
}
}
func main() {
count := 0
wg.Add(3)
for i := 0; i < 3; i++ {
go add(&count)
}
wg.Wait()
fmt.Println(count)
}
该程序的每一次执行结果都不同, 就是因为协程之间出现了资源竞争,在读取更新 count 这个过程中,被其他协程横插了一脚,改变了 count 的值,没有保证原子性。下面我们通过互斥锁来锁住在读取更新过程的 count 的值,来使 count 的值打印正确。
sync 包提供了通过 sync.Mutex
和 sync.RWMutex
来实现互斥锁和读写互斥锁。
sync 互斥锁(sync.Mutex)是一种最简单的锁类型,当一个 goroutine 获得了资源后,其他 goroutine 就只能等待这个 goroutine 释放该资源。互斥锁可以保证对共享资源的原子访问,避免并发冲突。
sync 读写互斥锁(sync.RWMutex)是一种更复杂的锁类型,它允许多个 goroutine 同时获取读锁,但只允许一个 goroutine 获取写锁。读写互斥锁适用于读多写少的场景下,它比互斥锁更高效。
sync.Mutex 使用 Lock() 加锁,Unlock() 解锁,如果对未解锁的 Mutex 使用 Lock() 会阻塞当前程序运行,我们来看加入了互斥锁后的程序:
var wg sync.WaitGroup
var l sync.Mutex
func add(count *int) {
defer wg.Done()
l.Lock() // 锁住 count 资源,阻塞程序运行,直到 Unlock
for i := 0; i < 10000; i++ {
*count = *count + 1
}
l.Unlock()
}
func main() {
count := 0
wg.Add(3)
for i := 0; i < 3; i++ {
go add(&count)
}
wg.Wait()
fmt.Println(count)
}
RWMutex 是单写多读锁,该锁可以加多个读锁或者一个写锁。
读锁占用的情况下会阻止写,不会阻止读,多个 goroutine 可以同时获取资源,使用 RLock
和 RUnlock
加锁解锁。
写锁会阻止其他 goroutine 进来,读写不论,整个锁住的资源由该 goroutine 独占,使用 Lock
和 Unlock
加锁解锁。
应该只在频繁读取,少量写入的情况下使用读写互斥锁
var m sync.RWMutex
var i = 0
func main() {
go write()
go write()
go read()
go read()
go read()
time.Sleep(2 * time.Second)
}
func read() {
fmt.Println(i, "我准备获取读锁了")
m.RLock()
fmt.Println(i, "我要开始读数据了,所有写数据的都需要等待1s")
time.Sleep(1 * time.Second)
m.RUnlock()
fmt.Println(i, "我已经释放了读锁,可以继续写数据了")
}
func write() {
fmt.Println(i, "我准备获取写锁了")
m.Lock()
fmt.Println(i, "我要开始写数据了,所有人都需要等待1s")
time.Sleep(1 * time.Second)
i++
m.Unlock()
fmt.Println(i, "我已经释放了写锁,你们可以继续了")
}
// 结果
0 我准备获取读锁了
0 我要开始读数据了,所有写数据的都需要等待1s
0 我准备获取读锁了
0 我要开始读数据了,所有写数据的都需要等待1s
0 我准备获取读锁了
0 我要开始读数据了,所有写数据的都需要等待1s
0 我准备获取写锁了
0 我准备获取写锁了
0 我已经释放了读锁,可以继续写数据了
0 我已经释放了读锁,可以继续写数据了
0 我已经释放了读锁,可以继续写数据了
0 我要开始写数据了,所有人都需要等待1s
1 我已经释放了写锁,你们可以继续了
读写互斥锁有点难以理解,但是只要记住读写互斥永远是互斥的,就理解了大半。为了应对读锁长久占用,导致写锁迟迟不能更新数据,导致并发饥饿问题,所以在 Golang 的读写互斥锁中,写锁比读锁优先级更高。
sync.once 是一个极为强大的功能,它可以确保一个函数只能被执行一次。通常做来在并发执行前初始化一次的共享资源。
func main() {
once := &sync.Once{}
for i := 0; i < 10; i++ {
go func(i int) {
once.Do(func() {
fmt.Printf("i的值 %d\n", i)
})
}(i)
}
time.Sleep(1 * time.Second)
}
这段代码始终只会打印一次 i 的值。
为了实现变量值的并发情况下安全赋值,除了互斥锁外,Golang 还提供了 atomic 包,他能保证在变量在读写时不受其他 goroutine 影响。atomic 是通过 CPU 指令在硬件层面上实现的,比互斥锁性能更好。当然,互斥锁一般来说是对代码块的并发控制,atomic 是对某个变量的并发控制,二者侧重点不同。另外,atomic 是一个很底层的包,除非在一些非常追求的性能的地方,否则其他地方都不推荐使用。
add 方法比较容易理解,就是对一个值进行增加操作:
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
使用示例:
var a int32 = 1
atomic.AddInt32(&a, 2)
fmt.Println(a) // 输出3
atomic.AddInt32(&a, -1) // delta 是负值的话会减少该值
fmt.Println(a) // 输出2
CompareAndSwap用作比较置换值,如果等于,则更新值,返回 true,否则返回 false:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
使用示例:
var (
a int32 = 1
b bool
)
b = atomic.CompareAndSwapInt32(&a, 1, 2)
fmt.Println(a) // 输出2
fmt.Println(b) // 输出true
b = atomic.CompareAndSwapInt32(&a, 1, 3)
fmt.Println(a) // 输出2
fmt.Println(b) // 输出false
Swap方法不比较,直接置换值:
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
使用示例:
var (
a int32 = 1
old int32
)
old = atomic.SwapInt32(&a, 2)
fmt.Println(a) // 输出2
fmt.Println(old) // 输出1
Load 用来读取值:
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
使用示例:
var (
a int32 = 1
value int32
)
value = atomic.LoadInt32(&a)
fmt.Println(value) // 输出1
Store 用来将一个值存到变量中,Load 不会读取到存到一半的值:
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
使用示例:
var a int32
atomic.StoreInt32(&a, 1)
fmt.Println(a) // 输出1
Value 实现了对任意值的存储、读取、置换、比较置换:
func (v *Value) Store(val any)
func (v *Value) Load() (val any)
func (v *Value) Swap(new any) (old any)
func (v *Value) CompareAndSwap(old, new any)
使用示例:
var v atomic.Value
v.Store(1)
fmt.Println(v.Load()) // 1
v.Swap(2)
fmt.Println(v.Load()) // 2
b := v.CompareAndSwap(2, 3)
fmt.Println(v.Load()) // 3
fmt.Println(b)
使用Swap置换值时,必须要保持原有的数据类型,否则就会 panic: sync/atomic: swap of inconsistently typed value into Value [recovered]。
需要注意的是,atomic.value 对于复杂的数据结构不能保证原子操作,如切片、映射等。
go 在并发下,同时读 map 是安全的,但是读写 map 会引发竞争,导致 panic: fatal error: concurrent map read and map write。
// 创建一个map
m := make(map[int]int)
// 开启两个协程不停的对map写入数据
go func() {
for {
m[1] = 1
}
}()
go func() {
for {
_ = m[1]
}
}()
for {
}
// 结果
fatal error: concurrent map read and map write
为了解决这个问题,可以在写 map 之前加入锁:
l := sync.Mutex{}
l.Lock()
m[1] = 1
l.Unlock()
这样处理程序上运行是没问题了,但是性能并不高。go 在 1.9 版本中加入了效率较高的并发安全:sync.map:
func (m *Map) Store(key, value any) // 储存一个数据
func (m *Map) Load(key any) (value any, ok bool) // 读取一个数据
func (m *Map) Delete(key any) // 删除一个数据
func (m *Map) Range(f func(key, value any) bool) // 遍历数据
实例:
var smap sync.Map
// 保存数据
smap.Store("shanghai", 40000)
smap.Store("nanjing", 10000)
smap.Store("wuhan", 20000)
smap.Store("shenzhen", 30000)
// 读取值
if v, ok := smap.Load("nanjing"); ok {
fmt.Printf("键名:%s,值:%v\n", "nanjing", v)
}
// 删除
smap.Delete("wuhan")
if v, ok := smap.Load("wuhan"); !ok {
fmt.Printf("键名:%s,值:%v\n", "wuhan", v)
}
// 遍历数据
smap.Range(func(k, v interface{}) bool {
fmt.Printf("键名:%s,值:%v\n", k, v)
return true
})
// 结果
键名:nanjing,值:10000
键名:wuhan,值:<nil>
键名:shenzhen,值:30000
键名:shanghai,值:40000
键名:nanjing,值:10000
sync.map
并没有获取长度的方法,只能在遍历的时候自行计算。
本系列文章:
手机扫一扫
移动阅读更方便
你可能感兴趣的文章