2023-06-19:講一講Redis分布式鎖的實(shí)現(xiàn)?
2023-06-19:講一講Redis分布式鎖的實(shí)現(xiàn)?
答案2023-06-19:
Redis分布式鎖最簡單的實(shí)現(xiàn)
要實(shí)現(xiàn)分布式鎖,確實(shí)需要使用具備互斥性的Redis操作。其中一種常用的方式是使用SETNX
命令,該命令表示"SET if Not Exists",即只有在key不存在時(shí)才設(shè)置其值,否則不進(jìn)行任何操作。通過這種方式,兩個(gè)客戶端進(jìn)程可以執(zhí)行SETNX
命令來實(shí)現(xiàn)互斥,從而達(dá)到分布式鎖的目的。
下面是一個(gè)示例:
客戶端1申請加鎖,加鎖成功:
SETNX?lock_key?1
客戶端2申請加鎖,由于它處于較晚的時(shí)間,加鎖失?。?/p>
SETNX?lock_key?1
通過這種方式,您可以使用Redis的互斥性來實(shí)現(xiàn)簡單的分布式鎖機(jī)制。

對于加鎖成功的客戶端,可以執(zhí)行對共享資源的操作,比如修改MySQL的某一行數(shù)據(jù)或調(diào)用API請求。
操作完成后,需要及時(shí)釋放鎖,以便后續(xù)的請求能夠訪問共享資源。釋放鎖非常簡單,只需使用DEL
命令來刪除相應(yīng)的鎖鍵(key)即可。
下面是釋放鎖的示例邏輯:
DEL?lock_key
通過執(zhí)行以上DEL
命令,成功釋放鎖,以讓后續(xù)的請求能夠獲得鎖并執(zhí)行操作共享資源的邏輯。
這樣,通過使用SETNX
命令進(jìn)行加鎖,然后使用DEL
命令釋放鎖,您就可以實(shí)現(xiàn)基本的分布式鎖機(jī)制。

但是,它存在一個(gè)很大的問題,當(dāng)客戶端 1 拿到鎖后,如果發(fā)生下面的場景,就會造成「死鎖」:
1、程序處理業(yè)務(wù)邏輯異常,沒有及時(shí)釋放鎖。
2、進(jìn)程崩潰或意外停止,無法釋放鎖。
在這種情況下,客戶端將永遠(yuǎn)占用該鎖,其他客戶端將無法獲取該鎖。如何解決這個(gè)問題呢?
如何避免死鎖?
當(dāng)考慮在申請鎖時(shí)為其設(shè)置一個(gè)「租期」時(shí),可以在Redis中通過設(shè)置「過期時(shí)間」來實(shí)現(xiàn)。假設(shè)我們假設(shè)操作共享資源的時(shí)間不會超過10秒,在加鎖時(shí),可以給該key設(shè)置一個(gè)10秒的過期時(shí)間即可。這樣做可以確保在申請鎖后的一段時(shí)間內(nèi),如果鎖的持有者在該時(shí)間內(nèi)沒有更新鎖的過期時(shí)間,鎖將會自動過期,從而防止鎖被永久占用
SETNX?lock?1????//?加鎖
EXPIRE?lock?10??//?10s后自動過期

這樣一來,無論客戶端是否異常,這個(gè)鎖都可以在 10s 后被「自動釋放」,其它客戶端依舊可以拿到鎖。
但現(xiàn)在還是有問題:
當(dāng)前的操作是將加鎖和設(shè)置過期時(shí)間作為兩個(gè)獨(dú)立的命令執(zhí)行,存在一個(gè)問題,即可能只執(zhí)行了第一條命令而第二條命令卻未能及時(shí)執(zhí)行,從而導(dǎo)致問題。例如:
??SETNX 命令執(zhí)行成功后,由于網(wǎng)絡(luò)問題導(dǎo)致 EXPIRE 命令執(zhí)行失敗。
??SETNX 命令執(zhí)行成功后,Redis 異常宕機(jī),導(dǎo)致 EXPIRE 命令沒有機(jī)會執(zhí)行。
??SETNX 命令執(zhí)行成功后,客戶端異常崩潰,同樣導(dǎo)致 EXPIRE 命令沒有機(jī)會執(zhí)行。
總之,這兩條命令不能保證是原子操作(一起成功),就有潛在的風(fēng)險(xiǎn)導(dǎo)致過期時(shí)間設(shè)置失敗,依舊發(fā)生「死鎖」問題。
幸運(yùn)的是,在 Redis 2.6.12 版本之后,Redis 擴(kuò)展了 SET 命令的參數(shù)。用這一條命令就可以了:
SET?lock?1?EX?10?NX

鎖被別人釋放怎么辦?
上面的命令執(zhí)行時(shí),每個(gè)客戶端在釋放鎖時(shí),并沒有進(jìn)行嚴(yán)格的驗(yàn)證,存在釋放別人鎖的潛在風(fēng)險(xiǎn)。為了解決這個(gè)問題,可以在加鎖時(shí)為每個(gè)客戶端設(shè)置一個(gè)唯一的標(biāo)識符(unique identifier),并在解鎖時(shí)對比標(biāo)識符來驗(yàn)證是否有權(quán)釋放鎖。
例如,可以是自己的線程 ID,也可以是一個(gè) UUID(隨機(jī)且唯一),這里我們以UUID 舉例:
SET?lock?$uuid?EX?20?NX
之后,在釋放鎖時(shí),要先判斷這把鎖是否還歸自己持有,偽代碼可以這么寫:
if?redis.get("lock")?==?$uuid:
????redis.del("lock")
這里釋放鎖使用的是 GET + DEL 兩條命令,這時(shí),又會遇到我們前面講的原子性問題了。這里可以使用lua腳本來解決。
安全釋放鎖的 Lua 腳本如下:
if?redis.call("GET",KEYS[1])?==?ARGV[1]
then
????return?redis.call("DEL",KEYS[1])
else
????return?0
end
好了,這樣一路優(yōu)化,整個(gè)的加鎖、解鎖的流程就更「嚴(yán)謹(jǐn)」了。
這里我們先小結(jié)一下,基于 Redis 實(shí)現(xiàn)的分布式鎖,一個(gè)嚴(yán)謹(jǐn)?shù)牡牧鞒倘缦拢?/p>
1、加鎖
SET?lock_key?$unique_id?EX?$expire_time?NX
2、操作共享資源
3、釋放鎖:Lua 腳本,先 GET 判斷鎖是否歸屬自己,再DEL 釋放鎖
go代碼實(shí)現(xiàn)分布式鎖
package?main
import?(
????"context"
????"fmt"
????"sync"
????"time"
????"github.com/go-redis/redis/v8"
????"github.com/google/uuid"
)
const?(
????LockTime?????????=?5?*?time.Second
????RS_DISTLOCK_NS???=?"tdln:"
????RELEASE_LOCK_LUA?=?`
????????if?redis.call('get',KEYS[1])==ARGV[1]?then
????????????return?redis.call('del',?KEYS[1])
????????else
????????????return?0
????????end
????`
)
type?RedisDistLock?struct?{
????id??????????string
????lockName????string
????redisClient?*redis.Client
????m???????????sync.Mutex
}
func?NewRedisDistLock(redisClient?*redis.Client,?lockName?string)?*RedisDistLock?{
????return?&RedisDistLock{
????????lockName:????lockName,
????????redisClient:?redisClient,
????}
}
func?(this?*RedisDistLock)?Lock()?{
????for?!this.TryLock()?{
????????time.Sleep(100?*?time.Millisecond)
????}
}
func?(this?*RedisDistLock)?TryLock()?bool?{
????if?this.id?!=?""?{
????????//?處于加鎖中
????????return?false
????}
????this.m.Lock()
????defer?this.m.Unlock()
????if?this.id?!=?""?{
????????//?處于加鎖中
????????return?false
????}
????ctx?:=?context.Background()
????id?:=?uuid.New().String()
????reply?:=?this.redisClient.SetNX(ctx,?RS_DISTLOCK_NS+this.lockName,?id,?LockTime)
????if?reply.Err()?==?nil?&&?reply.Val()?{
????????this.id?=?id
????????return?true
????}
????return?false
}
func?(this?*RedisDistLock)?Unlock()?{
????if?this.id?==?""?{
????????//?未加鎖
????????panic("解鎖失敗,因?yàn)槲醇渔i")
????}
????this.m.Lock()
????defer?this.m.Unlock()
????if?this.id?==?""?{
????????//?未加鎖
????????panic("解鎖失敗,因?yàn)槲醇渔i")
????}
????ctx?:=?context.Background()
????reply?:=?this.redisClient.Eval(ctx,?RELEASE_LOCK_LUA,?[]string{RS_DISTLOCK_NS?+?this.lockName},?this.id)
????if?reply.Err()?!=?nil?{
????????panic("釋放鎖失?。?#34;)
????}?else?{
????????this.id?=?""
????}
}
func?main()?{
????client?:=?redis.NewClient(&redis.Options{
????????Addr:?"172.16.11.111:64495",
????})
????const?LOCKNAME?=?"百家號:福大大架構(gòu)師每日一題"
????lock?:=?NewRedisDistLock(client,?LOCKNAME)
????lock.Lock()
????fmt.Println("加鎖main")
????ch?:=?make(chan?struct{})
????go?func()?{
????????lock?:=?NewRedisDistLock(client,?LOCKNAME)
????????lock.Lock()
????????fmt.Println("加鎖go程")
????????lock.Unlock()
????????fmt.Println("解鎖go程")
????????ch?<-?struct{}{}
????}()
????time.Sleep(time.Second?*?2)
????lock.Unlock()
????fmt.Println("解鎖main")
????<-ch
}

鎖過期時(shí)間不好評估怎么辦?

看上面這張圖,加入key的失效時(shí)間是10s,但是客戶端C在拿到分布式鎖之后,然后業(yè)務(wù)邏輯執(zhí)行超過10s,那么問題來了,在客戶端C釋放鎖之前,其實(shí)這把鎖已經(jīng)失效了,那么客戶端A和客戶端B都可以去拿鎖,這樣就已經(jīng)失去了分布式鎖的功能了?。?!
比較簡單的妥協(xié)方案是,盡量「冗余」過期時(shí)間,降低鎖提前過期的概率,但是這個(gè)并不能完美解決問題,那怎么辦呢?
分布式鎖加入看門狗
在加鎖過程中,可以設(shè)置一個(gè)過期時(shí)間,并啟動一個(gè)守護(hù)線程(也稱為「看門狗」線程),定時(shí)檢測鎖的剩余有效時(shí)間。如果鎖即將過期,但共享資源操作尚未完成,守護(hù)線程可以自動對鎖進(jìn)行續(xù)期,重新設(shè)置過期時(shí)間。
為什么要使用守護(hù)線程:

go中的紅鎖
package?main
import?(
????"fmt"
????"time"
????"github.com/go-redis/redis/v8"
????"github.com/go-redsync/redsync/v4"
????"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func?main()?{
????client?:=?redis.NewClient(&redis.Options{
????????Addr:?????"172.16.11.111:64495",
????????Password:?"",?//?如果有密碼,請?zhí)峁┟艽a
????????DB:???????0,??//?如果使用不同的數(shù)據(jù)庫,請修改為準(zhǔn)確的數(shù)據(jù)庫編號
????})
????pool?:=?goredis.NewPool(client)
????const?LOCKNAME?=?"百家號:福大大架構(gòu)師每日一題"
????redsync?:=?redsync.New(pool)
????mutex?:=?redsync.NewMutex(LOCKNAME)
????if?err?:=?mutex.Lock();?err?!=?nil?{
????????fmt.Println("加鎖失敗:",?err)
????????return
????}
????fmt.Println("加鎖main")
????ch?:=?make(chan?struct{})
????go?func()?{
????????mutex?:=?redsync.NewMutex(LOCKNAME)
????????if?err?:=?mutex.Lock();?err?!=?nil?{
????????????fmt.Println("加鎖失敗:",?err)
????????????return
????????}
????????fmt.Println("加鎖go程")
????????mutex.Unlock()
????????fmt.Println("解鎖go程")
????????ch?<-?struct{}{}
????}()
????time.Sleep(time.Second?*?2)
????mutex.Unlock()
????fmt.Println("解鎖main")
????<-ch
}
