深入理解Go語言中的sync.Cond
1. 簡介
本文將介紹 Go 語言中的?sync.Cond
并發(fā)原語,包括?sync.Cond
的基本使用方法、實現(xiàn)原理、使用注意事項以及常見的使用使用場景。能夠更好地理解和應(yīng)用 Cond 來實現(xiàn) goroutine 之間的同步。
2. 基本使用
2.1 定義
sync.Cond
是Go語言標(biāo)準(zhǔn)庫中的一個類型,代表條件變量。條件變量是用于多個goroutine之間進(jìn)行同步和互斥的一種機(jī)制。sync.Cond
可以用于等待和通知goroutine,以便它們可以在特定條件下等待或繼續(xù)執(zhí)行。
2.2 方法說明
sync.Cond
的定義如下,提供了Wait
?,Singal
,Broadcast
以及NewCond
方法
type Cond struct {
? noCopy noCopy ? // L is held while observing or changing the condition
? L Locker
? notify ?notifyList
? checker copyChecker
}func NewCond(l Locker) *Cond {}func (c *Cond) Wait() {}func (c *Cond) Signal() {}func (c *Cond) Broadcast() {}
NewCond
方法: 提供創(chuàng)建Cond
實例的方法Wait
方法: 使當(dāng)前線程進(jìn)入阻塞狀態(tài),等待其他協(xié)程喚醒Singal
方法: 喚醒一個等待該條件變量的線程,如果沒有線程在等待,則該方法會立即返回。Broadcast
方法: 喚醒所有等待該條件變量的線程,如果沒有線程在等待,則該方法會立即返回。
2.3 使用方式
當(dāng)使用sync.Cond
時,通常需要以下幾個步驟:
定義一個互斥鎖,用于保護(hù)共享數(shù)據(jù);
創(chuàng)建一個
sync.Cond
對象,關(guān)聯(lián)這個互斥鎖;在需要等待條件變量的地方,獲取這個互斥鎖,并使用
Wait
方法等待條件變量被通知;在需要通知等待的協(xié)程時,使用
Signal
或Broadcast
方法通知等待的協(xié)程。最后,釋放這個互斥鎖。
下面是一個簡單的代碼的示例,展示了大概的代碼結(jié)構(gòu):
var ( ? ?// 1. 定義一個互斥鎖
? ?mu ? ?sync.Mutex
? ?cond ?*sync.Cond
? ?count int)func init() { ? ?// 2.將互斥鎖和sync.Cond進(jìn)行關(guān)聯(lián)
? ?cond = sync.NewCond(&mu)
}go func(){ ? ?// 3. 在需要等待的地方,獲取互斥鎖,調(diào)用Wait方法等待被通知
? ?mu.Lock() ? ?// 這里會不斷循環(huán)判斷 是否滿足條件
? ?for !condition() {
? ? ? cond.Wait() // 等待任務(wù)
? ?}
? ?mu.Unlock()
}go func(){ ? ? // 執(zhí)行業(yè)務(wù)邏輯
? ? // 4. 滿足條件,此時調(diào)用Broadcast喚醒處于等待狀態(tài)的協(xié)程
? ? cond.Broadcast()
}
2.4 使用例子
下面通過描述net/http
中的?connReader
,來展示使用sync.Cond
實現(xiàn)阻塞等待通知的機(jī)制。這里我們不需要理解太多,只需要知道connReader
下面兩個方法:
func (cr *connReader) Read(p []byte) (n int, err error) {}func (cr *connReader) abortPendingRead() {}
Read
方法則是用于從HTTP
連接中讀取數(shù)據(jù),不允許并發(fā)訪問的。而abortPendingRead
則是用于終止正在讀取的連接。
從abortPendingRead
方法的語意來看,是需要成功終止其他協(xié)程進(jìn)行數(shù)據(jù)的讀取之后,才能正常返回,也就是此時沒有協(xié)程再繼續(xù)讀取數(shù)據(jù)了,才可以返回。
那abortPendingRead
如何得知是否還有協(xié)程在讀取數(shù)據(jù)呢,其實是可以通過定時輪訓(xùn)connReader
的狀態(tài),從而判斷當(dāng)前Read方法是否仍在讀取數(shù)據(jù)。但是定時輪訓(xùn)效率太低,可能會造成cpu的大量空轉(zhuǎn)。更好的方式,應(yīng)該是讓協(xié)程進(jìn)入阻塞狀態(tài),然后等條件滿足了,其他協(xié)程再來喚醒當(dāng)前協(xié)程,然后再繼續(xù)運(yùn)行下去。
這個其實就是sync.Cond
設(shè)計的用途,當(dāng)不滿足運(yùn)行條件時,先進(jìn)入阻塞狀態(tài),等待條件滿足時,再由其他協(xié)程來喚醒,然后再繼續(xù)運(yùn)行下去,能夠提高程序的執(zhí)行效率。其中Wait
方法便是讓協(xié)程進(jìn)入阻塞狀態(tài),而Singal
和Boardcast
便是喚醒處于阻塞狀態(tài)的協(xié)程,告知其條件滿足了,可以繼續(xù)向下執(zhí)行了。
回到我們connReader
的例子,我們使用sync.Cond
實現(xiàn)阻塞等待通知的效果。
type connReader struct { ? ?// 是否正在讀取數(shù)據(jù)
? ?inRead bool
? ?mu ? ? ?sync.Mutex // guards following
? ?cond ? ?*sync.Cond
}func (cr *connReader) abortPendingRead() { ? ?if !cr.inRead{ ? ? ? ?return
? ?} ? ?//1. 通過一定手段,讓Read方法中斷
? ?cr.mu.Lock() ? ?// 判斷Read方法是否仍然在讀取數(shù)據(jù)
? ?for cr.inRead { ? ? ? ?//2. 此時Read方法仍然在讀取數(shù)據(jù), 不滿足條件,等待通知
? ? ? ?cr.cond.Wait()
? ?}
? ?cr.mu.Unlock()
}func (cr *connReader) Read(p []byte) (n int, err error) {
? ? cr.mu.Lock()
? ? cr.inRead = true
? ?// 1. 讀取數(shù)據(jù)
? ?// 2. abortPendingRead通過某種手段,讓Read方法中斷
? ?
? ?cr.inRead = false
? ?cr.mu.Unlock() ? ?// 3. 現(xiàn)在已經(jīng)滿足abortPendingRead繼續(xù)執(zhí)行下去的條件了,可以喚醒a(bǔ)bortPendingRead協(xié)程了
? ?cond.Boardcast()
}
這里abortPendingRead
方法首先判斷是否還在讀取數(shù)據(jù),是的話,調(diào)用Wait
方法進(jìn)入阻塞狀態(tài),等待條件滿足后繼續(xù)執(zhí)行。
對于Read
方法,因為其不運(yùn)行并發(fā)訪問,當(dāng)其將退出時,說明此時已經(jīng)沒有協(xié)程在讀取數(shù)據(jù)了,滿足abortPendingRead
繼續(xù)執(zhí)行下去的條件了,此時可以調(diào)用Boardcast
來喚醒等待條件滿足的協(xié)程。之后調(diào)用abortPendingRead
方法的協(xié)程此時能夠接收到通知,便能夠順利被喚醒,從而正確返回。
這里便展示了一個簡單的,使用sync.Cond
實現(xiàn)阻塞等待通知的例子。
3. 原理
3.1 基本原理
在Sync.Cond
存在一個通知隊列,保存了所有處于等待狀態(tài)的協(xié)程。通知隊列定義如下:
type notifyList struct {
? wait ? uint32
? notify uint32
? lock ? uintptr // key field of the mutex
? head ? unsafe.Pointer
? tail ? unsafe.Pointer
}
當(dāng)調(diào)用Wait
方法時,此時Wait
方法會釋放所持有的鎖,然后將自己放到notifyList
等待隊列中等待。此時會將當(dāng)前協(xié)程加入到等待隊列的尾部,然后進(jìn)入阻塞狀態(tài)。
當(dāng)調(diào)用Signal
?時,此時會喚醒等待隊列中的第一個協(xié)程,其他繼續(xù)等待。如果此時沒有處于等待狀態(tài)的協(xié)程,調(diào)用Signal
不會有其他作用,直接返回。當(dāng)調(diào)用BoradCast
方法時,則會喚醒notfiyList
中所有處于等待狀態(tài)的協(xié)程。
sync.Cond
的代碼實現(xiàn)比較簡單,協(xié)程的喚醒和阻塞已經(jīng)由運(yùn)行時包實現(xiàn)了,sync.Cond
的實現(xiàn)直接調(diào)用了運(yùn)行時包提供的API。
3.2 實現(xiàn)
3.2.1 Wait方法實現(xiàn)
Wait
方法首先調(diào)用runtime_notifyListAd
方法,將自己加入到等待隊列中,然后釋放鎖,等待其他協(xié)程的喚醒。
func (c *Cond) Wait() { ? // 將自己放到等待隊列中
? t := runtime_notifyListAdd(&c.notify) ? // 釋放鎖
? c.L.Unlock() ? // 等待喚醒
? runtime_notifyListWait(&c.notify, t) ? // 重新獲取鎖
? c.L.Lock()
}
3.2.2 Singal方法實現(xiàn)
Singal
方法調(diào)用runtime_notifyListNotifyOne
喚醒等待隊列中的一個協(xié)程。
func (c *Cond) Signal() { ? // 喚醒等待隊列中的一個協(xié)程
? runtime_notifyListNotifyOne(&c.notify)
}
3.2.3 Broadcast方法實現(xiàn)
Broadcast
方法調(diào)用runtime_notifyListNotifyAll
喚醒所有處于等待狀態(tài)的協(xié)程。
func (c *Cond) Broadcast() { ? // 喚醒等待隊列中所有的協(xié)程
? runtime_notifyListNotifyAll(&c.notify)
}
4.使用注意事項
4.1 調(diào)用Wait方法前未加鎖
4.1.1 問題
如果在調(diào)用Wait
方法前未加鎖,此時會直接panic
,下面是一個簡單例子的說明:
package mainimport ( ? ?"fmt"
? ?"sync"
? ?"time")var (
? count int
? cond ?*sync.Cond
? lk ? ?sync.Mutex
)func main() {
? ?cond = sync.NewCond(&lk)
? ?wg := sync.WaitGroup{}
? ?wg.Add(2) ? ?go func() { ? ? ? defer wg.Done() ? ? ? for {
? ? ? ? ?time.Sleep(time.Second)
? ? ? ? ?count++
? ? ? ? ?cond.Broadcast()
? ? ? }
? ?}() ? ?
? ?go func() { ? ? ? defer wg.Done() ? ? ? for {
? ? ? ? ?time.Sleep(time.Millisecond * 500) ? ? ? ? ?
? ? ? ? ?//cond.L.Lock()
? ? ? ? ?for count%10 != 0 {
? ? ? ? ? ? ? cond.Wait()
? ? ? ? ?}
? ? ? ? ?t.Logf("count = %d", count) ? ? ? ? ?//cond.L.Unlock() ?
? ? ? }
? ?}()
? ?wg.Wait()
}
上面代碼中,協(xié)程一每隔1s,將count字段的值自增1,然后喚醒所有處于等待狀態(tài)的協(xié)程。協(xié)程二執(zhí)行的條件為count的值為10的倍數(shù),此時滿足執(zhí)行條件,喚醒后將會繼續(xù)往下執(zhí)行。
但是這里在調(diào)用sync.Wait
方法前,沒有先獲取鎖,下面是其執(zhí)行結(jié)果,會拋出 fatal error: sync: unlock of unlocked mutex 錯誤,結(jié)果如下:
count = 0fatal error: sync: unlock of unlocked mutex
因此,在調(diào)用Wait
方法前,需要先獲取到與sync.Cond
關(guān)聯(lián)的鎖,否則會直接拋出異常。
4.1.2 為什么調(diào)用Wait方法前需要先獲取該鎖
強(qiáng)制調(diào)用Wait方法前需要先獲取該鎖。這里的原因在于調(diào)用Wait
方法如果不加鎖,有可能會出現(xiàn)競態(tài)條件。
這里假設(shè)多個協(xié)程都處于等待狀態(tài),然后一個協(xié)程調(diào)用了Broadcast喚醒了其中一個或多個協(xié)程,此時這些協(xié)程都會被喚醒。
如下,假設(shè)調(diào)用Wait
方法前沒有加鎖的話,那么所有協(xié)程都會去調(diào)用condition
方法去判斷是否滿足條件,然后都通過驗證,執(zhí)行后續(xù)操作。
for !condition() {
? ?c.Wait()
}
c.L.Lock()// 滿足條件情況下,執(zhí)行的邏輯c.L.Unlock()
此時會出現(xiàn)的情況為,本來是需要在滿足condition
方法的前提下,才能執(zhí)行的操作?,F(xiàn)在有可能的效果,為前面一部分協(xié)程執(zhí)行時,還是滿足condition
條件的;但是后面的協(xié)程,盡管不滿足condition
條件,還是執(zhí)行了后續(xù)操作,可能導(dǎo)致程序出錯。
正常的用法應(yīng)該是,在調(diào)用Wait
方法前便加鎖,只會有一個協(xié)程判斷是否滿足condition
條件,然后執(zhí)行后續(xù)操作。這樣子就不會出現(xiàn)即使不滿足條件,也會執(zhí)行后續(xù)操作的情況出現(xiàn)。
c.L.Lock()for !condition() {
? ?c.Wait()
}// 滿足條件情況下,執(zhí)行的邏輯c.L.Unlock()
4.2 Wait方法接收到通知后,未重新檢查條件變量
調(diào)用sync.Wait
方法,協(xié)程進(jìn)入阻塞狀態(tài)后被喚醒,沒有重新檢查條件變量,此時有可能仍然處于不滿足條件變量的場景下。然后直接執(zhí)行后續(xù)操作,有可能會導(dǎo)致程序出錯。下面舉一個簡單的例子:
package mainimport ( ? ?"fmt"
? ?"sync"
? ?"time")var (
? count int
? cond ?*sync.Cond
? lk ? ?sync.Mutex
)func main() {
? ?cond = sync.NewCond(&lk)
? ?wg := sync.WaitGroup{}
? ?wg.Add(3) ? ?go func() { ? ? ? defer wg.Done() ? ? ? for {
? ? ? ? ?time.Sleep(time.Second)
? ? ? ? ?cond.L.Lock() ? ? ? ? ?// 將flag 設(shè)置為true
? ? ? ? ?flag = true
? ? ? ? ?// 喚醒所有處于等待狀態(tài)的協(xié)程
? ? ? ? ?cond.Broadcast()
? ? ? ? ?cond.L.Unlock()
? ? ? }
? ?}() ? ?
? ?for i := 0; i < 2; i++ { ? ? ? go func(i int) { ? ? ? ? ?defer wg.Done() ? ? ? ? ?for {
? ? ? ? ? ? time.Sleep(time.Millisecond * 500)
? ? ? ? ? ? cond.L.Lock() ? ? ? ? ? ? // 不滿足條件,此時進(jìn)入等待狀態(tài)
? ? ? ? ? ? if !flag {
? ? ? ? ? ? ? ?cond.Wait()
? ? ? ? ? ? } ? ? ? ? ? ? // 被喚醒后,此時可能仍然不滿足條件
? ? ? ? ? ? fmt.Printf("協(xié)程 %d flag = %t", i, flag)
? ? ? ? ? ? flag = false
? ? ? ? ? ? cond.L.Unlock()
? ? ? ? ?}
? ? ? }(i)
? ?}
? ?wg.Wait()
}
在這個例子,我們啟動了一個協(xié)程,定時將flag
設(shè)置為true,相當(dāng)于每隔一段時間,便滿足執(zhí)行條件,然后喚醒所有處于等待狀態(tài)的協(xié)程。
然后又啟動了兩個協(xié)程,在滿足條件的前提下,開始執(zhí)行后續(xù)操作,但是這里協(xié)程被喚醒后,沒有重新檢查條件變量,具體看第39行。這里會出現(xiàn)的場景是,第一個協(xié)程被喚醒后,此時執(zhí)行后續(xù)操作,然后將flag
重新設(shè)置為false,此時已經(jīng)不滿足條件了。之后第二個協(xié)程喚醒后,獲取到鎖,沒有重新檢查此時是否滿足執(zhí)行條件,直接向下執(zhí)行,這個就和我們預(yù)期不符,可能會導(dǎo)致程序出錯,代碼執(zhí)行效果如下:
協(xié)程 1 flag = true協(xié)程 0 flag = false協(xié)程 1 flag = true協(xié)程 0 flag = false
可以看到,此時協(xié)程0執(zhí)行時,flag
的值均為false
,說明此時其實并不符合執(zhí)行條件,可能會導(dǎo)致程序出錯。因此正確用法應(yīng)該像下面這樣子,被喚醒后,需要重新檢查條件變量,滿足條件之后才能繼續(xù)向下執(zhí)行。
c.L.Lock()// 喚醒后,重新檢查條件變量是否滿足條件for !condition() {
? ?c.Wait()
}// 滿足條件情況下,執(zhí)行的邏輯c.L.Unlock()
5.總結(jié)
本文介紹了 Go 語言中的 sync.Cond 并發(fā)原語,它是用于實現(xiàn) goroutine 之間的同步的重要工具。我們首先學(xué)習(xí)了?sync.Cond
?的基本使用方法,包括創(chuàng)建和使用條件變量、使用Wait
和Signal
/Broadcast
方法等。
在接下來的部分中,我們介紹了?sync.Cond
?的實現(xiàn)原理,主要是對等待隊列的使用,從而sync.Cond
有更好的理解,能夠更好得使用它。同時,我們也講述了使用sync.Cond
的注意事項,如調(diào)用Wait
方法前需要加鎖等。
基于以上內(nèi)容,本文完成了對?sync.Cond
?的介紹,希望能夠幫助大家更好地理解和使用Go語言中的并發(fā)原語。