最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

apscheduler明燈

2023-02-25 07:26 作者:韭菜怎么賣  | 我要投稿

????作為一個python一個非常好用的定時框架,apscheduler擁有非常強大的定時功能、非常簡陋的描述文檔和非常糟糕的示例,甚至國內的文章在介紹這個框架的時候,大多只是簡單地把apscheduler文檔的首頁機翻了一遍,這造成了非常壞的影響。寫這篇文章給大家排排坑。

????首先我們做一個測試,由于我的項目暫時還沒用到job_stores,所以就主要針對executors和schedulers。(寫完才發(fā)現(xiàn)這里logger傳參應該是__file__,生米煮成熟飯就不統(tǒng)一改了,只是影響展示而已)

????在例子中定義了一個能在控制臺打印的logger,方便我們觀察job運行的時間和信息;裝飾器默認打印出任務的所在的進程和線程;為了適應復雜的需要,我定義了多種任務:包括異步的、阻塞的、文件IO的等等。

????首先測試的第一種scheduler:BlockingScheduler。查閱官方文檔,它說明了在apscheduler高層,希望用戶調用的apscheduler無非以下幾類:

????我的項目中常用的是前三種,因為并沒有用到tornado等框架。使用時需要根據(jù)框架具體選擇何種scheduler:比如當我的任務都是異步任務的時候,那就選擇AsyncIOScheduler。

????BlockingScheduler很明確的說明了:當你的scheduler在start后,主進程不需要執(zhí)行其他的動西時,就采用該框架。這并不代表著scheduler只能執(zhí)行一個任務,而是說當scheduler.start之后,主進程中所有的內容都會阻塞于此,效果相當于在之后調用asyncio.get_event_loop().run_forever(),或者更確切的來說:相當于啟用process或者thread后立馬join它們

????輸出如下:

????通過觀察結果可以得出以下結論;

????①.在沒有使用executor的情況下,每個任務觸發(fā)時是在主線程下的不同進程。更準確的來說,我們根據(jù)apscheduler的文檔:

????可以得知,在不指定executor的情況下,apscheduler默認使用一個大小為5的線程池,復用每一個線程。這也就是為什么當blocking_job運行時期間觸發(fā)了print_job,兩者使用了不同的線程,而其他時候(比如執(zhí)行print_job后馬上執(zhí)行os_job)復用了同一個線程。

????也就是說apscheduler默認的線程池,并不是每一個任務綁定一個線程,而是任務開啟時向線程池固定的、已創(chuàng)建的5個線程中acquire一個,運行完成后再將這個線程release回線程池,每一個任務的每一次執(zhí)行都可能是在不同的線程下。

????②.當我們把blocking_job的interval參數(shù)調整為1時(這里我就不仔細講trigger觸發(fā)器,以及如何給job傳參,看看示例就能明白),會觸發(fā)錯誤:

????這是取決于job_defaults中的參數(shù),這些參數(shù)共享給每一個job,當然你也可以在add_job中給每一個job單獨指定這些參數(shù)。簡單講述下參數(shù):

????coalesce:合并,當因為某些原因一個任務積攢多次未運行時,True會讓多次任務只運行一次,而False則是積攢多少次就運行多少次。

????misfire_grace_time:容許延遲,當任務因為線程池占用滿等原因被迫等待時,若等待超過延時,則會自動skip。默認參數(shù)是Undifine,會無限等待下去。

????replace_existing: 當任務觸發(fā)時,scheduler中有相同id的任務,則會自動替代它。這個功能常用在非阻塞的scheduler中,也就是說當你一開始創(chuàng)建一個id為"1"(要求id是字符串)的任務,在scheduler啟動后(在BlockingScheduler中會阻塞,所以沒辦法演示在啟動后再增加任務)再增加一個同樣id為"1"但是function完全不同的任務,那么之前的任務就會被頂?shù)?。也就是說apscheduler區(qū)別任務不是通過函數(shù)名,而是通過任務的id。

????max_instances:同一id任務運行同時運行的最大次數(shù)限制,就像上文報錯內容一樣,因為我們指定同一個任務同時只能運行一個。這個參數(shù)默認是1,且該行為是先于executor的。也就是說當blocking_job的max_instances為2,而線程池只有1的大小時,第二個blocking_job是不會報錯的,但第三第四第五個會立馬報錯;而如果max_instances為1且線程池也為1時,則第二個任務就會報錯??偨Y一下就是:一個任務只有在滿足max_instances的情況下才會被放行去aquire線程池,至于acquire的結果是否阻塞,并不在考慮的范圍中。

????其他參數(shù)還有顯式設置id,指定第一次運行時間,指定特定的job存儲方式(使用內存還算數(shù)據(jù)庫),使用何種executor等等。

????③.當我們試圖添加async_job進入BlockingScheduler時,啟動時則會立即報錯說該任務never awaited,也就是說它是無法跑任何協(xié)程的。


????我們嘗試使用進程池:

????這里非常值得提的一點就是,網(wǎng)絡上的寫法大致是這樣的:

????這樣寫并沒有任何問題,但需要注意的是:是創(chuàng)建scheduler的時候其實并不會將所有任務自動調用processpool,它相當于只是給這個scheduler創(chuàng)立了一個屬性,別名叫做processpool。所有的規(guī)定executor都必須要手動使用add_executor,或者在創(chuàng)建任務時添加executor參數(shù),參數(shù)內容就用別名就行。

????輸出:

????可以看出原理同線程池一樣,它會復用所有的進程。由于blocking_job執(zhí)行需要5秒,執(zhí)行間隔是4秒,且max_instances為1,所以第二次執(zhí)行的時候就自動被skip了。如果在線程池中跑異步任務會直接報錯無法pickle,這很容易理解(因為你沒有顯式創(chuàng)建event_loop),總之它無法跑異步。

????需要注意的是,add_scheduler時,如果在executor中已經(jīng)指定了default,它會自動先使用default。邏輯在于:如果你沒有規(guī)定default也沒有手動add_executor,它就用默認線程池;如果你規(guī)定了default,它就自動調用default進行一個add_executor操作;如果在有default的情況下還想add_executor,那么你需要先把default給remove掉。所以我建議一切添加executor的行為都手動使用add_executor,而不是寫在一個dict里然后傳給scheduler實例化

????

????我們換做BackgroundScheduler:

????我們新增了一個print_job_substitute來演示replace_existing的效果,需要注意的是scheduler啟動后新增的job需要手動添加參數(shù),它不會繼承job_defaults中的內容。所以我建議所有的job_settings都手動在add_job中解決,越是共用就越麻煩。

????輸出:

????我們可以注意到:

????①.使用該種方法和BlockingScheduler的唯一區(qū)別就在于,start之后非阻塞,而是后臺運行。其他功能和BlockingScheduler別無二致(實際上研讀源碼你也會知道,BackgroundScheduler是繼承BlockingScheduler的,且改動很少)。所以start后的print被打印了出來。

????②.print_job_substitute確實用同樣的id頂?shù)袅藀rint_job,頂替的過程類似于:如果該任務正在執(zhí)行,就讓它執(zhí)行完成;否則都立馬更改任務內容并重新計算interval(也就是說假設print都是3秒執(zhí)行一次,它在剩下1秒就要觸發(fā)的時候被頂替,那么這個新的任務會重新等待3秒)。

????③.當主進程結束時,scheduler就會被夾斷,演示中沒有展示被夾斷時候的情況,大概是說:無法再進行任務了,當這個scheduler都已經(jīng)shutdown的時候。所以從原理上看scheduler會在__del__的時候自動調用它的shutdown功能,當然你也可以手動shutdown。為了避免這種尷尬的情況出現(xiàn),我們可以加一點東西:一個while True的input,讓你能夠手動的通過控制臺輸入來決定程序是否結束;或者直接點asyncio.get_event_loop().run_forever。我們可以美好地在外面包一層try KeyboardInterrupt,做一點shutdown后的遺言工作。


????最后來說說重量級的AsyncIOScheduler。為了適應測試需要,我們改裝幾個異步函數(shù):

??????先不加任何的executor:

??????輸出:

????非常符合直覺的:異步當然沒辦法使用默認的線程池,不同的線程怎么共享一個event_loop呢?所以它們當然都是運行在同一進程同一線程下的。

????現(xiàn)在我增加一個同步任務,你會很驚訝的發(fā)現(xiàn)輸出變?yōu)榱耍?br>

????也就是說AsyncIOScheduler也是可以跑同步任務的,它會自動不知道怎么(我等下會說明)創(chuàng)建一個線程池;如果你在主線程中打印一下pid和tid,你會發(fā)現(xiàn)所有異步任務用的進程和線程都和主線程一樣。也就是說:對于異步的任務,本質上是跑在主進程主線程中的同一個event_loop當中的;同步的任務,它會使用進程池,本質上效果等于asyncio.to_thread

????你甚至可以對AsyncIOScheduler使用ProcessPoolExecutor,但這做是沒有意義的,官方文檔并不推薦這樣做,該過程也不包含創(chuàng)建event_loop,這會導致所有的異步任務never awaited。AsyncIOScheduler底層的默認executor是AsyncIOExecutor。我們看AsyncIOExecutor的源碼:

????在if iscoroutinefunction_partial(job.func)的地方,它檢測函數(shù)是否是異步的(底層代碼就是調用asyncio.iscoroutinefuction),如果不是,就走else的地方,調用的是loop.run_in_executor(如果你看過我之前的文章,你就會知道這個方法高層封裝之后就是asyncio.to_thread);如果是,就調用loop.create_task。

????也就是說,一個同步任務,本質上是通過跑在concurrent.threadpoolexecutor里面,變成一個全await的異步任務并添加進了event_loop。

????AsyncIOExecutor本身是不接受任何參數(shù)的,所以幾乎沒有辦法客制化它(也沒辦法說我能夠在run_in_executor時用進程池,或者我給每一個異步函數(shù)開一個單獨的進程,創(chuàng)立單獨的loop)。它本質是為AsyncIOScheduler服務的底層代碼。

????有沒有辦法把異步任務單獨建立進程、線程來跑呢?我常常希望多進程、多線程并發(fā)還異步,讓本來就巨慢無比的python代碼上高速。有的,但是需要麻煩點,而且資源占用的事情我概不負責。

????我們給它包一層:

????然后它就變同步了,這時候我們再用BackgroundScheduler帶著ProcessPoolExecutor來跑,竟然還真的跑得通:

????這下是真的實現(xiàn)多進程異步了,就是資源占得有點多,還有管理起來有點困難。如果使用線程池,就需要改成new_event_loop,然后還要set一下(本質原因或許是子線程調用get_event_loop功能等同于get_running_loop,無法自動創(chuàng)建,所以需要手動創(chuàng)建):

????????輸出:

????你需要注意的是,由于每一個任務并非綁定固定的線程,所以一些使用線程id作為標志的單例是極不安全的。通過打印loop你會發(fā)現(xiàn):任務之間、每個任務運行時,其loop的id都是完全不一樣的,它是真的在不斷地新建和消滅,所以從某種意義上講這是一種浪費。這是否和你的需求相匹配,取決于你是否希望犧牲安全來換取速度。

????最后祝諸君都寫出能滿足項目需求的屎山代碼!

apscheduler明燈的評論 (共 條)

分享到微博請遵守國家法律
嘉定区| 绍兴市| 丘北县| 山阴县| 三都| 囊谦县| 确山县| 浦城县| 舞钢市| 固原市| 乌鲁木齐县| 明水县| 平安县| 广昌县| 布拖县| 辰溪县| 同仁县| 沙湾县| 石渠县| 壤塘县| 许昌县| 灵武市| 错那县| 阳东县| 东辽县| 德钦县| 中牟县| 阳曲县| 溧阳市| 富源县| 来安县| 新和县| 青州市| 东乡族自治县| 承德市| 浏阳市| 关岭| 瑞昌市| 贺兰县| 凤翔县| 肇东市|