Python 進程與線程 (筆記補充)
這篇專欄補全原視頻筆記沒有出現(xiàn)的進程與線程
專欄有錯誤的地方歡迎指出
WPS筆記及其他內(nèi)容百度網(wǎng)盤鏈接:
https://pan.baidu.com/s/1fTwjyoM81_OOAccPNhGE9Q?pwd=h3fg
參考:
https://www.bilibili.com/video/BV1qt4y1T7M2/?spm_id_from=333.999.0.0
https://docs.python.org/zh-cn/3.8/library/multiprocessing.html
https://docs.python.org/zh-cn/3.8/library/threading.html
https://www.cnblogs.com/shuaiyao666/p/16126113.html
https://www.cnblogs.com/shuaiyao666/p/16126624.html
https://blog.csdn.net/a883774913/article/details/125375440
https://www.shuzhiduo.com/A/l1dyVEjGze/
部分內(nèi)容來源網(wǎng)絡

(1)、程序(Program)
使用編程語言所編寫的指令的集合,用于實現(xiàn)一定的功能
(2)、多任務的方式
1.在一個應用程序內(nèi)使用多個進程
2.在一個進程內(nèi)使用多個線程
(3)、進程(Process)
1.概念
進程(Process)是計算機中的程序關(guān)于某數(shù)據(jù)集合上的一次運行活動,是系統(tǒng)進行資源分配和調(diào)度的基本單位,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ)。啟動后的程序在系統(tǒng)中就有一個進程,系統(tǒng)會為這個進程分配內(nèi)存空間
進程是資源分配的最小單位
# 可以在Windows任務管理器中查看進程
2.創(chuàng)建進程的方式
2.1.os.fork()函數(shù)
只適用于Unix、Linux、Mac操作系統(tǒng)
2.2.multiprocessing模塊中的Process類
2.3.自定義Process類的子類
2.4.Pool進程池
補充:os模塊的getpid()和getppid()函數(shù)
分別返回PID(當前進程ID)和PPID(父進程ID)
# PID(Process ID), PPID(Parent Process ID)
3.使用multiprocessing模塊創(chuàng)建進程
下面展示一個示例,具體內(nèi)容后文說明
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
主進程開始運行
子進程運行,PID是:5460,PPID是:6000
子進程運行,PID是:7236,PPID是:6000
子進程運行,PID是:1904,PPID是:6000
子進程運行,PID是:7284,PPID是:6000
子進程運行,PID是:8896,PPID是:6000
主進程結(jié)束運行,運行時間是3.252000093460083s
補充:進程的三種基本狀態(tài)
1.就緒狀態(tài)(Ready):只剩下CPU需要執(zhí)行外,其他所有資源都已分配完畢
2.執(zhí)行狀態(tài)(Running):CPU開始執(zhí)行該進程
3.阻塞狀態(tài)(Blocked):由于等待某個事件發(fā)生而無法執(zhí)行
(4)、Process類
Process是multiprocessing模塊中的一個類,通過創(chuàng)建一個Process對象然后調(diào)用對象的start()方法來生成進程
1.創(chuàng)建Process類的對象
應始終使用關(guān)鍵字參數(shù)調(diào)用構(gòu)造函數(shù)(或構(gòu)造方法/構(gòu)造器),構(gòu)造函數(shù)如下:
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group——應始終是None,兼容作用
target——由run()方法調(diào)用的可調(diào)用對象。默認為None,意味什么都不調(diào)用
name——進程名稱
args, kwargs——調(diào)用target時傳入的位置參數(shù)和關(guān)鍵字參數(shù)
daemon——進程的守護標志
# args可以傳入列表或元組
2.Process類的常用方法和屬性
2.1.name屬性
進程的名稱。該名稱是一個字符串,僅用于識別目的,沒有語義。可以為多個進程指定相同的名稱。初始名稱由構(gòu)造函數(shù)設定(默認為'Process-N1:N2:...:Nk',其中每個Nk是其父進程的第N個子進程)
2.2.pid屬性
進程的PID。在生成進程前的值為None
2.3.daemon屬性
進程的守護標志,是一個布爾值,必須在start()被調(diào)用之前設置
# 初始值是在構(gòu)造函數(shù)中設置的值。在構(gòu)造函數(shù)中設置的值如果是None(默認值),則從創(chuàng)建的進程繼承(主進程默認為False)
# 在start()被調(diào)用之前可以使用動態(tài)綁定屬性設置此屬性
不允許在守護程序中創(chuàng)建子進程。因為當守護進程由于父進程而退出中斷時,其子進程會變成孤兒進程
補充:守護進程(Daemon)
可以給子進程守護進程的標志,該進程會隨著父進程代碼執(zhí)行完畢而強制終止(為父進程守護)
# 注意,父進程代碼執(zhí)行完畢并不代表父進程結(jié)束
補充:孤兒進程(Orphan Process)
指的是在其父進程執(zhí)行完成或被終止后仍繼續(xù)運行的一類進程。也就是父進程不存在了,子進程還在運行
示例:
運行結(jié)果:
pro1正在運行??# 3秒后出現(xiàn)
pro2正在運行??# 3秒后出現(xiàn)
主進程代碼執(zhí)行完畢??# 5秒后出現(xiàn),此時主進程的守護進程pro1被終止
pro2正在運行??# 6秒后出現(xiàn)
pro2正在運行??# 9秒后出現(xiàn)
...
2.4.is_alive()
如果進程對象處于活動狀態(tài)則返回True,否則返回False
2.5.join([timeout])
阻塞調(diào)用的進程,直到調(diào)用join()方法對應的進程對象終止。如果指定了timeout,則最多會阻塞timeout秒。此方法在進程終止或方法超時時返回None
一個進程可以被join多次。進程無法join自己,因為這會導致死鎖。不能在啟動進程之前join進程
2.6.start()
啟動進程活動。此方法每個進程對象最多只能調(diào)用一次。它會將對象的run()方法安排在一個單獨的進程中調(diào)用
2.7.run()
表示進程活動??梢栽谧宇愔兄貙懘朔椒?。標準run()方法會調(diào)用在構(gòu)造函數(shù)中傳遞給對象的target參數(shù),并分別從args和kwargs參數(shù)中獲取位置和關(guān)鍵字參數(shù)
# 標準run()方法指在Process類中的run()方法而不是子類中的run()方法
2.8.terminate(), kill()
強制終止進程。不會執(zhí)行退出處理程序和finally子句等。進程的子進程不會被終止
# 在Windows上terminate()與kill()相同
# 注意,如果在start()后終止進程,或在終止進程后立刻調(diào)用is_alive(),因為啟動進程和終止進程需要時間,所以分別會出現(xiàn)不會啟動進程和調(diào)用is_alive()返回值是True
示例:
運行結(jié)果:
子進程正在運行 ?# 3秒后出現(xiàn)
子進程的子進程正在運行 ?# 3秒后出現(xiàn)
子進程被終止 ?# 5秒后出現(xiàn),此時子進程被終止,但子進程的子進程還在運行
子進程的子進程正在運行 ?# 6秒后出現(xiàn)
子進程的子進程正在運行 ?# 9秒后出現(xiàn)
子進程的子進程正在運行 ?# 12秒后出現(xiàn)
主進程代碼執(zhí)行完畢 ?# 15秒后出現(xiàn),主進程終止,同時子進程的子進程終止
2.9.exitcode屬性
子進程的退出代碼。如果進程尚未終止,則值為None
# 一般0代表一切正常
2.10.close()
關(guān)閉Process對象,釋放與之關(guān)聯(lián)的所有資源。如果底層進程仍在運行,則會引發(fā)ValueError異常。一旦close()成功返回,Process對象的大多其他方法和屬性將引發(fā)ValueError
# 這里的關(guān)閉是指關(guān)閉進程對象而不是進程,類似文件對象的釋放。此方法不能在進程運行時調(diào)用
3.自定義Process子類創(chuàng)建進程
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
主進程開始執(zhí)行
子進程的名稱:進程--1,PID:10244,PPID:1568
子進程的名稱:進程--0,PID:8328,PPID:1568
子進程的名稱:進程--2,PID:10172,PPID:1568
子進程的名稱:進程--3,PID:10356,PPID:1568
子進程的名稱:進程--4,PID:5044,PPID:1568
主進程執(zhí)行完畢
# 多個進程同時執(zhí)行的順序是無序的
(5)、Pool進程池
Pool進程池是multiprocessing模塊中的一個類
1.并發(fā)和并行
1.1.并發(fā)
兩個或多個事件同一時間間隔發(fā)生,多個進程被交替輪換著執(zhí)行
1.2.并行
兩個或多個事件在同一時刻發(fā)生,多條命令在同一時刻在多個處理器上同時執(zhí)行
示例: ?# '0'表示不執(zhí)行,'1'表示執(zhí)行,長度表示時間
并發(fā):
事件A:1111000000001111000000001111...
事件B:0000111100000000111100000000...
事件C:0000000011110000000011110000...
并行:
事件A:1111111111111111111111111111...
事件B:11111111111111111111111...
事件C:1111111111111111111111111...
2.為什么要有進程池
在程序?qū)嶋H處理問題過程中,忙時會有成千上萬的任務需要被執(zhí)行,如果對應著開啟成千上萬個進程,一是創(chuàng)建進程和銷毀進程需要消耗時間,二是即使開啟了成千上萬的進程,操作系統(tǒng)也不能讓它們同時執(zhí)行,這樣反而會影響程序的效率
3.進程池的概念
進程池是資源進程、管理進程組成的技術(shù)的應用
定義一個池子,在里面放入固定數(shù)量的進程,在有需求時就拿池中的一個進程來處理任務。處理完畢后,進程并不關(guān)閉,而是放回進程池繼續(xù)等待任務
如果池中的進程數(shù)量不夠,任務就要等到池中有空閑進程時才繼續(xù)執(zhí)行。也就是池中的進程數(shù)量是固定的,同一時間最多有固定數(shù)量的進程在運行
進程池技術(shù)的應用至少由以下兩部分組成:
3.1.資源進程(工作進程)
預先創(chuàng)建好的空閑進程
3.2.管理進程
管理進程負責創(chuàng)建資源進程,把工作交給空間資源進程處理,回收已經(jīng)處理完工作的資源進程
4.優(yōu)點
這樣即不會增加操作系統(tǒng)的調(diào)度難道,還節(jié)省了開關(guān)進程的時間,在一定程度上能夠?qū)崿F(xiàn)并發(fā)效果
5.創(chuàng)建Pool類(進程池類)的對象
構(gòu)造函數(shù)如下:
Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
processes——要使用的工作進程數(shù)目。如果值為None(默認),則使用os.cpu_count()返回的值
補充:os.cpu_count()
返回系統(tǒng)的CPU數(shù)量。不確定返回None
initializer, initargs——如果initializer不為None,則每個工作進程將會在啟動時調(diào)用initializer(*initargs)
maxtasksperchild——一個工作進程在它退出之前或被一個新的工作進程代替之前能完成的任務數(shù)量,為了釋放未使用的資源。此參數(shù)的默認值為None,意味著工作進程的壽命與進程池一致
6.Pool類的方法
# 注意,進程池對象的方法只有創(chuàng)建它的進程能夠調(diào)用
# 進程池類遵守上下文管理協(xié)議,__enter__()返回進程池對象,__exit__()會調(diào)用terminate()方法
6.1.apply([func[, args[, kwds]]])
使用阻塞方式在進程池中的一個工作進程中調(diào)用func并傳入args位置參數(shù)和kwds關(guān)鍵字參數(shù)(同步調(diào)用)
6.2.apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply()方法的一個變種。使用非阻塞方式在進程池中的一個工作進程中調(diào)用func并傳入args位置參數(shù)和kwds關(guān)鍵字參數(shù)(異步調(diào)用)。返回一個AsyncResult對象
# AsyncResult類見本章(5)
callback, error_callback——參數(shù)必須是一個接受單個參數(shù)的可調(diào)用對象。當func執(zhí)行成功時,callback會被用于處理執(zhí)行后的返回結(jié)果,否則,會將拋出的異常對象作為參數(shù)傳遞給error_callback執(zhí)行
回調(diào)函數(shù)callback/error_callback應立刻執(zhí)行完成,否則會阻塞負責處理結(jié)果的線程
# 進程池里的回調(diào)函數(shù)在父進程里執(zhí)行,原因是任務是父進程發(fā)起的,所以結(jié)果也應該交給父進程
補充:同步調(diào)用和異步調(diào)用
同步調(diào)用是一種阻塞式調(diào)用,一段代碼調(diào)用另一段代碼時,必須等另一段代碼執(zhí)行結(jié)束并返回結(jié)果后代碼才能繼續(xù)執(zhí)行下去
異步調(diào)用是一種非阻塞式調(diào)用,一段異步代碼還未執(zhí)行完時,可以繼續(xù)執(zhí)行下一段代碼邏輯,當代碼執(zhí)行完后通過回調(diào)函數(shù)返回繼續(xù)執(zhí)行相應的邏輯,而不耽誤其他代碼的執(zhí)行
6.3.close()
關(guān)閉進程池,阻止后續(xù)任務提交到進程池,即不再接收新的任務。當所有任務執(zhí)行完成后,工作進程會退出
6.4.terminate()
不必等待未完成的任務,立刻停止工作進程。當進程池對象被垃圾回收時會立即調(diào)用此方法
6.5.join()
等待工作進程結(jié)束(阻塞調(diào)用的進程)。此方法必須在調(diào)用close()或者terminate()后調(diào)用
補充:垃圾回收(GC)
當電腦上的一個動態(tài)內(nèi)存不再需要時,就應該予以釋放,以讓出內(nèi)存,這種內(nèi)存資源管理稱為垃圾回收(GC,全稱Garbage Collection)
# 主進程不會等待進程池中的任務都執(zhí)行完畢后再退出,如果主進程代碼執(zhí)行完畢,進程池的工作進程就會被停止(工作進程是主進程的守護進程)
示例1: ?# 并發(fā)
運行結(jié)果: ?# 其中一次運行結(jié)果
主進程開始執(zhí)行
子進程的PID是:4596,任務名稱:0,時間:0.177s
子進程的PID是:9140,任務名稱:1,時間:1.177s
子進程的PID是:8060,任務名稱:2,時間:2.177s
子進程的PID是:4596,任務名稱:3,時間:3.178s
子進程的PID是:9140,任務名稱:4,時間:4.178s
子進程的PID是:8060,任務名稱:5,時間:5.179s
子進程的PID是:4596,任務名稱:6,時間:6.179s
子進程的PID是:9140,任務名稱:7,時間:7.179s
子進程的PID是:8060,任務名稱:8,時間:8.179s
子進程的PID是:4596,任務名稱:9,時間:9.179s
主進程執(zhí)行完畢
# 并發(fā):十個任務同一時間間隔創(chuàng)建,三個子進程交替輪換執(zhí)行任務
示例2: ?# 并行
繼上個示例
# 非阻塞方式運行,一個任務未完成時繼續(xù)創(chuàng)建下一個任務
p.apply(func=task, args=(i, start))改為p.apply_async(func=task, args=(i, start))
# 因為任務以非阻塞方式運行,如果不p.join()阻塞主進程,主進程代碼執(zhí)行完畢后就會停止進程池的工作進程
將# p.join()的'# '刪去
運行結(jié)果: ?# 其中一次運行結(jié)果
主進程開始執(zhí)行
子進程的PID是:1260,任務名稱:0,時間:0.17s
子進程的PID是:3956,任務名稱:1,時間:0.175s
子進程的PID是:8876,任務名稱:2,時間:0.224s
子進程的PID是:1260,任務名稱:3,時間:1.17s
子進程的PID是:3956,任務名稱:4,時間:1.175s
子進程的PID是:8876,任務名稱:5,時間:1.224s
子進程的PID是:1260,任務名稱:6,時間:2.17s
子進程的PID是:3956,任務名稱:7,時間:2.175s
子進程的PID是:8876,任務名稱:8,時間:2.224s
子進程的PID是:1260,任務名稱:9,時間:3.17s
主進程執(zhí)行完畢
# 并行:十個任務同一時刻創(chuàng)建,每秒三個任務在三個子進程上同時執(zhí)行
示例3:
運行結(jié)果: ?# 其中一次運行結(jié)果
主進程開始執(zhí)行
初始化函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:7184,任務名稱:0
初始化函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:6668,任務名稱:1
初始化函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:8460,任務名稱:2
任務0執(zhí)行完畢
任務執(zhí)行,子進程PID:7184,任務名稱:3
回調(diào)函數(shù)被調(diào)用,傳入值:0
任務1執(zhí)行完畢
回調(diào)函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:6668,任務名稱:4
任務2執(zhí)行完畢
回調(diào)函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:8460,任務名稱:5
任務3執(zhí)行完畢
回調(diào)函數(shù)被調(diào)用,傳入值:0
任務4執(zhí)行完畢
回調(diào)函數(shù)被調(diào)用,傳入值:0
任務5執(zhí)行完畢
回調(diào)函數(shù)被調(diào)用,傳入值:0
初始化函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:216,任務名稱:6
初始化函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:8568,任務名稱:7
初始化函數(shù)被調(diào)用,傳入值:0
任務執(zhí)行,子進程PID:2516,任務名稱:8
主進程執(zhí)行完畢??# 此時工作進程被停止,任務6~9停止執(zhí)行
# 初始化函數(shù)在工作進程啟動時調(diào)用,回調(diào)函數(shù)在任務執(zhí)行完畢后調(diào)用,每個工作進程最多完成兩個任務(也就是上面子進程PID最多出現(xiàn)兩次)
6.6.map(func, iterable[, chunksize])
內(nèi)置map()函數(shù)的并行版本(但只支持一個iterable參數(shù),支持多個的見starmap())。此方法以阻塞方式運行
# 這里指的并行是可迭代對象中的元素對提供函數(shù)做映射時并行
此方法會將可迭代對象分割為許多塊,然后提交給進程池??梢詫hunksize設置為一個正整數(shù)從而(近似)指定每個塊的大小
# 對于很長的迭代對象,可能消耗很多內(nèi)存。可以考慮使用imap()或imap_unordered()并且顯式指定chunksize以提升效率
# 注意這里的func不能使用匿名函數(shù)
返回值是一個列表,元素是映射的結(jié)果
補充:內(nèi)置函數(shù)map()? # 由于在原筆記沒有出現(xiàn)此函數(shù),在專欄解釋一下
根據(jù)提供的函數(shù)對指定序列做映射
將function應用于iterable的每一個元素,返回函數(shù)輸出結(jié)果的迭代器。如果iterable有多個,則function必須接收相同個數(shù)的實參,并被應用于所有iterable中的并行的每一個元素
# 多個iterable時,迭代器的元素數(shù)量是最少的iterable的元素數(shù)量為基準
map(function, iterable, ...)
function——函數(shù) ?# 可調(diào)用對象
iterable——可迭代對象
示例:
運行結(jié)果:
<class 'map'>
['1', '2', '3', '4', '5']
[2, 12, 30, 56, 90]
6.7.map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()方法的一個變種,以非阻塞方式運行,返回一個AsyncResult對象
callback, error_callback——與apply_async()方法相同
6.8.imap(func, iterable[, chunksize])
map()方法的延遲執(zhí)行版本,大致等效與map(),可能比map()慢很多,但對于很長的迭代對象,給chunksize設置一個很大的值會比默認值1極大地加快執(zhí)行速度
# 實測如果chunksize值太大反而會減慢速度
返回值是一個迭代器。如果chunksize是1,則這個迭代器的next()方法有一個可選的timeout參數(shù)(默認值None):如果無法在timeout秒內(nèi)執(zhí)行得到結(jié)果,則會拋出multiprocessing.TimeoutError異常
6.9.imap_unordered(func, iterable[, chunksize])
和imap()相同,只不過迭代器返回的結(jié)果的順序是任意的
# 如果進程池只有一個工作進程,返回的結(jié)果順序才能認為是“有序”的
6.10.starmap(func, iterable[, chunksize])
和map()類似,不過iterable中的每一項會被解包再作為函數(shù)參數(shù)。返回值是一個列表
示例:可迭代對象[(1,2), (3, 4)]會轉(zhuǎn)化為等價于[func(1, 2), func(3, 4)]的調(diào)用
6.11.starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
相當于starmap()與map_async()的結(jié)合。返回值是一個AsyncResult對象
(6)、AsyncResult類
Pool.apply_async()和Pool.map_async返回對象所屬的類
1.AsyncResult類的方法
1.1.get([timeout])
用于獲取執(zhí)行結(jié)果。如果timeout不是None并且在timeout秒內(nèi)仍然沒有執(zhí)行完得到結(jié)果,則拋出multiprocessing.TimeoutError異常。如果遠程調(diào)用發(fā)生異常,這個異常會通過此函數(shù)重新拋出
# 如果timeout是None(默認),在沒執(zhí)行完得到結(jié)果前會阻塞
1.2.wait([timeout])
阻塞,直到返回結(jié)果,或者timeout秒后超時
1.3.ready()
返回執(zhí)行狀態(tài),是否已經(jīng)完成
1.4.successful()
判斷是否未引發(fā)異常。如果沒有執(zhí)行完會拋出ValueError異常
示例:
運行結(jié)果:
主進程開始執(zhí)行
執(zhí)行結(jié)果:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
執(zhí)行結(jié)果:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
錯誤任務執(zhí)行
極慢任務執(zhí)行
執(zhí)行結(jié)果:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
執(zhí)行出錯或超時??# 這里是因為出錯
執(zhí)行出錯或超時??# 這里是因為超時
主進程執(zhí)行完畢
補充:短路運算符
當有多個表達式時,左邊的表達式可以確定結(jié)果時,就不在繼續(xù)運算右邊的值
短路運算符例如:
x and y:只有x為真值時才會對第二個參數(shù)求值
x or y:只有x為假值時才會對第二個參數(shù)求值
# 所以上個示例中successful()才不會報錯,因為當ready()為False時不會執(zhí)行它
(7)、進程之間的通信
1.進程之間的關(guān)系
每個進程都是獨立的,都有自己的內(nèi)存、空間、地址等,進程之間的數(shù)據(jù)不共享。在父進程創(chuàng)建子進程時,子進程會完全復制一份父進程的環(huán)境,包括變量、函數(shù)、類等
示例:
運行結(jié)果:
子進程運行前,主進程a的值為: 100
func2中a的值為: 50
func1中a的值為: 150
子進程運行后,主進程a的值為: 100
# 子進程中對a的修改,并未影響到主進程和其他子進程
補充:在Windows系統(tǒng)使用多進程不以主程序的形式運行會報錯的原因
因為Python創(chuàng)建子進程的時候相當于會開一塊新的內(nèi)存空間去存儲主進程中的代碼,并且是通過導入包的形式去復刻主程序中的代碼的,如果不以主程序的形式運行,就會執(zhí)行在if name == 'main'外的代碼,就會無限遞歸地創(chuàng)建子進程,但multiprocessing.Process的源碼中對這種行為是不允許的,所以會拋出RuntimeError異常
示例:
運行結(jié)果:
if __name__ == '__main__'外的代碼被執(zhí)行??# 主進程執(zhí)行
if __name__ == '__main__'內(nèi)的代碼被執(zhí)行??# 主進程執(zhí)行
if __name__ == '__main__'外的代碼被執(zhí)行??# 子進程執(zhí)行
2.進程之間的通信
實現(xiàn)進程之間的通信有兩種消息機制:
·?管道(Pipe)
·?隊列(Queue)
3.管道(Pipe)
3.1.概念
從一個進程連接到另一個進程的數(shù)據(jù)流。管道可以用于在兩個進程間的通信
管道可分為單向管道和雙向管道。單向管道只能從一端發(fā)送消息,一端接收消息。雙向管道兩端都可以發(fā)送和接收消息
3.2.創(chuàng)建管道
使用multiprocessing模塊中的Pipe()函數(shù),函數(shù)語法如下:
Pipe([duplex])
返回一個由管道連接的連接對象(Connection對象)。返回值是一個二元組(conn1, conn2),返回的兩個連接對象conn1, conn2表示管道的兩端
duplex——如果為True(默認值),則管道是雙向的,否則管道是單向的,即conn1只能用于接收消息,conn2只能用于發(fā)送消息
# 注意如果兩個進程(或線程)同時嘗試讀取或?qū)懭牍艿赖耐欢?,則管道中的數(shù)據(jù)可能會損壞。如果是不同端不會有損壞的風險
# 如果一個進程在試圖讀寫管道時被終止了,那么管道中的數(shù)據(jù)很可能是不完整的,因為此時可能無法確定消息的邊界
補充:序列化和反序列化
序列化是將數(shù)據(jù)結(jié)構(gòu)或?qū)ο筠D(zhuǎn)換成二進制串的過程。反序列化是將在序列化過程中所生成的二進制串轉(zhuǎn)換成數(shù)據(jù)結(jié)構(gòu)或者對象的過程
3.3.連接對象(Connection對象)的方法
3.3.1.send(obj)
將一個可序列化對象obj發(fā)送到連接的另一端,可以用recv()方法讀取
過大的對象(接近32MiB+,此值取決于操作系統(tǒng))有可能引發(fā)ValueError異常
3.3.2.recv()
返回一個由另一端使用send()發(fā)送的對象。該方法會一直阻塞直到接收到對象。如果對端關(guān)閉了連接并且沒有東西可接收,將拋出EOFError異常
3.3.3.fileno()
返回由連接對象使用的描述符或者句柄
3.3.4.close()
關(guān)閉連接對象。當連接對象被垃圾回收時會自動調(diào)用
3.3.5.poll([timeout])
返回連接對象中是否有可以讀取的數(shù)據(jù),如果有則返回True,否則返回False
timeout——如果未指定,此方法會立刻返回。如果是一個數(shù)字,則指定了最大阻塞的秒數(shù)。如果是None,則會一直等待,不會超時
3.3.6.send_bytes(buffer[, offset[, size])
從一個字節(jié)類對象中取出字節(jié)數(shù)組并作為一條完整消息發(fā)送
buffer, offset——如果給定了offset,在buffer中指定的offset位置讀取數(shù)據(jù)
# offset值為0時對應buffer第1個字節(jié),1對應第2個,以此類推
size——如果給定了size,會從緩沖區(qū)buffer中讀取size個字節(jié)
# 緩沖區(qū)長度不能小于offset或offset + size,offset和size必須為非負整數(shù)
過大的緩沖區(qū)(接近32MiB+,此值取決于操作系統(tǒng))有可能引發(fā)ValueError異常
3.3.7.recv_bytes([maxlength])
與recv()類似,返回的是字節(jié)數(shù)據(jù)
maxlength——如果給定了maxlength并且消息長于maxlength,則拋出OSError異常并且該連接對象將不再可讀
3.3.8.recv_bytes_into(buffer[, offset])
與recv()類似,會將一條完整的字節(jié)數(shù)據(jù)讀入buffer中并返回消息的字節(jié)數(shù)
buffer必須是一個可寫的字節(jié)類對象(例如bytearray)
offset——如果給定了offset,則消息從指定的offset位置寫入緩沖區(qū)。offset必須是小于緩沖區(qū)長度(字節(jié))的非負整數(shù)
如果緩沖區(qū)太小,則引發(fā)BufferTooShort異常,并且完整的消息將會存放在異常實例e的e.args[0]中
示例1:
運行結(jié)果:
b'345'
[1, None, '']
True
False
11
bytearray(b'\x00\x00\x00Hello World\x00')??# 在第四個字節(jié)及以后插入了消息
示例2:
# 這個示例由https://blog.csdn.net/a883774913/article/details/125375440修改而成
運行結(jié)果: ?# 其中一次運行結(jié)果
發(fā)送消息:hello
發(fā)送消息:hey
發(fā)送消息:hru?
發(fā)送消息:END
收到消息:hello
收到消息:hey
收到消息:hru?
# 有一定的延遲,所以在發(fā)送消息后沒有立刻收到
4.隊列(Queue)
4.1.概念
隊列是一種遵循先進先出(First In, First Out)數(shù)據(jù)操作規(guī)則的線性數(shù)據(jù)結(jié)構(gòu)
隊列的頭部稱為隊首,隊列尾部稱為隊尾,將把元素加入隊尾的操作稱為入隊,刪除隊首元素的操作稱為出隊。隊列可以在多個生產(chǎn)者和消費者之間通信(多個進程之間通信)
補充:單向隊列和雙向隊列
單向隊列只能從隊尾添加元素,只能從隊首刪除元素,遵循先進先出原則
雙向隊列可以從隊尾或隊首添加或刪除元素
# 下面使用的是單向隊列
4.2.創(chuàng)建隊列
使用multiprocessing模塊中的Queue類,構(gòu)造函數(shù)如下:
Queue([maxsize])
返回一個使用一個管道和少量鎖和信號量實現(xiàn)的共享隊列實例。當一個進程將一個對象放入隊列中時,一個寫入線程會啟動并將對象從緩沖區(qū)寫入管道中
# 鎖、信號量、線程,以及前面提到的生產(chǎn)者和消費者在后文說明
maxsize——是一個整數(shù),用于設置隊列中可以放入的消息個數(shù)的上限(項目數(shù)的上限)。如果值小于或等于零,則隊列尺寸為無限大
4.3.Queue類的方法
4.3.1.qsize()
返回隊列的大致長度(當前包含的消息數(shù)量)
4.3.2.empty()
如果隊列為空則返回True,否則返回False
4.3.3.full()
如果隊列為滿則返回True,否則返回False
4.3.4.put(obj[, block[, timeout]])
將obj放入隊列
block, timeout——如果blcok為True(默認值)而且timeout為None(默認值),將會阻塞當前進程,直到有空的緩沖槽。如果timeout為正數(shù),將會在阻塞了最多timeout秒后還沒有可用的緩沖槽時拋出queue.Full異常。如果block為False,則會忽略timeout參數(shù),僅當有可用緩沖槽時才放入對象,否則拋出queue.Full異常
# 如果隊列已經(jīng)關(guān)閉,則拋出ValueError異常
# queue.Full異常以及下面提到的queue.Empty異常都是queue模塊中的
4.3.5.put_nowait(obj)
相當于put(obj, False)
4.3.6.get([block[, timeout]])
從隊列中取出并返回對象
block, timeout——如果blcok為True(默認值)而且timeout為None(默認值),將會阻塞當前進程,直到隊列中出現(xiàn)可用的對象。如果timeout為正數(shù),將會在阻塞了最多timeout秒后還沒有可用的對象時拋出queue.Empty異常。如果block為False,則會忽略timeout參數(shù),僅當有可用對象時才取出并返回,否則拋出queue.Empty異常
# 如果隊列已經(jīng)關(guān)閉,則拋出ValueError異常
4.3.7.get_nowait()
相當于get(False)
# 由于多線程或多進程的環(huán)境,qsize()、empty()、full()返回值是不可靠的
# 當一個對象被放入一個隊列時,這個對象首先會被一個后臺線程用pickle序列化,并將序列化的數(shù)據(jù)通過一個底層管道的管道傳遞到隊列中。所以可能需要極小的延遲,empty()才會返回False、close()才不會拋出BrokenPipeError異常等等
# 如果有多個進程同時將對象放入隊列,那么在隊列的另一端接收到對象可能是無序的。但是由同一個進程放入的多個對象的順序在另一端輸出時總是一樣的
# 如果一個子進程已將一些對象放入隊列中(并且沒有使用cancel_join_thread()方法),那么這個進程在所有緩沖區(qū)的對象被刷新進管道之前,是不會終止的。這就意味著如果除非確定所有放入隊列中的對象都已經(jīng)被消費了,否則如果試圖等待這個進程,可能會陷入死鎖狀態(tài)
示例1: ?# 演示使用隊列實現(xiàn)進程之間的通信
運行結(jié)果:
a入隊時的值: 90
a入隊時的值: 80
a入隊時的值: 70
a入隊時的值: 60
a入隊時的值: 50
出隊時a的值: 90
出隊時a的值: 80
出隊時a的值: 70
出隊時a的值: 60
出隊時a的值: 50
示例2: ?# 演示放入對象的延遲和死鎖狀態(tài)
運行結(jié)果1:
True
[進入死鎖]
運行結(jié)果2:
False
[進入死鎖]
# 死鎖是因為子進程(工作進程)將消息放入隊列時,隊列產(chǎn)生具有q.put('X' * 1000000)的feeder線程,feeder線程將消息傳入管道。子進程中的主線程在等待所有緩沖區(qū)的對象被刷新進管道(也就是下面join_thread()提到的等待后臺線程),但是消息量太大,無法放入緩沖區(qū),而主進程在等待子進程運行結(jié)束,兩個進程永遠在互相等待,造成死鎖
# 盡量避免在進程間傳遞大量數(shù)據(jù),越少越好
4.3.8.close()
指示當前進程將不會再往隊列中放入對象。一旦緩沖區(qū)中的數(shù)據(jù)被寫入管道之后,后臺的數(shù)據(jù)會退出。這個方法在隊列被垃圾回收時會自動調(diào)用
# 注意只是當前進程,并不影響其他進程
4.3.9.join_thread()
等待后臺線程。這個方法僅在調(diào)用了close()方法之后可用。這會阻塞當前進程,直到后臺線程退出,確保所有緩沖區(qū)中的數(shù)據(jù)被寫入管道中
默認情況下,如果一個不是隊列創(chuàng)建者的進程試圖退出,他會嘗試等待這個隊列的后臺線程
4.3.10.cancel_join_thread()
防止join_thread()方法阻塞當前進程。具體來說這防止進程退出時自動等待后臺線程退出
此方法僅當不關(guān)心底層管道中可能丟失數(shù)據(jù),只希望進程能馬上結(jié)束時使用
4.4.SimpleQueue類的方法
SimpleQueue是multiprocessing模塊中的一個類,是一個簡化的Queue類的實現(xiàn),很像帶鎖的Pipe
# SimpleQueue類不能指定放入消息的上限
# 實測SimpleQueue類的empty()方法返回值可靠,但消息的上限比Queue類的少
4.4.1.empty()
如果隊列為空則返回True,否則返回False
4.4.2.put(item)
將item放入隊列
4.4.3.get()
從隊列中取出并返回對象
示例:
運行結(jié)果:
1 1 1 1 1 1 1 1 1 1
4.5.JoinableQueue類的方法
JoinableQueue是multiprocessing模塊中的一個類,是Queue的子類,額外添加了以下方法:
4.5.1.task_done(), join()
task_done()指出之前進入隊列的任務已經(jīng)完成。由隊列的消費者進程使用
join()阻塞至隊列中所有的項目(任務)都被接收和處理完畢
每當一個項目添加到隊列的時候,未完成任務的計數(shù)會增加。對于每個調(diào)用get()獲取的任務,執(zhí)行完畢后調(diào)用task_done()告訴隊列該任務已處理完成,未完成計數(shù)就會減少。當未完成計數(shù)降到零的時候,join()阻塞將被解除
# 當然也可以不調(diào)用get()獲取任務就調(diào)用task_done()告訴任務已經(jīng)完成
如果task_done()被調(diào)用的次數(shù)多于放入隊列中的項目數(shù)量,將引發(fā)ValueError異常
示例:
運行結(jié)果:
主進程執(zhí)行結(jié)束
4.6.生產(chǎn)者和消費者模型
多個進程(線程)之間的通信可以采用生產(chǎn)者和消費者模型
生產(chǎn)者負責生產(chǎn)數(shù)據(jù)
消費者負責處理數(shù)據(jù)
它們通過一個容器(中央倉庫)進行通信,這個容器可以是隊列,相當于一個緩沖區(qū):生產(chǎn)者 --<生產(chǎn)數(shù)據(jù)放入容器>--> 容器 --<從容器取出數(shù)據(jù)>--> 消費者
4.6.1.作用
·?平衡生產(chǎn)者與消費者之間的速度差
·?解決生產(chǎn)者和消費者的強耦合問題
# 耦合是程序之間關(guān)聯(lián)性(依賴程度)
# 在并發(fā)編程中使用生產(chǎn)者和消費者模式能解決絕大多數(shù)并發(fā)問題,并通過平衡生產(chǎn)進程(線程)和消費進程(線程)的工作能力來提高程序的整體處理數(shù)據(jù)的速度
示例: ?# 使用JoinableQueue實現(xiàn)生產(chǎn)者和消費者模型
運行結(jié)果: ?# 其中一次運行結(jié)果
產(chǎn)生第一輪數(shù)據(jù)
產(chǎn)生第一輪數(shù)據(jù)
數(shù)據(jù)[1, 0, 'p1']消費完成
數(shù)據(jù)[1, 1, 'p1']消費完成
數(shù)據(jù)[1, 0, 'p2']消費完成
數(shù)據(jù)[1, 1, 'p2']消費完成
產(chǎn)生第二輪數(shù)據(jù)
產(chǎn)生第二輪數(shù)據(jù)
生產(chǎn)者進程退出
生產(chǎn)者進程退出
數(shù)據(jù)[2, 0, 'p2']消費完成
數(shù)據(jù)[2, 1, 'p2']消費完成
數(shù)據(jù)[2, 0, 'p1']消費完成
數(shù)據(jù)[2, 1, 'p1']消費完成
(8)、multiprocessing模塊的部分函數(shù)
1.active_children()
返回當前進程存活的子進程的列表
# 列表的元素是Process對象
2.cpu_count()
參見os.cpu_count()
# 可能引發(fā)NotImplementedError
3.current_process()
返回與當前進程對應的Process對象
4.parent_process()
返回父進程的Process對象,相當于父進程調(diào)用current_process()。如果在主進程調(diào)用,則會返回None
5.freeze_support()
為使用了multiprocessing模塊的程序,提供凍結(jié)代碼以產(chǎn)生Windows可執(zhí)行文件的模塊的支持(例如PyInstaller模塊)
需要在main模塊(主模塊)的if __name__ == '__main__'行之后立刻調(diào)用此函數(shù)
# 如果不調(diào)用,產(chǎn)生的可執(zhí)行文件大概率會出錯
(9)、線程(Thread)
1.概念
線程(Thread)是CPU可調(diào)度的最小單位,線程被包含在進程中,是進程中實際的運作單位。一個進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務
# 一個進程中至少有一個線程在工作
2.創(chuàng)建線程的方式
2.1.threading模塊中的Thread類
2.2.自定義Thread類的子類
3.分別使用兩種方式創(chuàng)建線程
3.1.使用threading模塊中的Thread類創(chuàng)建線程
# 可以類比著創(chuàng)建進程
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
主線程開始執(zhí)行
線程9068正在執(zhí)行:0
線程8648正在執(zhí)行:0
線程8648正在執(zhí)行:1
線程9068正在執(zhí)行:1
線程8648正在執(zhí)行:2
線程9068正在執(zhí)行:2
主線程結(jié)束執(zhí)行
# 多個線程同時執(zhí)行的順序是無序的
3.2.自定義Thread類的子類創(chuàng)建線程
示例:
(10)、Thread類
Thread是threading模塊中的一個類,通過創(chuàng)建一個Thread對象然后調(diào)用對象的start()方法來生成線程
1.創(chuàng)建Thread類的對象
應始終使用關(guān)鍵字參數(shù)調(diào)用構(gòu)造函數(shù),構(gòu)造函數(shù)如下:
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
# 關(guān)于參數(shù)請參見本章(4)、1
2.Thread類的常用方法和屬性
2.1.name屬性
只用于識別的字符串,沒有語義。多個線程可以賦予相同的名稱。初始名稱由構(gòu)造函數(shù)設定(默認為'Thread-N',其中N是第N個子線程)
2.2.ident屬性, native_id屬性
ident屬性是這個線程的“線程標識符”,是個非零整數(shù)。它的值沒有直接含義,主要用作magic cookie
native_id屬性是這個線程的原生線程ID(TID),是個非負整數(shù)。它的值可用來在整個系統(tǒng)中唯一地標識這個特定線程
兩個屬性在線程終結(jié)之后可能會被復用
# 類似于進程ID,線程ID的有效期(全系統(tǒng)范圍內(nèi)保證唯一)將從線程被創(chuàng)建開始直到線程被終結(jié)
# 實測通常兩個屬性值相同(沒有發(fā)現(xiàn)不同的情況)
2.3.daemon屬性
線程的守護標志,是一個布爾值,必須在start()被調(diào)用之前設置,否則在之后設置會拋出RuntimeError異常
# 初始值是構(gòu)造函數(shù)中設置的值(值為None則從創(chuàng)建的線程繼承)
# 主線程不是守護線程,默認為False
# 當沒有存活的非守護線程時,整個Python程序才會退出
# 可以在守護線程中創(chuàng)建子線程
2.4.is_alive()
返回線程是否存活。當run()方法剛開始知道run()方法剛結(jié)束時此方法返回True
2.5.join(timeout=None)
阻塞調(diào)用的線程,直到調(diào)用join()方法對應的線程對象終止
# 終止包括正常終止、拋出未處理異常、發(fā)生超時
timeout——同Process類的join()方法,值可以是以秒為單位的浮點數(shù)或分數(shù)
# 因為join()總是返回None,所以判斷是否發(fā)生超時要在join()后調(diào)用is_alive()(返回True則join()超時)
# fractions模塊支持分數(shù)
# 一個線程可以被join多次。線程join自己或join一個未開始的線程會拋出RuntimeError
2.6.start()
同Process類的start()方法
# 同一個線程對象調(diào)用此方法大于一次會拋出RuntimeError異常
2.7.run()
同Process類的run()方法
# 注意Thread類沒有terminate(), kill()方法
(11)、threading模塊的部分函數(shù)和常量
1.enumerate()
以列表形式返回當前所有存活的Thread對象。該列表包含守護線程,current_thread()創(chuàng)建的虛擬線程對象和主線程。它不包含已終結(jié)的線程和尚未開始的線程
2.active_count()
返回當前存活的Thread對象的數(shù)量。返回值與enumerate()函數(shù)所返回的列表長度一致
3.current_thread()
返回當前對應調(diào)用者的控制線程的Thread對象。如果調(diào)用者的控制線程不是利用threading模塊創(chuàng)建,則會返回一個功能受限的虛擬線程對象
4.get_ident()
返回當前線程的“線程標識符”
5.get_native_id()
返回內(nèi)核分配給當前線程的原生集成線程ID
6.main_thread()
返回主線程的Thread對象
7.TIMEOUT_MAX常量
阻塞函數(shù)中形參timeout允許的最大值,傳入值超過此值會拋出OverflowError異常
(12)、線程之間的通信
1.線程之間的關(guān)系
一個進程內(nèi)的所有線程共享數(shù)據(jù)
示例:
運行結(jié)果:
主線程開始執(zhí)行,a的值為:100
加的線程開始執(zhí)行
a的值為:130??# 100 + 30 = 130
加的線程執(zhí)行結(jié)束
減的線程開始執(zhí)行
a的值為:80??# 130 - 50 = 80
減的線程執(zhí)行結(jié)束
主線程結(jié)束執(zhí)行,a的值為:80
(13)、線程同步
1.多線程共享數(shù)據(jù)所帶來的安全性問題
多線程都是在同一個進程中運行的,因此在進程中的全局變量所有線程都可以共享,這就造成了一個問題,因為線程的順序是無序的,有可能會造成數(shù)據(jù)錯亂
2.鎖機制——互斥鎖
互斥鎖有鎖定和非鎖定兩種狀態(tài)
當線程訪問共享資源時,先將資源的狀態(tài)變成“鎖定”,其它線程不能修改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”時,其它線程才能操作共享資源
互斥鎖是線程的相互排斥,誰先搶到資源,誰就上鎖修改資源,同一時間只能有一個線程可以進行修改,犧牲了速度卻保證了數(shù)據(jù)的安全性
3.使用鎖的原則
·?把盡量少的和不耗時的代碼放到鎖中執(zhí)行
·?代碼執(zhí)行完成后要記得釋放鎖
4.鎖/互斥鎖/同步鎖/原始鎖(Lock)
在Python中,鎖是一個在鎖定時不屬于特定線程的能用的最低級的同步基元組件
使用threading模塊中的Lock類
一旦一個線程獲得一個鎖,會阻塞隨后嘗試獲得鎖的線程,直到它被釋放;任何線程都可以釋放它
# 使用類的實例化創(chuàng)建鎖對象
4.1.Lock類的方法
4.1.1.acquire(blocking=True, timeout=-1)
可以阻塞或非阻塞地獲得鎖(上鎖)
blocking——如果值為True(默認值),阻塞直到鎖被釋放,然后將鎖鎖定并返回True;如果值為False,不會阻塞,如果鎖已被鎖定則返回False,否則將鎖鎖定并返回True
timeout——值為浮點型,如果值為非負數(shù),只要無法得到鎖,將最多阻塞timeout設定的秒數(shù);如果值為-1,將無限等待;如果值為除-1以外的負數(shù),或當blocking值為False時,將此參數(shù)的值設置為-1以外的值會拋出ValueError異常
如果成功獲得鎖,則返回True,否則返回False
4.1.2.release()
釋放一個鎖。此方法可以在任何線程中調(diào)用,不限于獲得鎖的線程
當鎖被鎖定時,將它重置為未鎖定。如果其他線程正在等待這個鎖解鎖而被阻塞,只允許其中一個獲得鎖并繼續(xù)運行
當鎖未被鎖定時,會引發(fā)RuntimeError異常
此方法沒有返回值
4.1.3.locked()
返回鎖是否處于鎖定狀態(tài)
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
5468正在出傳第5張票
9580正在出傳第4張票
2520正在出傳第3張票
5468正在出傳第2張票
5468正在出傳第1張票
# 先執(zhí)行到上鎖語句的線程上鎖,后執(zhí)行到的被阻塞。當票被出傳完釋放鎖后,先獲得鎖的線程上鎖,其余線程繼續(xù)被阻塞
補充:multiprocessing模塊中的Lock類
類似于threading.Lock,以下是不同之處:
threading中的Lock只能用于同一個進程,multiprocessing中的能用于多進程
# 在多進程使用鎖需要在創(chuàng)建進程時傳遞鎖對象
在acquire()方法中,multiprocessing中的第一個參數(shù)名是block而不是blocking。第二個參數(shù)默認值為None,無限等待。如果第二個參數(shù)值為負數(shù),則效果和值為0一樣而不是拋出異常。當block為False時,timeout會被忽略而不是拋出異常
在release()方法中,multiprocessing中的此方法可以在任何進程、線程中使用,不限于鎖的擁有者。當嘗試釋放一個沒有被持有的鎖時拋出的異常是ValueError而不是RuntimeError
multiprocessing中沒有l(wèi)ocked()方法
補充:
在multiprocessing模塊和threading模塊提供的所有帶有acquire()和release()方法的對象,都可以被用作with語句的上下文管理器。當進入語句塊是調(diào)用acquire()方法,退出語句塊時調(diào)用release()方法
示例:
補充:死鎖(Deadlock) ?# 摘編自百度百科
死鎖是指兩個或兩個以上的進程或線程在執(zhí)行過程中,由于競爭資源或者由于彼此通信而造成的一種阻塞的現(xiàn)象,若無外力作用,它們都將無法推進下去。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠在互相等待的進程或線程稱為死鎖進程或線程
5.遞歸鎖/可重入鎖(RLock)
遞歸鎖是一個可以被同一個線程多次獲取的同步基礎(chǔ)元件。在內(nèi)部,它在基元鎖的鎖定/非鎖定狀態(tài)上附加了“所屬線程”和“遞歸等級”的概念。在鎖定狀態(tài)下屬于某些線程;在非鎖定狀態(tài)下不屬于任何線程。如果某個線程拿到了遞歸鎖,這個線程可以再次拿到這個鎖而不需要等待。但是這個線程拿鎖操作和釋放鎖操作的次數(shù)相同遞歸鎖才會處于非鎖定狀態(tài)
使用threading模塊中的RLock類
# 使用類的實例化創(chuàng)建遞歸鎖對象
5.1.RLock類的方法
5.1.1.acquire(blocking=True, timeout=-1)
與Lock類中的acquire()方法類似,區(qū)別是如果調(diào)用此方法的線程已經(jīng)擁有鎖,遞歸等級加一,不進行阻塞并返回True。否則,如果其他線程擁有該鎖,則阻塞至該鎖解鎖。一旦鎖被解鎖(不屬于任何線程),則搶奪所有權(quán),設置遞歸等級為一并繼續(xù)運行。如果多個線程被阻塞,等待鎖被解鎖,一次只有一個線程搶到鎖的所有權(quán)并繼續(xù)運行
5.1.2.release()
釋放鎖,自減遞歸等級。如果減到零,則將鎖重置為非鎖定狀態(tài)(不被任何線程擁有)。如果自減后遞歸等級不是零,則鎖保持鎖定,仍由調(diào)用線程擁有
只有當前線程擁有鎖才能調(diào)用此方法,如果鎖被釋放后調(diào)用此方法或非擁有鎖的線程調(diào)用此方法,則拋出RuntimeError異常
此方法沒有返回值
補充:Lock和RLock實際上是一個工廠函數(shù)(功能和類相似但是是一個函數(shù)),返回平臺支持的具體遞歸鎖類中最有效的版本的實例
補充:multiprocessing模塊中的RLock類
類似于threading.RLock,以下是不同之處:
threading中的RLock只能用于同一個進程,multiprocessing中的能用于多進程
在acquire()方法中,multiprocessing中的此方法的區(qū)別同multiprocessing.Lock
在release()方法中,multiprocessing中的此方法的區(qū)別與multiprocessing.Lock相似,不同之處在于拋出的異常是AssertionError
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
my_thread獲得鎖
my_thread獲得鎖
my_thread釋放鎖
my_thread釋放鎖
0獲得鎖
0釋放鎖
1獲得鎖
2獲得鎖 ?# 這里是因為在1釋放鎖后打印內(nèi)容前,2已經(jīng)獲得鎖并打印內(nèi)容
2釋放鎖
1釋放鎖
6.信號量(Semaphore)
一個信號量管理一個內(nèi)部計數(shù)器,每調(diào)用一次acquire(),計數(shù)器減1;每調(diào)用一次release(),計數(shù)器加1。計數(shù)器的值永遠不會小于零。當計數(shù)器為0時,調(diào)用acquire()將會阻塞直到其他線程調(diào)用release()方法
使用threading模塊中的Semaphore類,構(gòu)造函數(shù)如下:
Semaphore(value=1)
value——賦予內(nèi)部計數(shù)器初始值,默認值為1。如果value被賦予小于0的值,將會引發(fā)ValueError異常
6.1.Semaphore類的方法
方法與RLock類的方法類似,區(qū)別是上文所說的內(nèi)部計數(shù)器
# 因為調(diào)用release()方法是釋放一個信號量,將內(nèi)部計數(shù)器的值增加1,所以不會拋出異常
補充:multiprocessing模塊中的Semaphore類,類似于threading.Semaphore,不同之處與multiprocessing.RLock和threading.RLock的不同之處大體相同
示例:
運行結(jié)果:
False ?# 說明沒有獲得信號量
# 在大多數(shù)情況下,信號量用于保護數(shù)量有限的資源。如果信號量被釋放的次數(shù)過多,則表明出現(xiàn)了異常
7.有界信號量
有界信號量通過檢查以確保它當前的值不會超過初始值
使用threading模塊中的BoundedSemaphore類,構(gòu)造函數(shù)如下:
BoundedSemaphore(value=1)
value——指定內(nèi)部計數(shù)器不會超過的初始值(默認值為1)。如果超過了初始值,將會引發(fā)ValueError異常
# value也是賦予內(nèi)部計數(shù)器的初始值
# 方法與Semaphore類相同
補充:multiprocessing模塊中的BoundedSemaphore類,不同之處與multiprocessing.RLock和threading.RLock的不同之處大體相同.
示例:
運行結(jié)果:
任務被執(zhí)行
任務被執(zhí)行
任務被執(zhí)行
# 兩秒之后
任務被執(zhí)行
任務被執(zhí)行
8.事件(Event)
線程之間通信的最簡單機制之一:一個線程發(fā)出事件信號,其他線程等待該信號
一個事件對象管理一個內(nèi)部標志(內(nèi)部旗標),調(diào)用set()方法可將其設置為True,調(diào)用clear()方法可將其設置為False,調(diào)用wait()方法將進入阻塞直到標志為True
使用threading模塊中的Event類,內(nèi)部標志初始為False
# 使用類的實例化創(chuàng)建事件對象
8.1.Event類的方法
8.1.1.is_set()
如果內(nèi)部標志為True時返回True,否則返回False
8.1.2.set()
將內(nèi)部標志設置為True
8.1.3.clear()
將內(nèi)部標志設置為False
8.1.4.wait(timeout=None)
阻塞線程直到內(nèi)部標志為True。如果調(diào)用內(nèi)部標志為True,則立刻返回,返回值為True。否則阻塞線程,直到調(diào)用set()方法將標志設置為True(這種情況返回值為True)或者發(fā)生可選的超時
timeout——是一個浮點數(shù),代表操作的超時時間。默認值為None,表示不會超時。如果值為負數(shù),則效果和值為0一樣。超時后返回值是False
補充:multiprocessing模塊中的Event類,類似于threading.Event,區(qū)別是能用于多進程
示例: ?# 模擬紅綠燈
# 這個示例由https://www.shuzhiduo.com/A/l1dyVEjGze/修改而成
9.條件變量(Condition)
條件變量用來管理那些已獲得了一個鎖但是因為需要滿足某一條件才能運行的線程。條件變量總是與某種類型的鎖對象關(guān)聯(lián)(互斥鎖或遞歸鎖)。在線程獲得鎖的時候調(diào)用wait()方法釋放鎖,然后阻塞直到其他線程通知(也就是滿足條件)后等待鎖被其他線程釋放后獲得鎖繼續(xù)執(zhí)行
使用threading模塊中的Condition類,構(gòu)造函數(shù)如下:
Condition(lock=Lone)
lock——如果值為None,則會創(chuàng)建新的RLock對象作為底層鎖,否則值必須為Lock對象或RLock對象,它將被用作底層鎖
9.1.Condition類的方法
9.1.1.acquire(*args)
獲得底層鎖。此方法調(diào)用底層鎖的相應方法,返回值是底層鎖相應方法的返回值
9.1.2.release()
釋放底層鎖。此方法調(diào)用底層鎖的相應方法,沒有返回值
9.1.3.wait(timeout=None)
釋放底層鎖,然后阻塞,直到被通知或發(fā)生超時,重新獲得鎖并繼續(xù)執(zhí)行。如果線程在調(diào)用此方法時沒有獲得鎖,將會引發(fā)RuntimeError異常
# 當?shù)讓渔i是RLock時,不會使用它的release()方法釋放鎖,會使用RLock類的內(nèi)部接口,所以即使多次遞歸獲取它,也能將它解鎖。然后在重新獲得鎖時使用另一個內(nèi)部接口來恢復遞歸等級
timeout——是一個浮點數(shù),代表操作的超時時間。默認值為None,表示不會超時。如果值為負數(shù),則效果和值為0一樣。超時后返回值是False,否則是True
9.1.4.wait_for(predicate, timeout=None)
重復調(diào)用wait(),直到滿足判斷式predicate或發(fā)生超時。返回值是判斷式最后一個返回值或者發(fā)生超時返回False
predicate——是一個可調(diào)用對象且返回值可解釋為一個布爾值
timeout——同wait()
忽略超時功能,調(diào)用此方法大致相當于編寫:
while not predicate:
????cv.wait() ?# cv是條件變量
9.1.5.notify(n=1)
喚醒最多n個(默認為1個)正在等待這個條件變量的線程。如果線程在調(diào)用此方法時沒有獲得鎖,將會引發(fā)RuntimeError異常
# 如果沒有線程在等待,這是一個空操作
# 注意notify()方法不釋放鎖,被喚醒的線程直到它可以重新獲得鎖才繼續(xù)執(zhí)行
9.1.6.notify_all()
與notify()類似,區(qū)別是會喚醒所有等待這個條件變量線程
補充:multiprocessing模塊中的Condition類,類似于threading.Condition,區(qū)別是底層鎖是multiprocessing模塊中的且能用于多進程
示例1:
示例2: ?# 演示wait_for()
運行結(jié)果:
子線程開始運行,獲得鎖
子線程等待被通知且滿足判斷式
第一次通知 ?# 3秒后,wait_5s()函數(shù)返回False
第二次通知 ?# 6秒后,wait_5s()函數(shù)返回True
子線程結(jié)束運行
10.柵欄(Barrier)
柵欄可以將任務集中到柵欄,當所有線程都準備好時,所有線程被釋放并繼續(xù)執(zhí)行,而柵欄重置以便下次使用。線程調(diào)用wait()方法后將阻塞,直到所有線程(指定數(shù)量的線程)都調(diào)用了wait()方法,此時所有線程將被同時釋放
使用threading模塊中的Barrier類,構(gòu)造函數(shù)如下:
Barrier(parties, action=None, timeout=None)
parties——柵欄對象需要的線程數(shù)量
action——當所有線程被釋放時在其中一個線程中自動調(diào)用
timeout——默認的超時時間,作為wait()方法中timeout參數(shù)的默認值
10.1.Barrier類的方法和屬性
10.1.1.wait(timeout=None)
沖出柵欄。當柵欄中所有線程都已經(jīng)調(diào)用了這個函數(shù)時,它們將同時被釋放
timeout——是一個浮點數(shù),代表操作的超時時間。默認值為構(gòu)造函數(shù)中指定的timeout,表示不會超時。如果值為負數(shù),則效果和值為0一樣。如果發(fā)生了超時,柵欄對象將進入損壞狀態(tài)
如果創(chuàng)建柵欄對象時在構(gòu)造函數(shù)提供了action參數(shù),它將在其中一個線程釋放前被調(diào)用。如果此調(diào)用引發(fā)了異常,柵欄對象將進入損壞狀態(tài)
如果柵欄對象進入損壞狀態(tài),但仍有線程等待釋放,這些線程將會收到BrokenBarrierError異常
函數(shù)的返回值是一個整數(shù),取值返回在0到parties-1,在每個線程中的返回值不同??捎糜趶乃芯€程中選擇唯一的一個線程執(zhí)行一些特別的任務
10.1.2.reset()
重置柵欄為默認的初始狀態(tài)。如果柵欄中仍有線程等待釋放,這些線程將會收到BrokenBarrierError異常
# 使用此參數(shù)時,如果存在狀態(tài)未知的其他線程,則可能需要執(zhí)行外部同步,例如想要一個狀態(tài)未知的線程進入重置后的柵欄,需要使用同步語句先將柵欄重置,再使這個線程進入柵欄
# 處于損壞狀態(tài)的柵欄對象重置后將離開損壞狀態(tài),可以繼續(xù)使用,但最好將其廢棄并新建一個
10.1.3.abort()
使柵欄處于損壞狀態(tài)
10.1.4.parties屬性
沖出柵欄所需要的線程數(shù)量
# 值是構(gòu)造函數(shù)中傳入parties參數(shù)的值
10.1.5.n_waiting屬性
當前時刻正在柵欄中被阻塞的線程數(shù)量
10.1.6.broken屬性
是一個布爾值,如果柵欄處于損壞狀態(tài)值為True,否則為False
補充:multiprocessing模塊中的Barrier類,類似于threading.Barrier,區(qū)別是能用于多進程
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
Thread-1將進入柵欄,當前柵欄中的線程數(shù):0
Thread-4將進入柵欄,當前柵欄中的線程數(shù):1
Thread-2將進入柵欄,當前柵欄中的線程數(shù):2
# Thread-2未進入柵欄前有兩個線程,進入柵欄后有3個線程,釋放所有線程
Thread-4沖出柵欄
Thread-2沖出柵欄
Thread-1沖出柵欄
wait()返回值為0的線程執(zhí)行該語句,是Thread-1
Thread-3將進入柵欄,當前柵欄中的線程數(shù):0
Thread-6將進入柵欄,當前柵欄中的線程數(shù):1
Thread-5將進入柵欄,當前柵欄中的線程數(shù):2
Thread-5沖出柵欄
Thread-3沖出柵欄
wait()返回值為0的線程執(zhí)行該語句,是Thread-3
Thread-6沖出柵欄
(14)、定時器(Timer)
定時器Timer是threading模塊中的一個類,它是Thread類的子類。此類表示一個操作應該在等待一定時間之后運行——相當于一個定時器,構(gòu)造函數(shù)如下:
Timer(interval, function, args=None, kwargs=None)
創(chuàng)建一個定時器,在通過start()函數(shù)啟動后,經(jīng)過interval秒的間隔時間后,將會用位置參數(shù)args和關(guān)鍵字參數(shù)kwargs調(diào)用function。如果args為None(默認值),則會使用一個空列表。如果kwargs為None(默認值),則會使用一個空字典
1.1.Timer類的新增方法
# Timer是Thread類的子類,擁有Thread類的所有方法
1.1.1.cancel()
停止定時器并取消執(zhí)行定時器將要執(zhí)行的操作。僅當定時器處于等待狀態(tài)時有效
(15)、線程隊列/內(nèi)置模塊queue
內(nèi)置模塊queue實現(xiàn)了多生產(chǎn)者、多消費者隊列,提供了同步的、線程安全的隊列類
1.FIFO(先進先出)隊列Queue類
先添加到隊列的條目(元素)先被取回
2.LIFO(后進先出)隊列LifoQueue類
后被添加到隊列的條目先被取回(類似一個堆棧)
3.優(yōu)先級隊列PriorityQueue類
條目將保持排序(使用heapq模塊),最小值條目先被取出
# 放進優(yōu)先級隊列的條目經(jīng)典模式是以下形式的元組:(優(yōu)先級數(shù)字, 數(shù)據(jù))
# 如果條目之間無法進行比較會引發(fā)TypeError異常
4.Queue類, LifoQueue類, PriorityQueue類的方法
類似于multiprocessing模塊中的JoinableQueue類,區(qū)別是沒有close(), join_thread(), cancel_join_thread()方法且條目放入隊列時不需要極小的延遲才讓empty()之類的方法返回值可信
# 構(gòu)造函數(shù)同樣也有maxsize參數(shù)來指定放入條目的上限
# qsize() > 0保證后續(xù)調(diào)用的get()不被阻塞,qsize() < maxsize也不保證put()不被阻塞。例如在判斷后有其他線程調(diào)用get()或put()
示例:
運行結(jié)果: ?# 其中一次運行結(jié)果
(2, <__main__.A object at 0x0000000002363A00>) ?# 2最先進入q隊列
(3, <__main__.A object at 0x0000000002363C70>) ?# 3最后進入lq隊列
(1, <__main__.A object at 0x0000000002363CD0>) ?# 1在pq隊列中最小
5.SimpleQueue類
無界的FIFO簡單隊列,缺少任務跟蹤等高級功能
# 使用類的實例化創(chuàng)建SimpleQueue類的對象
5.1.SimpleQueue類的方法
5.1.1.qsize()
返回隊列的大致大小
# qsize() > 0不保證后續(xù)調(diào)用的get()不被阻塞
5.1.2.empty()
如果隊列為空返回True,否則返回False
# empty()返回False不保證后續(xù)調(diào)用的get()不被阻塞
5.1.3.put(item, block=True, timeout=None)
將item放入隊列。此方法永不阻塞,始終成功(除了潛在的低級錯誤,例如內(nèi)存分配失?。?/span>
block, timeout——為保持Queue.put()方法的兼容性而提供,其值被忽略
5.1.4.put_nowait(item)
相當于put(item),為保持Queue.put_nowait()兼容性而提供
5.1.5.get(block=True, timeout=None)
相當于Queue.get()
5.1.6.get_nowait()
相當于get(False)