golang源码学习之mutex

老实说呢,Mutex源码我看了好多遍,依旧没完全看懂。各种状态逻辑很难理解。(golang 1.12.7)
先来看看Mutex的核心注释
    // Mutex fairness.
    //
    // Mutex can be in 2 modes of operations: normal and starvation.
    // In normal mode waiters are queued in FIFO order, but a woken up waiter
    // does not own the mutex and competes with new arriving goroutines over
    // the ownership. New arriving goroutines have an advantage -- they are
    // already running on CPU and there can be lots of them, so a woken up
    // waiter has good chances of losing. In such case it is queued at front
    // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
    // it switches mutex to the starvation mode.
    //
    // In starvation mode ownership of the mutex is directly handed off from
    // the unlocking goroutine to the waiter at the front of the queue.
    // New arriving goroutines don't try to acquire the mutex even if it appears
    // to be unlocked, and don't try to spin. Instead they queue themselves at
    // the tail of the wait queue.
    //
    // If a waiter receives ownership of the mutex and sees that either
    // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
    // it switches mutex back to normal operation mode.
    //
    // Normal mode has considerably better performance as a goroutine can acquire
    // a mutex several times in a row even if there are blocked waiters.
    // Starvation mode is important to prevent pathological cases of tail latency.
    
    Mutex是公平的。
    Mutex有两种模式:普通和饥饿模式。
    
    在普通模式下,等待者是按FIFO排队等待,但是被唤醒的等待者不是直接拥有锁,而是要和新来的goroutine
    竞争。由于新来的goroutine已经在CPU上了,所以唤醒的等待者很有可能获取不到锁。在这种情况下,被唤醒
    的等待者直接加入到队列首部。如果被唤醒的等待者超过1ms没获取到锁,那么mutex将切换到饥饿模式。
    
    在饥饿模式下,锁直接交给队列的第一个等待者。新来的goroutine不要去尝试获取锁,也不要自旋,即便是
    mutex处于unlock状态。直接将goroutine加到队列尾部。
    
    如果一个等待者获取了锁,在下面两种情况之一将转换为普通模式:1.等待者处于队列尾部;2.等待者少于1ms。
    
    普通模式具有良好的性能可以连续多次获取锁。
    饥饿模式解决了队尾一直无法获取锁的问题。
    

数据结构

/// src/sync/Mutex.go
type Mutex struct {
    state int32  // 状态
    sema  uint32 // 信号
}

Mutex的数据结构看似很简单就两个变量,但有时候看起来简单的东西其实并不简单。state是一个32bit的状态变量,它包含了4个部分。
第一位:锁标志。 0:未被锁; 1: 被锁
第二位:唤醒标志。 0:未被唤醒; 1:唤醒
第三位:饥饿标志。 0:普通模式; 1:饥饿模式
其余位:等待个数

常量

mutexLocked = 1 << iota //锁标志,对应state第一位
mutexWoken              //唤醒标志,对应state第二位
mutexStarving           //饥饿标志,对应state第三位
mutexWaiterShift = iota // 等待个数在state的位移
starvationThresholdNs = 1e6  // 1ms,用于判断被唤醒的等待者是否超时

Lock

func (m *Mutex) Lock() {
    // 如果state处于空闲状态,那么直接获取锁。state=0说明state的各个部分都为0
    // 000...000 000
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    var waitStartTime int64 //goroutine的开始时间
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // 如果state已锁且处于正常模式,且可自旋,那么就进入自旋。
        // runtime_canSpin(iter)可以看出能否可以自旋和iter有一定的关系。
        // 如果进入runtime_canSpin(runtime/lock_sema.go)可以知道iter>=4就不能自旋了。当然
        // 能否自旋还和cpu核数有关。
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 如果goroutine自身未唤醒 + state未唤醒 + 等待数>0, 那么将state设为唤醒  
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                // 自身设为唤醒
                awoke = true
            }
            // 自旋
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        // new 指的是期望设置的状态
        new := old
        // 如果state 正常模式,new设置为locked状态。
        // 反过来 如果是饥饿状态呢?饥饿状态的规则是把锁交给队列头的等待者。当前的goroutine是不会获取到锁的
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 如果state为locked或者饥饿模式,那么new等待者+1
        // case1 locked: 说明没获取到锁,加入等待 
        // case2 饥饿: 饥饿模式锁只会交给等待队列头,所以也不会获取到锁,加入等待队列
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        // goroutine 饥饿 + state locked, new 设为饥饿
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // goroutine是唤醒的,那么old state必然也是唤醒的。见自旋那部分
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            // new 设为未唤醒状态
            new &^= mutexWoken
        }
        // new 替换 old
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果old为unlock + 普通模式,那么就直接获取锁
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 如果是新来的goroutine,设置开始时间
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 进入sleep, 等待唤醒
            // queueLifo = true, 说明是唤醒的goroutine,加入队列头
            // queueLifo = false, 说明是新来的goroutine,加入队列尾
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // 如果唤醒时间 > 1ms,那么goroutine设为饥饿模式? 那如果 < 1ms 呢?
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            
            // 如果state是饥饿模式,那么说明唤醒的队列头
            // 为什么是队列头呢? 其实在unlock()部分定义的
            if old&mutexStarving != 0 {
            
                // state 检查
                // 如果是饥饿模式唤醒,那么state肯定是unlock + 未唤醒。为什么一定是未唤醒呢? 看unlock()。
                // 饥饿状态被唤醒,等待者肯定也是>0的,不然怎么把锁交给队列头
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                
                // 等待者 - 1 
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // goroutine普通模式  或者  等待者只剩一个,设置成普通模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                // 更新state, 获取到锁
                atomic.AddInt32(&m.state, delta)
                break
            }
            
            // 普通模式, 随机唤醒队列
            
            // goroutine 唤醒, 置零自旋
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

反正呢代码很简单,但逻辑有点复杂,我也没理很顺。

Unlock

func (m *Mutex) Unlock() {

    // 设置unlock
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 先减一个mutexLocked,再加一个mutexLocked,等于原始状态。
    // 如果正在解锁一个处于unlock的锁, 非法。
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    
    // 正常模式
    if new&mutexStarving == 0 {
        old := new
        for {
            // 等待者为0   或者   state为锁定、唤醒、饥饿
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 设置成唤醒
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            // 设置新值
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 随机唤醒等待队列
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 唤醒队列头
        // 注意:这里没有设置unlock,在上面的Lock()过程中,饥饿模式下对于被唤醒的等待者不用关心是否被锁也可以获取锁
        // 那对于新来的goroutine呢,会获取锁吗?新来的goroutine只能在普通模式才能获取锁。
        runtime_Semrelease(&m.sema, true)
    }
}

解锁过程相对于加锁要简单很多。普通模式随机唤醒,饥饿模式唤醒队列头。

后记

加锁过程涉及到很多逻辑状态,一时半会还无法理解,有缘再琢磨琢磨。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容