最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊

血的教訓(xùn)-踩了定時(shí)線程池的坑

2023-08-17 21:10 作者:我要吃那朵棉花糖  | 我要投稿

ScheduledExecutorService

一、背景

大家好呀,上周我們公司由于定時(shí)線程池使用不當(dāng)出了一個(gè)故障,幾千萬的單子可能沒了


給兄弟們分享分享這個(gè)坑,希望兄弟們以后別踩!

業(yè)務(wù)中大量的使用定時(shí)線程池(ScheduledExecutorService)執(zhí)行任務(wù),有時(shí)候會(huì)忽略掉 Try/Catch 的異常判斷

當(dāng)任務(wù)執(zhí)行報(bào)錯(cuò)時(shí),會(huì)導(dǎo)致整個(gè)定時(shí)線程池掛掉,影響業(yè)務(wù)的正常需求


二、問題

我們來模仿一個(gè)生產(chǎn)的例子:

  • 合作方修改頻率低且合作方允許最終一致性

  • 我們有一個(gè)定時(shí)任務(wù)每隔 60 秒去 MySQL 拉取全量的 合作方 數(shù)據(jù)放至 合作方緩存(本地緩存)

  • 當(dāng)客戶請求時(shí),我們?nèi)ゾ彺嬷心萌『献鞣郊纯?/p>

這樣的生產(chǎn)例子應(yīng)該存在于絕大數(shù)公司,代碼如下:

java復(fù)制代碼public class Demo { ? ? ?// 創(chuàng)建定時(shí)線程池 ?? ?private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); ? ? ?private List<String> partnerCache = new ArrayList<>(); ? ? ?@PostConstruct ?? ?public void init() { ?? ? ? ?scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ?? ? ? ? ? ?@Override ?? ? ? ? ? ?public void run() { ?? ? ? ? ? ? ? ?// 啟動(dòng)時(shí)每隔60秒執(zhí)行一次數(shù)據(jù)庫的刷新 ?? ? ? ? ? ? ? ?// 將數(shù)據(jù)緩存至本地 ?? ? ? ? ? ? ? ?loadPartner(); ?? ? ? ? ? ?} ?? ? ? ?}, 3, 60, TimeUnit.SECONDS); ?? ?} ? ? ?public void loadPartner() { ?? ? ? ?// 查詢數(shù)據(jù)庫當(dāng)前最新合作方數(shù)據(jù) ?? ? ? ?List<String> partnerList = queryPartners(); ? ? ? ? ?// 合作方數(shù)據(jù)放至緩存 ?? ? ? ?partnerCache.clear(); ?? ? ? ?partnerCache.addAll(partnerList); ?? ?} ? ?? ?public List<String> queryPartners() { ?? ? ? ?// 數(shù)據(jù)庫掛了! ?? ? ? ?throw new RuntimeException(); ?? ?} ?}

運(yùn)行上述樣例,我們會(huì)發(fā)現(xiàn)程序不停止,輸出一遍 Load start!,一直在運(yùn)行,但后續(xù)不輸出 Load start!

這個(gè)時(shí)候我們可以確認(rèn):異常確實(shí)導(dǎo)致當(dāng)前任務(wù)不再執(zhí)行

1、為什么任務(wù)報(bào)錯(cuò)會(huì)影響定時(shí)線程池?

2、定時(shí)線程池是真的掛掉了嘛?

3、定時(shí)線程池內(nèi)部是如何執(zhí)行的?

跟著這三個(gè)問題,我們一起來看一看 ScheduledExecutorService 的原理介紹

三、原理剖析

對(duì)于 ScheduledExecutorService 來說,本質(zhì)上是 延時(shí)隊(duì)列 + 線程池

1、延時(shí)隊(duì)列介紹

DelayQueue 是一個(gè)無界的 BlockingQueue,用于放置實(shí)現(xiàn)了Delayed接口的對(duì)象,只能在到期時(shí)才能從隊(duì)列中取走。

這種隊(duì)列是有序的,即隊(duì)頭對(duì)象的延遲到期時(shí)間最長。

我們看一下延時(shí)隊(duì)列里對(duì)象的屬性:

java復(fù)制代碼class MyDelayedTask implements Delayed{ ?? ?// 當(dāng)前任務(wù)創(chuàng)建時(shí)間 ?? ?private long start = System.currentTimeMillis(); ?? ?// 延時(shí)時(shí)間 ?? ?private long time ; ? ? ?// 初始化 ?? ?public MyDelayedTask(long time) { ?? ? ? ?this.time = time; ?? ?} ? ? ?/** ?? ? * 需要實(shí)現(xiàn)的接口,獲得延遲時(shí)間(用過期時(shí)間-當(dāng)前時(shí)間) ?? ? */ ?? ?@Override ?? ?public long getDelay(TimeUnit unit) { ?? ? ? ?return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS); ?? ?} ? ? ?/** ?? ? * 用于延遲隊(duì)列內(nèi)部比較排序(當(dāng)前時(shí)間的延遲時(shí)間 - 比較對(duì)象的延遲時(shí)間) ?? ? */ ?? ?@Override ?? ?public int compareTo(Delayed o) { ?? ? ? ?MyDelayedTask o1 = (MyDelayedTask) o; ?? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); ?? ?} }

所以,延時(shí)隊(duì)列的實(shí)現(xiàn)原理也很簡單:

  • 生產(chǎn)端:投遞消息時(shí)增加時(shí)間戳(當(dāng)前時(shí)間+延時(shí)時(shí)間

  • 消費(fèi)端:用當(dāng)前時(shí)間與時(shí)間戳進(jìn)行比較,若小于則消費(fèi),反之則循環(huán)等待

2、線程池的原理介紹

  • 當(dāng)前的線程池個(gè)數(shù)低于核心線程數(shù),直接添加核心線程即可

  • 當(dāng)前的線程池個(gè)數(shù)大于核心線程數(shù),將任務(wù)添加至阻塞隊(duì)列中

  • 如果添加阻塞隊(duì)列失敗,則需要添加非核心線程數(shù)處理任務(wù)

  • 如果添加非核心線程數(shù)失?。M了),執(zhí)行拒絕策略

3、定時(shí)線程的原理

我們從定時(shí)線程池的創(chuàng)建看:scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);

java復(fù)制代碼public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ?? ?// 初始化我們的任務(wù) ?? ?// triggerTime:延時(shí)的實(shí)現(xiàn) ?? ?ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period)); ?? ?RunnableScheduledFuture<Void> t = decorateTask(command, sft); ?? ?sft.outerTask = t; ?? ?delayedExecute(t); ?? ?return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { ?? ?// 將當(dāng)前任務(wù)丟進(jìn)延時(shí)隊(duì)列 ?? ?super.getQueue().add(task); ?? ?// 創(chuàng)建核心線程并啟動(dòng) ? ? ?ensurePrestart(); } ?// 時(shí)間輪算法 private long triggerTime(long delay, TimeUnit unit) { ?? ?return now() + delay; }

從這里我們可以得到結(jié)論:定時(shí)線程池通過延時(shí)隊(duì)列來達(dá)到定時(shí)的目的

有一個(gè)問題:我們僅僅向 Queue 里面放了一個(gè)任務(wù),他是怎么保證執(zhí)行多次的呢?

帶著這個(gè)問題,我們看一下他拉取任務(wù)啟動(dòng)的代碼:

java復(fù)制代碼for (;;) { ?? ?// 從延時(shí)隊(duì)列中獲取任務(wù) ?? ?Runnable r = workQueue.take(); } public RunnableScheduledFuture<?> take(){ ?? ?for (;;) { ?? ? ? ?// 獲取隊(duì)列第一個(gè)任務(wù) ?? ? ? ?RunnableScheduledFuture<?> first = queue[0]; ? ?? ? ? ?// 【重點(diǎn)】如果當(dāng)前隊(duì)列任務(wù)為空,則等待 ?? ? ? ?if (first == null){ ?? ? ? ? ? ?available.await(); ?? ? ? ?} ? ?? ? ? ?// 獲取當(dāng)前任務(wù)的時(shí)間 ?? ? ? ?long delay = first.getDelay(NANOSECONDS); ? ?? ? ? ?if (delay <= 0){ ?? ? ? ? ? ?// 彈出當(dāng)前任務(wù) ?? ? ? ? ? ?return finishPoll(first); ?? ? ? ?} ? ? ?? ?} } // 時(shí)間戳減去當(dāng)前時(shí)間 public long getDelay(TimeUnit unit) { ?? ?return unit.convert(time - now(), NANOSECONDS); }

當(dāng)拿到任務(wù)(ScheduledFutureTask)之后,會(huì)執(zhí)行任務(wù):task.run()

java復(fù)制代碼public void run() { ?? // 執(zhí)行當(dāng)前的任務(wù) ?? if (ScheduledFutureTask.super.runAndReset()) { ?? ? ? ?setNextRunTime(); ?? ? ? ?reExecutePeriodic(outerTask); ?? ?} } ?protected boolean runAndReset() { ?? ?if (state != NEW){ ?? ? ? ?return false; ?? ?} ?? ?int s = state; ?? ?try { ?? ? ? ?Callable<V> c = callable; ?? ? ? ?if (c != null && s == NEW) { ?? ? ? ? ? ?try { ?? ? ? ? ? ? ? ?// 執(zhí)行任務(wù) ?? ? ? ? ? ? ? ?c.call(); ? ? ? ? ? ? ? ? ?// 【重點(diǎn)!??!】如果任務(wù)正常執(zhí)行成功的話,這里會(huì)將ran置為true ?? ? ? ? ? ? ? ?// 如果你的任務(wù)有問題,會(huì)被下面直接捕捉到,不會(huì)將此處的ran置為true ?? ? ? ? ? ? ? ?ran = true; ?? ? ? ? ? ?} catch (Throwable ex) { ?? ? ? ? ? ? ? ?// 出現(xiàn)異常會(huì)將state置為EXCEPTIONAL ?? ? ? ? ? ? ? ?// 標(biāo)記當(dāng)前任務(wù)執(zhí)行失敗并將異常賦值到結(jié)果 ?? ? ? ? ? ? ? ?setException(ex); ?? ? ? ? ? ?}finally { ?? ? ? ? ? ? ? ? s = state; ?? ? ? ? ? ?} ?? ? ? ?} ?? ?} ?? ?// ran:當(dāng)前任務(wù)是否執(zhí)行成功 ?? ?// s:當(dāng)前任務(wù)狀態(tài) ?? ?// ran為false:當(dāng)前任務(wù)執(zhí)行失敗 ?? ?// s == NEW = false:當(dāng)前任務(wù)狀態(tài)出現(xiàn)異常 ?? ?return ran && s == NEW; }

如果我們的 runAndReset 返回 false 的話,那么進(jìn)不去 setNextRunTime 該方法:

java復(fù)制代碼if (ScheduledFutureTask.super.runAndReset()) { ?? ?// 修改當(dāng)前任務(wù)的Time ?? ?setNextRunTime(); ?? ?// 將任務(wù)重新丟進(jìn)隊(duì)列 ?? ?reExecutePeriodic(outerTask); }

最終,任務(wù)沒有辦法被丟進(jìn)隊(duì)列,我們的線程無法拿到任務(wù)執(zhí)行,一直在等待。

四、結(jié)論

通過上面的分析,我們回頭看一下開篇的三個(gè)問題:

1、為什么任務(wù)報(bào)錯(cuò)會(huì)影響定時(shí)線程池?

  • 任務(wù)報(bào)錯(cuò)不會(huì)影響線程池,只是線程池將當(dāng)前任務(wù)給丟失,沒有繼續(xù)放到隊(duì)列中

2、定時(shí)線程池是真的掛掉了嘛?

  • 定時(shí)線程池沒有掛,掛的只是報(bào)錯(cuò)的任務(wù)

3、定時(shí)線程池內(nèi)部是如何執(zhí)行的?

  • 線程池 + 延時(shí)隊(duì)列

所以,通過上述的講解,我們應(yīng)該認(rèn)識(shí)到:定時(shí)任務(wù)一定要加Try Catch,不然一旦發(fā)生異常

不然,你就會(huì)和作者一樣,背故障讓公司損失幾千萬,血的教訓(xùn)!

五、總結(jié)

魯迅先生曾說:獨(dú)行難,眾行易,和志同道合的人一起進(jìn)步。彼此毫無保留的分享經(jīng)驗(yàn),才是對(duì)抗互聯(lián)網(wǎng)寒冬的最佳選擇。

其實(shí)很多時(shí)候,并不是我們不夠努力,很可能就是自己努力的方向不對(duì),如果有一個(gè)人能稍微指點(diǎn)你一下,你真的可能會(huì)少走幾年彎路。

如果你也對(duì) 后端架構(gòu)中間件源碼 有興趣,歡迎添加博主微信:hls1793929520,一起學(xué)習(xí),一起成長

我是愛敲代碼的小黃,獨(dú)角獸企業(yè)的Java開發(fā)工程師,CSDN博客專家,喜歡后端架構(gòu)和中間件源碼。

我們下期再見。

血的教訓(xùn)-踩了定時(shí)線程池的坑的評(píng)論 (共 條)

分享到微博請遵守國家法律
虹口区| 翼城县| 永川市| 宁晋县| 石景山区| 南投县| 丽水市| 苏尼特左旗| 山阳县| 介休市| 长白| 怀来县| 布尔津县| 普宁市| 郴州市| 仙游县| 兴安盟| 高陵县| 福海县| 昌平区| 全州县| 马鞍山市| 松桃| 张家川| 玛多县| 大理市| 绵阳市| 沈丘县| 科技| 安阳县| 达日县| 云南省| 海口市| 台安县| 盐亭县| 会同县| 华坪县| 西平县| 苏州市| 阳新县| 酒泉市|