血的教訓(xùn)-踩了定時(shí)線程池的坑
ScheduledExecutorService
一、背景
大家好呀,上周我們公司由于定時(shí)線程池使用不當(dāng)出了一個(gè)故障,幾千萬的單子可能沒了
給兄弟們分享分享這個(gè)坑,希望兄弟們以后別踩!
業(yè)務(wù)中大量的使用定時(shí)線程池(ScheduledExecutorService
)執(zhí)行任務(wù),有時(shí)候會(huì)忽略掉 Try/Catch
的異常判斷
當(dāng)任務(wù)執(zhí)行報(bào)錯(cuò)時(shí),會(huì)導(dǎo)致整個(gè)定時(shí)線程池掛掉,影響業(yè)務(wù)的正常需求
二、問題
我們來模仿一個(gè)生產(chǎn)的例子:
合作方修改頻率低且合作方允許最終一致性
我們有一個(gè)定時(shí)任務(wù)每隔
60
秒去MySQL
拉取全量的合作方
數(shù)據(jù)放至 合作方緩存(本地緩存) 中當(dāng)客戶請求時(shí),我們?nèi)ゾ彺嬷心萌『献鞣郊纯?/p>
這樣的生產(chǎn)例子應(yīng)該存在于絕大數(shù)公司,代碼如下:
java復(fù)制代碼public class Demo { ? ? ?// 創(chuàng)建定時(shí)線程池 ?? ?private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); ? ? ?private List<String> partnerCache = new ArrayList<>(); ? ? ?@PostConstruct ?? ?public void init() { ?? ? ? ?scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ?? ? ? ? ? ?@Override ?? ? ? ? ? ?public void run() { ?? ? ? ? ? ? ? ?// 啟動(dòng)時(shí)每隔60秒執(zhí)行一次數(shù)據(jù)庫的刷新 ?? ? ? ? ? ? ? ?// 將數(shù)據(jù)緩存至本地 ?? ? ? ? ? ? ? ?loadPartner(); ?? ? ? ? ? ?} ?? ? ? ?}, 3, 60, TimeUnit.SECONDS); ?? ?} ? ? ?public void loadPartner() { ?? ? ? ?// 查詢數(shù)據(jù)庫當(dāng)前最新合作方數(shù)據(jù) ?? ? ? ?List<String> partnerList = queryPartners(); ? ? ? ? ?// 合作方數(shù)據(jù)放至緩存 ?? ? ? ?partnerCache.clear(); ?? ? ? ?partnerCache.addAll(partnerList); ?? ?} ? ?? ?public List<String> queryPartners() { ?? ? ? ?// 數(shù)據(jù)庫掛了! ?? ? ? ?throw new RuntimeException(); ?? ?} ?}
運(yùn)行上述樣例,我們會(huì)發(fā)現(xiàn)程序不停止,輸出一遍 Load start!
,一直在運(yùn)行,但后續(xù)不輸出 Load start!
這個(gè)時(shí)候我們可以確認(rèn):異常確實(shí)導(dǎo)致當(dāng)前任務(wù)不再執(zhí)行
1、為什么任務(wù)報(bào)錯(cuò)會(huì)影響定時(shí)線程池?
2、定時(shí)線程池是真的掛掉了嘛?
3、定時(shí)線程池內(nèi)部是如何執(zhí)行的?
跟著這三個(gè)問題,我們一起來看一看 ScheduledExecutorService
的原理介紹
三、原理剖析
對(duì)于 ScheduledExecutorService
來說,本質(zhì)上是 延時(shí)隊(duì)列 + 線程池
1、延時(shí)隊(duì)列介紹
DelayQueue
是一個(gè)無界的 BlockingQueue
,用于放置實(shí)現(xiàn)了Delayed接口的對(duì)象,只能在到期時(shí)才能從隊(duì)列中取走。
這種隊(duì)列是有序的,即隊(duì)頭對(duì)象的延遲到期時(shí)間最長。
我們看一下延時(shí)隊(duì)列里對(duì)象的屬性:
java復(fù)制代碼class MyDelayedTask implements Delayed{ ?? ?// 當(dāng)前任務(wù)創(chuàng)建時(shí)間 ?? ?private long start = System.currentTimeMillis(); ?? ?// 延時(shí)時(shí)間 ?? ?private long time ; ? ? ?// 初始化 ?? ?public MyDelayedTask(long time) { ?? ? ? ?this.time = time; ?? ?} ? ? ?/** ?? ? * 需要實(shí)現(xiàn)的接口,獲得延遲時(shí)間(用過期時(shí)間-當(dāng)前時(shí)間) ?? ? */ ?? ?@Override ?? ?public long getDelay(TimeUnit unit) { ?? ? ? ?return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS); ?? ?} ? ? ?/** ?? ? * 用于延遲隊(duì)列內(nèi)部比較排序(當(dāng)前時(shí)間的延遲時(shí)間 - 比較對(duì)象的延遲時(shí)間) ?? ? */ ?? ?@Override ?? ?public int compareTo(Delayed o) { ?? ? ? ?MyDelayedTask o1 = (MyDelayedTask) o; ?? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); ?? ?} }
所以,延時(shí)隊(duì)列的實(shí)現(xiàn)原理也很簡單:
生產(chǎn)端:投遞消息時(shí)增加時(shí)間戳(當(dāng)前時(shí)間+延時(shí)時(shí)間)
消費(fèi)端:用當(dāng)前時(shí)間與時(shí)間戳進(jìn)行比較,若小于則消費(fèi),反之則循環(huán)等待
2、線程池的原理介紹
當(dāng)前的線程池個(gè)數(shù)低于核心線程數(shù),直接添加核心線程即可
當(dāng)前的線程池個(gè)數(shù)大于核心線程數(shù),將任務(wù)添加至阻塞隊(duì)列中
如果添加阻塞隊(duì)列失敗,則需要添加非核心線程數(shù)處理任務(wù)
如果添加非核心線程數(shù)失?。M了),執(zhí)行拒絕策略
3、定時(shí)線程的原理
我們從定時(shí)線程池的創(chuàng)建看:scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);
java復(fù)制代碼public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ?? ?// 初始化我們的任務(wù) ?? ?// triggerTime:延時(shí)的實(shí)現(xiàn) ?? ?ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period)); ?? ?RunnableScheduledFuture<Void> t = decorateTask(command, sft); ?? ?sft.outerTask = t; ?? ?delayedExecute(t); ?? ?return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { ?? ?// 將當(dāng)前任務(wù)丟進(jìn)延時(shí)隊(duì)列 ?? ?super.getQueue().add(task); ?? ?// 創(chuàng)建核心線程并啟動(dòng) ? ? ?ensurePrestart(); } ?// 時(shí)間輪算法 private long triggerTime(long delay, TimeUnit unit) { ?? ?return now() + delay; }
從這里我們可以得到結(jié)論:定時(shí)線程池通過延時(shí)隊(duì)列來達(dá)到定時(shí)的目的
有一個(gè)問題:我們僅僅向 Queue
里面放了一個(gè)任務(wù),他是怎么保證執(zhí)行多次的呢?
帶著這個(gè)問題,我們看一下他拉取任務(wù)啟動(dòng)的代碼:
java復(fù)制代碼for (;;) { ?? ?// 從延時(shí)隊(duì)列中獲取任務(wù) ?? ?Runnable r = workQueue.take(); } public RunnableScheduledFuture<?> take(){ ?? ?for (;;) { ?? ? ? ?// 獲取隊(duì)列第一個(gè)任務(wù) ?? ? ? ?RunnableScheduledFuture<?> first = queue[0]; ? ?? ? ? ?// 【重點(diǎn)】如果當(dāng)前隊(duì)列任務(wù)為空,則等待 ?? ? ? ?if (first == null){ ?? ? ? ? ? ?available.await(); ?? ? ? ?} ? ?? ? ? ?// 獲取當(dāng)前任務(wù)的時(shí)間 ?? ? ? ?long delay = first.getDelay(NANOSECONDS); ? ?? ? ? ?if (delay <= 0){ ?? ? ? ? ? ?// 彈出當(dāng)前任務(wù) ?? ? ? ? ? ?return finishPoll(first); ?? ? ? ?} ? ? ?? ?} } // 時(shí)間戳減去當(dāng)前時(shí)間 public long getDelay(TimeUnit unit) { ?? ?return unit.convert(time - now(), NANOSECONDS); }
當(dāng)拿到任務(wù)(ScheduledFutureTask)之后,會(huì)執(zhí)行任務(wù):task.run()
java復(fù)制代碼public void run() { ?? // 執(zhí)行當(dāng)前的任務(wù) ?? if (ScheduledFutureTask.super.runAndReset()) { ?? ? ? ?setNextRunTime(); ?? ? ? ?reExecutePeriodic(outerTask); ?? ?} } ?protected boolean runAndReset() { ?? ?if (state != NEW){ ?? ? ? ?return false; ?? ?} ?? ?int s = state; ?? ?try { ?? ? ? ?Callable<V> c = callable; ?? ? ? ?if (c != null && s == NEW) { ?? ? ? ? ? ?try { ?? ? ? ? ? ? ? ?// 執(zhí)行任務(wù) ?? ? ? ? ? ? ? ?c.call(); ? ? ? ? ? ? ? ? ?// 【重點(diǎn)!??!】如果任務(wù)正常執(zhí)行成功的話,這里會(huì)將ran置為true ?? ? ? ? ? ? ? ?// 如果你的任務(wù)有問題,會(huì)被下面直接捕捉到,不會(huì)將此處的ran置為true ?? ? ? ? ? ? ? ?ran = true; ?? ? ? ? ? ?} catch (Throwable ex) { ?? ? ? ? ? ? ? ?// 出現(xiàn)異常會(huì)將state置為EXCEPTIONAL ?? ? ? ? ? ? ? ?// 標(biāo)記當(dāng)前任務(wù)執(zhí)行失敗并將異常賦值到結(jié)果 ?? ? ? ? ? ? ? ?setException(ex); ?? ? ? ? ? ?}finally { ?? ? ? ? ? ? ? ? s = state; ?? ? ? ? ? ?} ?? ? ? ?} ?? ?} ?? ?// ran:當(dāng)前任務(wù)是否執(zhí)行成功 ?? ?// s:當(dāng)前任務(wù)狀態(tài) ?? ?// ran為false:當(dāng)前任務(wù)執(zhí)行失敗 ?? ?// s == NEW = false:當(dāng)前任務(wù)狀態(tài)出現(xiàn)異常 ?? ?return ran && s == NEW; }
如果我們的 runAndReset
返回 false
的話,那么進(jìn)不去 setNextRunTime
該方法:
java復(fù)制代碼if (ScheduledFutureTask.super.runAndReset()) { ?? ?// 修改當(dāng)前任務(wù)的Time ?? ?setNextRunTime(); ?? ?// 將任務(wù)重新丟進(jìn)隊(duì)列 ?? ?reExecutePeriodic(outerTask); }
最終,任務(wù)沒有辦法被丟進(jìn)隊(duì)列,我們的線程無法拿到任務(wù)執(zhí)行,一直在等待。
四、結(jié)論
通過上面的分析,我們回頭看一下開篇的三個(gè)問題:
1、為什么任務(wù)報(bào)錯(cuò)會(huì)影響定時(shí)線程池?
任務(wù)報(bào)錯(cuò)不會(huì)影響線程池,只是線程池將當(dāng)前任務(wù)給丟失,沒有繼續(xù)放到隊(duì)列中
2、定時(shí)線程池是真的掛掉了嘛?
定時(shí)線程池沒有掛,掛的只是報(bào)錯(cuò)的任務(wù)
3、定時(shí)線程池內(nèi)部是如何執(zhí)行的?
線程池 + 延時(shí)隊(duì)列
所以,通過上述的講解,我們應(yīng)該認(rèn)識(shí)到:定時(shí)任務(wù)一定要加Try Catch,不然一旦發(fā)生異常
不然,你就會(huì)和作者一樣,背故障讓公司損失幾千萬,血的教訓(xùn)!
五、總結(jié)
魯迅先生曾說:獨(dú)行難,眾行易,和志同道合的人一起進(jìn)步。彼此毫無保留的分享經(jīng)驗(yàn),才是對(duì)抗互聯(lián)網(wǎng)寒冬的最佳選擇。
其實(shí)很多時(shí)候,并不是我們不夠努力,很可能就是自己努力的方向不對(duì),如果有一個(gè)人能稍微指點(diǎn)你一下,你真的可能會(huì)少走幾年彎路。
如果你也對(duì) 后端架構(gòu) 和 中間件源碼 有興趣,歡迎添加博主微信:hls1793929520,一起學(xué)習(xí),一起成長
我是愛敲代碼的小黃,獨(dú)角獸企業(yè)的Java開發(fā)工程師,CSDN博客專家,喜歡后端架構(gòu)和中間件源碼。
我們下期再見。