sync包
sync包是 golang 一个官方的异步库,提供了一些各种基础的异步的实现,如互斥锁等。sync 包主要包括了以下几种类型:
- sync.Mutex 和 sync.WaitGroup
- sync.Once
- sync.Map
- sync.Pool
- sync.Cond
sync.Once
sync.Once
是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。 sync.Once
比较多用在初始化,注册和对象创建上。sync.Once
源码:
type Once struct { |
sync.Once用法
sync.Once 其中一个应用就是实现一个单例模式,比如在一个结构体中申明,并在初始化的时候使用:var capInstance struct {
once sync.Once
lock sync.Mutex
capabilities *Capabilities
}
// Initialize the capability set. This can only be done once per binary, subsequent calls are ignored.
func Initialize(c Capabilities) {
// Only do this once
capInstance.once.Do(func() {
capInstance.capabilities = &c
})
}
sync.Map
sync.Map
是一个线程安全的map结构,一般用于多读少写的并发操作,下图是sync.Map
的数据结构
图引至码农桃花源公众号
type Map struct { |
mu
是Map
的互斥锁用于对并发操作进行加锁保护,read
是用于存储只读内容的,可以提供高并发的读操作。 dirty
是一个原始的map
结构体,对dirty
的操作需要加锁,dirty
包涵了全量的数据,在读数据的时候会先读取read
,read
读取不到再读dirty
。 misses
是read
读取失败的次数,当多次读取失败后 misses
累计特定值,dirty
就会升级成read
。sync.Map
这里采用的策略类似数据库常用的”读写分离”,技术都是相通的O(∩_∩)O
sync.Map用法
func main() { |
sync.Pool
sync.Pool
是一个用来缓存大量重复对象,减少大量对象创建给GC压力,是sync
异步包中很重要的一种数据结构,看其基本数据结构:
type Pool struct { |
图引至码农桃花源公众号
sync.Pool 的用法
sync.Pool的用法很简单,就三个方法://初始化pool对象
var pool sync.Pool
type shikanon struct {
num int
}
// 创建新对象创建方法
func initPool() {
pool = sync.Pool{
New: func() interface{} {
return &shikanon{num: rand.Int()}
},
}
}
func main() {
initPool()
// 从pool对象池中取对象
p1 := pool.Get().(*shikanon)
fmt.Println("p1", p1.num)
// 将对象放入pool对象池
pool.Put(p1)
p2 := pool.Get().(*shikanon)
fmt.Println("p2", p2.num)
}
sync.Cond
sync.Cond
是用于条件变量(condition variable)实现 —— 它可以让 Goroutine 都在满足特定条件时被唤醒,因此通常和锁一起使用,比如 Mutex 或 RWMutex。Cond 就是 condition 的意思。
sync.Cond 的数据结构:
type Cond struct { |
从数据结构可以看出,sync.Cond 等于在sync.Mutext的基础上,增加了一个通知列表notify做条件通知。
sync.Cond 主要有三种方法:等待通知(wait),单发通知(signal),广播通知(broadcast)。// 生成一个cond,需要传入一个Locker,
// 因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于Locker来实现的。
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// 用于等待通知
func (c *Cond) Wait() {
// 检查cond是否被拷贝
c.checker.check()
// 将获得锁的goroutine加入等待队列
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 将当前 Goroutine 追加到notifyList链表的末端,并让其处于休眠状态,这个操作是阻塞的,
// 让当前 goroutine 休眠主要是通过调用 runtime.goparkunlock 实现
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// 用于发送单个通知
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// 用于广播
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime_notifyListNotifyOne
、runtime_notifyListNotifyAll
、runtime_notifyListAdd
、runtime_notifyListWait
这几个函数都是用的runtime
下的sema.go
文件link过来的,这里先不深究,看看notifyList
数据结构:type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait uint32
// notify is the ticket number of the next waiter to be notified. It can
// be read outside the lock, but is only written to with lock held.
//
// Both wait & notify can wrap around, and such cases will be correctly
// handled as long as their "unwrapped" difference is bounded by 2^31.
// For this not to be the case, we'd need to have 2^31+ goroutines
// blocked on the same condvar, which is currently not possible.
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}
sync.Cond 的用法
sync.Cond 主要用于消息广播中(主要是单通知大家用的更多地是 channel + select )。比较经典的sync.Cond实现有etcd
的 FIFO Scheduler 的实现:func NewFIFOScheduler() Scheduler {
f := &fifo{
resume: make(chan struct{}, 1),
donec: make(chan struct{}, 1),
}
// 生成一个Cond对象
f.finishCond = sync.NewCond(&f.mu)
f.ctx, f.cancel = context.WithCancel(context.Background())
go f.run()
return f
}
// WaitFinish 用于 等待至少 n 个任务被完成 或所有 pending任务被完成
func (f *fifo) WaitFinish(n int) {
f.finishCond.L.Lock()
for f.finished < n || len(f.pendings) != 0 {
// 等待通知
f.finishCond.Wait()
}
f.finishCond.L.Unlock()
}
func (f *fifo) run() {
...
for {
var todo Job
...
// 完成一个上下文
todo(f.ctx)
// 加锁
f.finishCond.L.Lock()
f.finished++
f.pendings = f.pendings[1:]
// 广播通知唤醒其他所有goroutine
f.finishCond.Broadcast()
f.finishCond.L.Unlock()
}
}