作者:iuoui
來源:SegmentFault 思否
目前GO已經更新到了1.14的版本
咱們一般人如果直接去看mutex的源碼的話,其實是比較難理解為什麼寫成了現在這個樣子,尤其是加鎖裡面的各種邏輯判斷太多了,各種位運算一臉懵逼,其實我們只要掌握它最初的設計思想,那麼後面新增的邏輯,理解起來都很簡單了。
Mutex最初版本
Mutex第一版代碼加上注釋不過才109行。非常精簡,下面介紹一下我對第一版Mutex源碼的理解
我們接下來看它的Lock方法
// CAS獲取鎖失敗// awoke 默認是未喚醒狀態awoke := falsefor {// 當前state賦值給oldold := m.state// 給old上鎖new := old | mutexLocked// 如果old本身就已經上了鎖的話if old&mutexLocked != 0 {// goroutine等待數 + 1new = old + 1<
if raceenabled {raceAcquire(unsafe.Pointer(m))}}
簡單總結一下Lock的邏輯,分幾種情況說明一下
第一種情況:第一次上鎖的時候,直接走第一步CAS上鎖,成功返回
第二種情況:Mutex已經被另一個g上鎖,那麼state的g等待數+1,更新當前的鎖狀態,然後就進入隊列,等待被喚醒,等到另個一g調用了Unlock方法之後,當前g被喚醒,然後設置awoken=true,再執行一遍for循環,此時locked位就是未上鎖狀態(0),new就是代表上鎖,然後清除woken位,然後再CAS更新new到state上,因為之前的鎖是未上鎖狀態,那麼就代表搶鎖成功,break,返回
第三種情況:和第二種一樣,只不過,在CAS更新new到state上時,有其他g先改掉了state的值,那麼就繼續for循環,然後重複到第二種情況。
接下來看下Unlock方法
// 一開始也是直接去掉加鎖狀態new := atomic.AddInt32(&m.state, -mutexLocked)// 判斷一下是否解鎖了一個未加鎖的Mutexif (new+mutexLocked)&mutexLocked == 0 {// 直接panicpanic("sync: unlock of unlocked mutex")}
// 把解鎖後的值賦值給oldold := newfor {// 如果此時沒有需要等待獲取鎖的G// 或者當前Mutex已經被搶鎖成功或者已經有被喚醒的G,那麼就可以直接返回if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {return}
// g等待數-1,然後設置喚醒標記位new = (old - 1<
Unlock要比Lock簡單很多,所以這裡不總結了,看注釋就能明白
到這裡,最初版本的Mutex源碼已經分析完了,關鍵還是在上鎖的方法裡面。上鎖邏輯非常簡單粗暴,直接CAS獲取鎖,失敗就G等待數+1,然後進入隊列,等待被喚醒。
那麼,如果仔細想想,就會發現性能上還是有可以改進的地方。
我們應用Mutex的時候,肯定把鎖粒度控制的越小越好,那麼這樣的話就很可能會出現這麼一個問題,當第一次上鎖CAS失敗的時候,mutex已經被其他G解鎖了,但是當前G就還是直接進入隊列,等待被喚醒,這樣的話其實就會帶來額外的調度開銷。
所以,Mutex後面引進了自旋鎖的概念自旋鎖提交代碼
Mutex 引入自旋鎖
Currently sync.Mutex is fully cooperative. That is, once contention is discovered,
the goroutine calls into scheduler. This is suboptimal as the resource can become
free soon after (especially if critical sections are short). Server software
usually runs at ~~50% CPU utilization, that is, switching to other goroutines
is not necessary profitable.
This change adds limited active spinning to sync.Mutex if:
running on a multicore machine and
GOMAXPROCS>1 and
there is at least one other running P and
local runq is empty. As opposed to runtime mutex we don't do passive spinning, because there can be work on global runq on on other
Ps.
簡單概括一下,就是為了解決鎖粒度非常小的時候,給系統帶來的不必要的調度開銷
不過自旋要先滿足幾個條件
首先程序要跑在多核的機器上,然後GOMAXPROCS要大於1,並且此時有至少一個P的local runq是空的,才能進入到自旋的狀態
自旋是一種多線程同步機制,當前的進程在進入自旋的過程中會一直保持 CPU 的占用,持續檢查某個條件是否為真。在多核的 CPU 上,自旋可以避免 Goroutine 的切換,使用恰當會對性能帶來很大的增益,但是使用的不恰當就會拖慢整個程序,所以 Goroutine 進入自旋的條件非常苛刻
看一下更新之後的Lock方法
awoke := falseiter := 0 // 自旋的次數( <= 4)for {old := m.statenew := old | mutexLocked// 沒有解鎖if old&mutexLocked != 0 {// 判斷是否滿足自旋的狀態if runtime_canSpin(iter) {// 當woken標記位沒有被設置,而且等待G數量不等於0,並設置woken標記位成功// 這裡設置woken標記位的原因是,通知Unlock不用去喚醒等待隊列裡面的G了if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {// 標記awoke=trueawoke = true}// runtime_doSpin -> sync_runtime_doSpin// 每次自旋30個時鐘周期,最多120個周期runtime_doSpiniter++// 再次執行for循環continue}// 自旋結束之後,G等待數量+1new = old + 1<
if raceenabled {raceAcquire(unsafe.Pointer(m))}}
相比於第一版的Mutex,這裡只在加鎖的方法裡面增加了自旋鎖的邏輯
當Mutex已經上鎖的時候,當前G在滿足自旋條件下,進入自旋狀態,在自旋中,其他G解鎖了Mutex,那麼當前G就設置了woken標記位,這樣其他G在Unlock的時候就不會去等待隊列裡面喚醒G了,然後當前G就順理成章的搶到了鎖
這樣自旋鎖在鎖粒度非常小的場景下的能對其性能帶來一定的優化。
引入自旋鎖之後,又帶來了一個問題。就是G等待隊列的長尾問題。因為從等待隊列裡面被喚醒,然後再去搶鎖,對本身就在執行的G來說,被喚醒的G其實是很難搶過當前執行的G的,這樣的話,等待隊列裡面的G,就會被餓死(長時間獲取不到鎖),這樣對等待隊列的G來說其實是不公平的。
所以Mutex後面引入了飢餓模式飢餓模式代碼
Mutex引入飢餓模式
本次代碼變動還是挺大的
先看下提交者的介紹
Add new starvation mode for Mutex.
In starvation mode ownership is directly handed off from
unlocking goroutine to the next waiter. New arriving goroutines
don't compete for ownership.
Unfair wait time is now limited to 1ms.
Also fix a long standing bug that goroutines were requeued
at the tail of the wait queue. That lead to even more unfair
acquisition times with multiple waiters.
Performance of normal mode is not considerably affected.
簡單概括一下,就是解決了等待G隊列的長尾問題
飢餓模式下,直接由unlock把鎖交給等待隊列中排在第一位的G,同時,飢餓模式下,新進來的G不會參與搶鎖也不會進入自旋狀態,會直接進入等待隊列的尾部。
飢餓模式的觸發條件,當一個G等待鎖時間超過1毫秒時,Mutex切換到飢餓模式
飢餓模式的取消條件,當一個G獲取到鎖且在等待隊列的末尾,或者這個G獲取鎖的等待時間在1ms內,那麼Mutex切換回正常模式
帶來的改變
Mutex.state的倒數第三位,變成了mutexStarving標記位,0表示正常模式,1表示飢餓模式,與此同時,支持的最大等待G數量從2^30^個 變成了2^29^個
接下來還是主要關注Lock方法,我只在新增的邏輯上添加註釋了,我直接貼1.14的Lock代碼,較1.9的版本沒什麼改變
func(m *Mutex)lockSlow{ varwaitStartTime int64starving := false// 默認是正常模式awoke := falseiter := 0old := m.statefor{ // 當前Mutex在飢餓模式下已經被鎖了的話,當前G不進入自旋// 只有Mutex在正常模式且被鎖了的情況下,並且滿足自旋的條件,才會進入到自旋邏輯裡面ifold&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense.// Try to set mutexWoken flag to inform Unlock// to not wake other blocked goroutines.if!awoke && old&mutexWoken == 0&& old>>mutexWaiterShift != 0&& atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpiniter++old = m.statecontinue}new:= old // 如果當前不是飢餓模式ifold&mutexStarving == 0{ // 加鎖new|= mutexLocked }// 如果Mutex已經被鎖,或者是在飢餓模式ifold&(mutexLocked|mutexStarving) != 0{ // 等待的G數量+1new+= 1<< mutexWaiterShift }// The current goroutine switches mutex to starvation mode.// But if the mutex is currently unlocked, don't do the switch.// Unlock expects that starving mutex has waiters, which will not// be true in this case.// 如果已經是飢餓模式,並且Mutex是被鎖的狀態ifstarving && old&mutexLocked != 0{ // 切換成飢餓模式new|= mutexStarving }ifawoke { // The goroutine has been woken from sleep,// so we need to reset the flag in either case.ifnew&mutexWoken == 0{ throw( "sync: inconsistent mutex state") }new&^= mutexWoken }// 更新state值ifatomic.CompareAndSwapInt32(&m.state, old, new) { // 非飢餓模式下搶鎖成功ifold&(mutexLocked|mutexStarving) == 0{ // 退出break// locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.// 如果之前已經設置過waitStartTime的話,queueLifo就是true了queueLifo := waitStartTime != 0// 沒有設置過,獲取下運行時間ifwaitStartTime == 0{ waitStartTime = runtime_nanotime}// 阻塞,等待被喚醒runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 如果等待時間超過1ms,設置starving = true,否則就是falsestarving = starving || runtime_nanotime-waitStartTime > starvationThresholdNsold = m.state// 如果Mutex已經是飢餓模式ifold&mutexStarving != 0{ // If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.// 如果當前G是在飢餓模式下被喚醒的// 加個判斷state是否正確設置的邏輯ifold&(mutexLocked|mutexWoken) != 0|| old>>mutexWaiterShift == 0{ throw( "sync: inconsistent mutex state") }// delta = -7 (1..... 0111)delta := int32(mutexLocked - 1<
ifrace.Enabled { race.Acquire(unsafe.Pointer(m))}}
Unlock方法改動就非常小了
// Fast path: drop lock bit.new:= atomic.AddInt32(&m.state, -mutexLocked) ifnew!= 0{ // Outlined slow path to allow inlining the fast path.// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.m.unlockSlow( new) }}
func(m *Mutex)unlockSlow( newint32) { if( new+mutexLocked)&mutexLocked == 0{ throw( "sync: unlock of unlocked mutex") }// 不是飢餓模式ifnew&mutexStarving == 0{ old := newfor{ // G等待隊列==0,直接返回// (或者,處於woken模式,直接返回// 或者,處於locked模式,直接返回// 或者處於飢餓模式,直接返回)ifold>>mutexWaiterShift == 0|| old&(mutexLocked|mutexWoken|mutexStarving) != 0{ return}new= (old - 1<
總結
Mutex經過兩次演進,都解決了不同的問題。Mutex用法非常簡單,裡面的原理不感興趣的話其實沒必要深究,知道個大概的邏輯就行了。
補充:
mutex的等待G隊列的順序是FIFO
飢餓模式下,性能其實很低,主要就是為了解決長尾問題的
SegmentFault 思否社區和文章作者展開更多互動和交流。
文章來源: https://twgreatdaily.com/zh-tw/4Btla3MBnkjnB-0zvb-r.html