Python 中最常用的線程鎖有哪些?
鎖是Python提供給我們能夠自行操控線程切換的一種手段,使用鎖可以讓線程的切換變的有序。
一旦線程的切換變的有序后,各個(gè)線程之間對(duì)數(shù)據(jù)的訪問(wèn)、修改就變的可控,所以若要保證線程安全,就必須使用鎖。
hreading模塊中提供了5種最常見(jiàn)的鎖,下面是按照功能進(jìn)行劃分:
·?同步鎖:lock(一次只能放行一個(gè))
·?遞歸鎖:rlock(一次只能放行一個(gè))
·?條件鎖:condition(一次可以放行任意個(gè))
·?事件鎖:event(一次全部放行)
·?信號(hào)量鎖:semaphore(一次可以放行特定個(gè))
1、Lock() 同步鎖
基本介紹
Lock鎖的稱呼有很多,如:
·?同步鎖
·?互斥鎖
它們是什么意思呢?如下所示:
00001.?互斥指的是某一資源同一時(shí)刻僅能有一個(gè)訪問(wèn)者對(duì)其進(jìn)行訪問(wèn),具有唯一性和排他性,但是互斥無(wú)法限制訪問(wèn)者對(duì)資源的訪問(wèn)順序,即訪問(wèn)是無(wú)序的
00002.?同步是指在互斥的基礎(chǔ)上(大多數(shù)情況),通過(guò)其他機(jī)制實(shí)現(xiàn)訪問(wèn)者對(duì)資源的有序訪問(wèn)
00003.?同步其實(shí)已經(jīng)實(shí)現(xiàn)了互斥,是互斥的一種更為復(fù)雜的實(shí)現(xiàn),因?yàn)樗诨コ獾幕A(chǔ)上實(shí)現(xiàn)了有序訪問(wèn)的特點(diǎn)
下面是threading模塊與同步鎖提供的相關(guān)方法:
?
使用方式
同步鎖一次只能放行一個(gè)線程,一個(gè)被加鎖的線程在運(yùn)行時(shí)不會(huì)將執(zhí)行權(quán)交出去,只有當(dāng)該線程被解鎖時(shí)才會(huì)將執(zhí)行權(quán)通過(guò)系統(tǒng)調(diào)度交由其他線程。
如下所示,使用同步鎖解決最上面的問(wèn)題:
import threading
?
num = 0
?
?
def add():
????lock.acquire()
????global num
????for i in range(10_000_000):
????????num += 1
????lock.release()
?
?
def sub():
????lock.acquire()
????global num
????for i in range(10_000_000):
????????num -= 1
????lock.release()
?
if __name__ == "__main__":
????lock = threading.Lock()
?
????subThread01 = threading.Thread(target=add)
????subThread02 = threading.Thread(target=sub)
?
????subThread01.start()
????subThread02.start()
?
????subThread01.join()
????subThread02.join()
?
????print("num result : %s" % num)
?
# 結(jié)果三次采集
# num result : 0
# num result : 0
# num result : 0
?
這樣這個(gè)代碼就完全變成了串行的狀態(tài),對(duì)于這種計(jì)算密集型I/O業(yè)務(wù)來(lái)說(shuō),還不如直接使用串行化單線程執(zhí)行來(lái)得快,所以這個(gè)例子僅作為一個(gè)示例,不能概述鎖真正的用途。
死鎖現(xiàn)象
對(duì)于同步鎖來(lái)說(shuō),一次acquire()必須對(duì)應(yīng)一次release(),不能出現(xiàn)連續(xù)重復(fù)使用多次acquire()后再重復(fù)使用多次release()的操作,這樣會(huì)引起死鎖造成程序的阻塞,完全不動(dòng)了,如下所示:
import threading
?
num = 0
?
?
def add():
????lock.acquire() ?# 上鎖
????lock.acquire() ?# 死鎖
????# 不執(zhí)行
????global num
????for i in range(10_000_000):
????????num += 1
????lock.release()
????lock.release()
?
?
def sub():
????lock.acquire() ?# 上鎖
????lock.acquire() ?# 死鎖
????# 不執(zhí)行
????global num
????for i in range(10_000_000):
????????num -= 1
????lock.release()
????lock.release()
?
?
if __name__ == "__main__":
????lock = threading.Lock()
?
????subThread01 = threading.Thread(target=add)
????subThread02 = threading.Thread(target=sub)
?
????subThread01.start()
????subThread02.start()
?
????subThread01.join()
????subThread02.join()
?
????print("num result : %s" % num)
?
with語(yǔ)句
由于threading.Lock()對(duì)象中實(shí)現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語(yǔ)句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading
?
num = 0
?
?
def add():
????with lock:
????????# 自動(dòng)加鎖
????????global num
????????for i in range(10_000_000):
????????????num += 1
????????# 自動(dòng)解鎖
?
?
def sub():
????with lock:
????????# 自動(dòng)加鎖
????????global num
????????for i in range(10_000_000):
????????????num -= 1
????????# 自動(dòng)解鎖
?
?
if __name__ == "__main__":
????lock = threading.Lock()
?
????subThread01 = threading.Thread(target=add)
????subThread02 = threading.Thread(target=sub)
?
????subThread01.start()
????subThread02.start()
?
????subThread01.join()
????subThread02.join()
?
????print("num result : %s" % num)
????
# 結(jié)果三次采集
# num result : 0
# num result : 0
# num result : 0
?
2、RLock() 遞歸鎖
基本介紹
遞歸鎖是同步鎖的一個(gè)升級(jí)版本,在同步鎖的基礎(chǔ)上可以做到連續(xù)重復(fù)使用多次acquire()后再重復(fù)使用多次release()的操作,但是一定要注意加鎖次數(shù)和解鎖次數(shù)必須一致,否則也將引發(fā)死鎖現(xiàn)象。
下面是threading模塊與遞歸鎖提供的相關(guān)方法:
?
使用方式
以下是遞歸鎖的簡(jiǎn)單使用,下面這段操作如果使用同步鎖則會(huì)發(fā)生死鎖現(xiàn)象,但是遞歸鎖不會(huì):
import threading
?
num = 0
?
?
def add():
????lock.acquire()
????lock.acquire()
????global num
????for i in range(10_000_000):
????????num += 1
????lock.release()
????lock.release()
?
?
def sub():
????lock.acquire()
????lock.acquire()
????global num
????for i in range(10_000_000):
????????num -= 1
????lock.release()
????lock.release()
?
?
if __name__ == "__main__":
????lock = threading.RLock()
?
????subThread01 = threading.Thread(target=add)
????subThread02 = threading.Thread(target=sub)
?
????subThread01.start()
????subThread02.start()
?
????subThread01.join()
????subThread02.join()
?
????print("num result : %s" % num)
?
# 結(jié)果三次采集
# num result : 0
# num result : 0
# num result : 0
?
with語(yǔ)句
由于threading.RLock()對(duì)象中實(shí)現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語(yǔ)句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading
?
num = 0
?
?
def add():
????with lock:
????????# 自動(dòng)加鎖
????????global num
????????for i in range(10_000_000):
????????????num += 1
????????# 自動(dòng)解鎖
?
?
def sub():
????with lock:
????????# 自動(dòng)加鎖
????????global num
????????for i in range(10_000_000):
????????????num -= 1
????????# 自動(dòng)解鎖
?
?
if __name__ == "__main__":
????lock = threading.RLock()
?
????subThread01 = threading.Thread(target=add)
????subThread02 = threading.Thread(target=sub)
?
????subThread01.start()
????subThread02.start()
?
????subThread01.join()
????subThread02.join()
?
????print("num result : %s" % num)
?
# 結(jié)果三次采集
# num result : 0
# num result : 0
# num result : 0
?
3、Condition() 條件鎖
基本介紹
條件鎖是在遞歸鎖的基礎(chǔ)上增加了能夠暫停線程運(yùn)行的功能。并且我們可以使用wait()與notify()來(lái)控制線程執(zhí)行的個(gè)數(shù)。
注意:條件鎖可以自由設(shè)定一次放行幾個(gè)線程。
下面是threading模塊與條件鎖提供的相關(guān)方法:
?
使用方式
下面這個(gè)案例會(huì)啟動(dòng)10個(gè)子線程,并且會(huì)立即將10個(gè)子線程設(shè)置為等待狀態(tài)。
然后我們可以發(fā)送一個(gè)或者多個(gè)通知,來(lái)恢復(fù)被等待的子線程繼續(xù)運(yùn)行:
import?threading
currentRunThreadNumber?=?0maxSubThreadNumber?=?10
?
def?task():
????global?currentRunThreadNumber
????thName?=?threading.currentThread().name
?
????condLock.acquire()??# 上鎖
????print("start and wait run thread : %s"?%?thName)
?
????condLock.wait()??# 暫停線程運(yùn)行、等待喚醒
????currentRunThreadNumber?+=?1
????print("carry on run thread : %s"?%?thName)
?
????condLock.release()??# 解鎖
?
if?__name__?==?"__main__":
????condLock?=?threading.Condition()
?
????for?i?in?range(maxSubThreadNumber):
????????subThreadIns?=?threading.Thread(target=task)
????????subThreadIns.start()
?
????while?currentRunThreadNumber?<?maxSubThreadNumber:
????????notifyNumber?=?int(
????????????input("Please enter the number of threads that need to be notified to run:"))
?
????????condLock.acquire()
????????condLock.notify(notifyNumber)??# 放行
????????condLock.release()
?
????print("main thread run end")
????# 先啟動(dòng)10個(gè)子線程,然后這些子線程會(huì)全部變?yōu)榈却隣顟B(tài)# start and wait run thread : Thread-1# start and wait run thread : Thread-2# start and wait run thread : Thread-3# start and wait run thread : Thread-4# start and wait run thread : Thread-5# start and wait run thread : Thread-6# start and wait run thread : Thread-7# start and wait run thread : Thread-8# start and wait run thread : Thread-9# start and wait run thread : Thread-10
# 批量發(fā)送通知,放行特定數(shù)量的子線程繼續(xù)運(yùn)行# Please enter the number of threads that need to be notified to run:5 ?# 放行5個(gè)# carry on run thread : Thread-4# carry on run thread : Thread-3# carry on run thread : Thread-1# carry on run thread : Thread-2# carry on run thread : Thread-5
# Please enter the number of threads that need to be notified to run:5 ?# 放行5個(gè)# carry on run thread : Thread-8# carry on run thread : Thread-10# carry on run thread : Thread-6# carry on run thread : Thread-9# carry on run thread : Thread-7
# Please enter the number of threads that need to be notified to run:1# main thread run end
?
with語(yǔ)句
由于threading.Condition()對(duì)象中實(shí)現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語(yǔ)句進(jìn)行上下文管理形式的加鎖解鎖操作:
import?threading
currentRunThreadNumber?=?0maxSubThreadNumber?=?10
?
def?task():
????global?currentRunThreadNumber
????thName?=?threading.currentThread().name
?
????with?condLock:
????????print("start and wait run thread : %s"?%?thName)
????????condLock.wait()??# 暫停線程運(yùn)行、等待喚醒
????????currentRunThreadNumber?+=?1
????????print("carry on run thread : %s"?%?thName)
?
if?__name__?==?"__main__":
????condLock?=?threading.Condition()
?
????for?i?in?range(maxSubThreadNumber):
????????subThreadIns?=?threading.Thread(target=task)
????????subThreadIns.start()
?
????while?currentRunThreadNumber?<?maxSubThreadNumber:
????????notifyNumber?=?int(
????????????input("Please enter the number of threads that need to be notified to run:"))
?
????????with?condLock:
????????????condLock.notify(notifyNumber)??# 放行
?
????print("main thread run end")
?
4、Event() 事件鎖
基本介紹
事件鎖是基于條件鎖來(lái)做的,它與條件鎖的區(qū)別在于一次只能放行全部,不能放行任意個(gè)數(shù)量的子線程繼續(xù)運(yùn)行。
我們可以將事件鎖看為紅綠燈,當(dāng)紅燈時(shí)所有子線程都暫停運(yùn)行,并進(jìn)入“等待”狀態(tài),當(dāng)綠燈時(shí)所有子線程都恢復(fù)“運(yùn)行”。
下面是threading模塊與事件鎖提供的相關(guān)方法:
?
使用方式
事件鎖不能利用with語(yǔ)句來(lái)進(jìn)行使用,只能按照常規(guī)方式。
如下所示,我們來(lái)模擬線程和紅綠燈的操作,紅燈停,綠燈行:
import?threading
maxSubThreadNumber?=?3
?
def?task():
????thName?=?threading.currentThread().name
????print("start and wait run thread : %s"?%?thName)
????eventLock.wait()??# 暫停運(yùn)行,等待綠燈
????print("green light, %s carry on run"?%?thName)
????print("red light, %s stop run"?%?thName)
????eventLock.wait()??# 暫停運(yùn)行,等待綠燈
????print("green light, %s carry on run"?%?thName)
????print("sub thread %s run end"?%?thName)
?
if?__name__?==?"__main__":
?
????eventLock?=?threading.Event()
?
????for?i?in?range(maxSubThreadNumber):
????????subThreadIns?=?threading.Thread(target=task)
????????subThreadIns.start()
?
????eventLock.set()??# 設(shè)置為綠燈
????eventLock.clear()??# 設(shè)置為紅燈
????eventLock.set()??# 設(shè)置為綠燈
# start and wait run thread : Thread-1# start and wait run thread : Thread-2# start and wait run thread : Thread-3
# green light, Thread-1 carry on run# red light, Thread-1 stop run# green light, Thread-1 carry on run# sub thread Thread-1 run end
# green light, Thread-3 carry on run# red light, Thread-3 stop run# green light, Thread-3 carry on run# sub thread Thread-3 run end
# green light, Thread-2 carry on run# red light, Thread-2 stop run# green light, Thread-2 carry on run# sub thread Thread-2 run end
?
5、Semaphore() 信號(hào)量鎖
基本介紹
信號(hào)量鎖也是根據(jù)條件鎖來(lái)做的,它與條件鎖和事件鎖的區(qū)別如下:
·?條件鎖:一次可以放行任意個(gè)處于“等待”狀態(tài)的線程
·?事件鎖:一次可以放行全部的處于“等待”狀態(tài)的線程
·?信號(hào)量鎖:通過(guò)規(guī)定,成批的放行特定個(gè)處于“上鎖”狀態(tài)的線程
下面是threading模塊與信號(hào)量鎖提供的相關(guān)方法:
?
使用方式
以下是使用示例,你可以將它當(dāng)做一段限寬的路段,每次只能放行相同數(shù)量的線程:
import?threadingimport?time
maxSubThreadNumber?=?6
?
def?task():
????thName?=?threading.currentThread().name
????semaLock.acquire()
????print("run sub thread %s"?%?thName)
????time.sleep(3)
????semaLock.release()
?
if?__name__?==?"__main__":
????# 每次只能放行2個(gè)
????semaLock?=?threading.Semaphore(2)
?
????for?i?in?range(maxSubThreadNumber):
????????subThreadIns?=?threading.Thread(target=task)
????????subThreadIns.start()
?
# run sub thread Thread-1# run sub thread Thread-2
# run sub thread Thread-3# run sub thread Thread-4
# run sub thread Thread-6# run sub thread Thread-5
?
with語(yǔ)句
由于threading.Semaphore()對(duì)象中實(shí)現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語(yǔ)句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading
import time
?
maxSubThreadNumber = 6
?
?
def task():
????thName = threading.currentThread().name
????with semaLock:
????????print("run sub thread %s" % thName)
????????time.sleep(3)
?
?
if __name__ == "__main__":
?
????semaLock = threading.Semaphore(2)
?
????for i in range(maxSubThreadNumber):
????????subThreadIns = threading.Thread(target=task)
????????subThreadIns.start()
?
鎖關(guān)系淺析
上面5種鎖可以說(shuō)都是基于同步鎖來(lái)做的,這些你都可以從源碼中找到答案。
首先來(lái)看RLock遞歸鎖,遞歸鎖的實(shí)現(xiàn)非常簡(jiǎn)單,它的內(nèi)部會(huì)維護(hù)著一個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)器不為0的時(shí)候該線程不能被I/O操作和時(shí)間輪詢機(jī)制切換。但是當(dāng)計(jì)數(shù)器為0的時(shí)候便不會(huì)如此了:
def?__init__(self):
????self._block?=?_allocate_lock()
????self._owner?=?None
????self._count?=?0??# 計(jì)數(shù)器
?
而Condition條件鎖的內(nèi)部其實(shí)是有兩把鎖的,一把底層鎖(同步鎖)一把高級(jí)鎖(遞歸鎖)。
低層鎖的解鎖方式有兩種,使用wait()方法會(huì)暫時(shí)解開(kāi)底層鎖同時(shí)加上一把高級(jí)鎖,只有當(dāng)接收到別的線程里的notfiy()后才會(huì)解開(kāi)高級(jí)鎖和重新上鎖低層鎖,也就是說(shuō)條件鎖底層是根據(jù)同步鎖和遞歸鎖的不斷切換來(lái)進(jìn)行實(shí)現(xiàn)的:
def?__init__(self,?lock=None):
????if?lock?is?None:
????????lock?=?RLock()??# 可以看到條件鎖的內(nèi)部是基于遞歸鎖,而遞歸鎖又是基于同步鎖來(lái)做的
????self._lock?=?lock
?
????self.acquire?=?lock.acquire
????self.release?=?lock.release
????try:
????????self._release_save?=?lock._release_save
????except?AttributeError:
????????pass
????try:
????????self._acquire_restore?=?lock._acquire_restore
????except?AttributeError:
????????pass
????try:
????????self._is_owned?=?lock._is_owned
????except?AttributeError:
????????pass
????self._waiters?=?_deque()
?
Event事件鎖內(nèi)部是基于條件鎖來(lái)做的:
class?Event:
?
????def?__init__(self):
????????self._cond?=?Condition(Lock())??# 實(shí)例化出了一個(gè)條件鎖。
????????self._flag?=?False
?
????def?_reset_internal_locks(self):
????????# private! ?called by Thread._reset_internal_locks by _after_fork()
????????self._cond.__init__(Lock())
?
????def?is_set(self):
????????"""Return true if and only if the internal flag is true."""
????????return?self._flag
?
????isSet?=?is_set
?
Semaphore信號(hào)量鎖內(nèi)部也是基于條件鎖來(lái)做的:
class?Semaphore:
?
????def?__init__(self,?value=1):
????????if?value?<?0:
????????????raise?ValueError("semaphore initial value must be >= 0")
????????self._cond?=?Condition(Lock())?# 可以看到,這里是實(shí)例化出了一個(gè)條件鎖
????????self._value?=?value
?
基本練習(xí)題
條件鎖的應(yīng)用
需求:一個(gè)空列表,兩個(gè)線程輪番往里面加值(一個(gè)加偶數(shù),一個(gè)加奇數(shù)),最終讓該列表中的值為 1 - 100 ,且是有序排列的。
import?threading
lst?=?[]
?
def?even():
????"""加偶數(shù)"""
????with?condLock:
????????for?i?in?range(2,?101,?2):
????????????# 判斷當(dāng)前列表的長(zhǎng)度處于2是否能處盡
????????????# 如果能處盡則代表需要添加奇數(shù)
????????????# 否則就添加偶數(shù)
????????????if?len(lst)?%?2?!=?0:
????????????????# 添偶數(shù)
????????????????lst.append(i)??????# 先添加值
????????????????condLock.notify()??# 告訴另一個(gè)線程,你可以加奇數(shù)了,但是這里不會(huì)立即交出執(zhí)行權(quán)
????????????????condLock.wait()????# 交出執(zhí)行權(quán),并等待另一個(gè)線程通知加偶數(shù)
????????????else:
????????????????# 添奇數(shù)
????????????????condLock.wait()??# 交出執(zhí)行權(quán),等待另一個(gè)線程通知加偶數(shù)
????????????????lst.append(i)????
????????????????condLock.notify()
????????condLock.notify()
?
def?odd():
????"""加奇數(shù)"""
????with?condLock:
????????for?i?in?range(1,?101,?2):
????????????if?len(lst)?%?2?==?0:
????????????????lst.append(i)
????????????????condLock.notify()
????????????????condLock.wait()
????????condLock.notify()
?
if?__name__?==?"__main__":
????condLock?=?threading.Condition()
?
????addEvenTask?=?threading.Thread(target=even)
????addOddTask?=?threading.Thread(target=odd)
?
????addEvenTask.start()
????addOddTask.start()
?
????addEvenTask.join()
????addOddTask.join()
?
????print(lst)
?
事件鎖的應(yīng)用
有2個(gè)任務(wù)線程來(lái)扮演李白和杜甫,如何讓他們一人一句進(jìn)行對(duì)答?文本如下:
杜甫:老李啊,來(lái)喝酒!
李白:老杜啊,不喝了我喝不下了!
杜甫:老李啊,再來(lái)一壺?
杜甫:...老李?
李白:呼呼呼...睡著了..
代碼如下:
import?threading
?
def?libai():
????event.wait()??
????print("李白:老杜啊,不喝了我喝不下了!")
????event.set()
????event.clear()
????event.wait()
????print("李白:呼呼呼...睡著了..")
def?dufu():
????print("杜甫:老李啊,來(lái)喝酒!")
????event.set()??
????event.clear()
????event.wait()
????print("杜甫:老李啊,再來(lái)一壺?")
????print("杜甫:...老李?")
????event.set()
?
if?__name__?==?'__main__':
?
????event?=?threading.Event()
?
????t1?=?threading.Thread(target=libai)
????t2?=?threading.Thread(target=dufu)
?
????t1.start()
????t2.start()
????t1.join()
????t2.join()
?