今天小编给大家分享一下Go语言中锁如何实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
Lock
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 上锁,成功返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
//已经锁上的写成进入慢锁流程
m.lockSlow()
}
lockSlow
func (m *Mutex) lockSlow() {
var waitStartTime int64 //执行时间
starving := false //当前请求是否是饥饿模式
awoke := false //当前请求是否是唤醒状态
iter := 0 //自旋次数
old := m.state //旧state值
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
//旧state值已上锁,并且未进入饥饿模式,且可以自旋,进入自旋逻辑
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 当前协程未唤醒
//&& old.state 为未唤起状态,就是说没有其他被唤起的waiter
//&& waiter数>0
//&& m.state标记为唤起状态成功
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//标记当前协程为唤起状态
//r: 这是为了通知在解锁Unlock()中不要再唤醒其他的waiter了
awoke = true
}
//自旋
runtime_doSpin()
//自旋计数器
iter++
old = m.state
continue
}
//r: old是锁当前的状态,new是期望的状态,以期于在后面的CAS操作中更改锁的状态
//new代表期望的state值
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
//old不是饥饿状态,new带上上锁标志位,也就是饥饿状态不上锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
//旧state值是上锁状态或饥饿状态,新state waiter数+1
//r: 表示当前goroutine将被作为waiter置于等待队列队尾
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
//当前协程为饥饿状态&&旧state已上锁,新state加饥饿标志位
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//r:? 当awoke为true,则表明当前goroutine在自旋逻辑中,成功修改锁的Woken状态位为1
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//新state关闭唤醒标志位
//r: 因为在后续的逻辑中,当前goroutine要么是拿到锁了,要么是被挂起。
// 如果是挂起状态,那就需要等待其他释放锁的goroutine来唤醒。
// 假如其他goroutine在unlock的时候发现Woken的位置不是0,则就不会去唤醒,那该goroutine就无法再醒来加锁。(见unlock逻辑)
new &^= mutexWoken
}
//r: 尝试将锁的状态更新为期望状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//旧state不是锁或饥饿状态,上锁成功,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
//r: 如果走到这里,那就证明当前goroutine没有获取到锁
// 这里判断waitStartTime != 0就证明当前goroutine之前已经等待过了,则需要将其放置在等待队列队头
//进入队列是否排在最前
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//r: 被信号量唤醒之后检查当前goroutine是否应该表示为饥饿
// (这里表示为饥饿之后,会在下一轮循环中尝试将锁的状态更改为饥饿模式)
// 1. 如果当前goroutine已经饥饿(在上一次循环中更改了starving为true)
// 2. 如果当前goroutine已经等待了1ms以上
//被信号量唤醒后当前协程是否进入饥饿状态
//1. 之前是饥饿状态
//2. 运行时间超过1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 再次获取锁状态
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
//饥饿模式协程是在Unlock()时handoff到当前协程的
//r:? 如果当前锁既不是被获取也不是被唤醒状态,或者等待队列为空
// 这代表锁状态产生了不一致的问题
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
//m.state 上锁,waiter数-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//当前协程不是饥饿状态或旧state的waiter数=1,则m.state饥饿标志位置0
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
//拿到锁,退出.
break
}
awoke = true
iter = 0
} else {
//执行循环前的语句,恢复最新现场
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlock
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
//m.state取消锁状态,返回值new代表修改后的新值
//如果为0代表没有其他锁了,退出;否则进入unlockSlow()
//锁空闲有两种情况:
//1. 所有位为0,代表没有锁了
//2. 标志位为0, waiter数量>0,还有协程在等待解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
UnlockSlow
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
//解锁,结束,退出
//1. 没有waiter了
//2. 已上锁
//3. 锁处于唤醒状态,表示有协程被唤醒
//4. 饥饿模式, 所有权交给了被解锁饥饿模式的waiter
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 如果能走到这,那就是上面的if判断没通过
// 说明当前锁是空闲状态,但是等待队列中有waiter,且没有goroutine被唤醒
// 所以,这里我们想要把锁的状态设置为被唤醒,等待队列waiter数-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//通过信号量唤醒某一个waiter,退出
runtime_Semrelease(&m.sema, false, 1)
return
}
//失败的话,更新old信息,进入下个循环
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
//饥饿模式,唤醒等待队列队头waiter
runtime_Semrelease(&m.sema, true, 1)
}
}
其他关键函数
runtime_canSpin
是否可自旋,不展开
runtime_doSpin
核心是汇编实现,循环执行三十次PAUSE指令
runtime_SemacquireMutex
信号量上锁
sem来自单词semaphore 信号量
runtime_Semrelease
信号量释放
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
If handoff is true, pass count directly to the first waiter.
handoff 就是传球的意思,handoff 为 false 时,仅仅唤醒等待队列中第一个协程,但是不会立马调度该协程;当 handoff 为 true 时,会立马调度被唤醒的协程,此外,当 handoff = true 时,被唤醒的协程会继承当前协程的时间片。具体例子,假设每个 goroutine 的时间片为 2ms,gorounte A 已经执行了 1ms,假设它通过 runtime_Semrelease(handoff = true) 唤醒了 goroutine B,则 goroutine B 剩余的时间片为 2 - 1 = 1ms。
golang 中 sync.Mutex 的实现
semrelease1(addr, handoff, skipframes) 参数handoff若为true,则让被唤醒的g立刻继承当前g的时间片继续执行。若handoff为false,则把刚被唤醒的g放到当前p的runq中。
Golang sync.Mutex 源码分析
RWMutex
很简单,看源码就行
[Go并发] - RWMutex源码解析
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers 当前读锁数量
readerWait int32 // number of departing readers 要离开的读锁数量,暨等待写锁解锁,解锁后可以释放的读锁数量
}
Lock()
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock() //通过sync.Lock()限制多写锁进入下边的逻辑
// Announce to readers there is a pending writer.
//r值不变, rwmutexMaxReaders值为1<<30
//可以理解为只要读锁的数量小于1<<30位,rw.readerCount值<0表示有写锁.
//也可以理解为加上一个负数,将31位以上都标记为1,代表有写锁, 剩余30位记录读锁数量
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
//r!=0 有读锁,不能释放写锁
//将readerCount转移到readerWait,readerWait的新值!=0 (以上可以翻译为有读锁,将读锁数转移到读等待数,然后写锁阻塞,)
// 满足上面两个条件,写锁阻塞, 等待唤醒,不返回
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
UnLock()
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
//将Lock()方法减去的值加回来,变成正数
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
//唤醒在RLock()方法阻塞的读操作,数量为r
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
RLock()
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
//<0表示已上写锁,阻塞
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
UnRLock()
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
//<0表示已上写锁,慢解锁
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
//最后一个读等待,唤醒写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
以上就是Go语言中锁如何实现的详细内容,更多关于Go语言中锁如何实现的资料请关注九品源码其它相关文章!