最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

Streaming 讀取Kafka 保存OFFSET到kafka

2023-03-22 14:50 作者:兩年半的java練習(xí)生  | 我要投稿

Streaming 讀取Kafka 實(shí)現(xiàn)斷點(diǎn)續(xù)讀功能

老版本的kafka比較麻煩,streaming提供的只有checkpoint方法實(shí)現(xiàn)斷點(diǎn)續(xù)讀功能,但是當(dāng)修改程序之后就沒(méi)法平滑部署。

因?yàn)閏heckpoint存儲(chǔ)的是整個(gè)streaming啟動(dòng)類(lèi)的序列化文件,當(dāng)文件改動(dòng)之后沒(méi)法反序列化了。所以需要更好的方法來(lái)實(shí)現(xiàn)讀取Kafka 實(shí)現(xiàn)斷點(diǎn)續(xù)讀功能。

本文主要講解的就是通過(guò)zookeeper保存offset信息來(lái)實(shí)現(xiàn)的功能。

這里我們解釋如何配置Spark Streaming已從Kafka接收數(shù)據(jù)。有兩種方法 - 使用Receivers和Kafka的高級(jí)API的舊方法,以及不使用Receiver的新方法(在Spark 1.3中引入)。它們具有不同的編程模型,性能特征和語(yǔ)義保證,因此請(qǐng)繼續(xù)閱讀以獲取更多詳細(xì)信息。從當(dāng)前版本的Spark開(kāi)始,這兩種方法都被認(rèn)為是穩(wěn)定的API。

方法1: Receiver-based Approach

此方法使用Receiver接收數(shù)據(jù)。Receiver是使用Kafka高級(jí)消費(fèi)者API實(shí)現(xiàn)的。與所有接收器一樣,從Kafka通過(guò)Receiver接收的數(shù)據(jù)存儲(chǔ)在Spark執(zhí)行器中,然后由Spark Streaming啟動(dòng)的作業(yè)處理數(shù)據(jù)。

但是,在默認(rèn)配置下,此方法可能會(huì)在失敗時(shí)丟失數(shù)據(jù)(請(qǐng)參閱接收器可靠性。為確保零數(shù)據(jù)丟失,您必須在Spark Streaming中另外啟用預(yù)寫(xiě)日志(在Spark 1.2中引入)。這將同步保存所有收到的Kafka將數(shù)據(jù)轉(zhuǎn)換為分布式文件系統(tǒng)(例如HDFS)上的預(yù)寫(xiě)日志,以便在發(fā)生故障時(shí)可以恢復(fù)所有數(shù)據(jù)。有關(guān)預(yù)寫(xiě)日志的更多詳細(xì)信息,請(qǐng)參閱流式編程指南中的“ 部署”部分。

方法2: Direct Approach (No Receivers)

Spark 1.3中引入了這種新的無(wú)接收器“直接”方法,以確保更強(qiáng)大的端到端保證。這種方法不是使用接收器來(lái)接收數(shù)據(jù),而是定期向Kafka查詢(xún)每個(gè)主題+分區(qū)中的最新偏移量,并相應(yīng)地定義要在每個(gè)批次中處理的偏移量范圍。當(dāng)啟動(dòng)處理數(shù)據(jù)的作業(yè)時(shí),Kafka的簡(jiǎn)單消費(fèi)者API用于從Kafka讀取定義的偏移范圍(類(lèi)似于從文件系統(tǒng)讀取的文件)。請(qǐng)注意,此功能是在Spark 1.3中為Scala和Java API引入的,在Python 1.4中為Python API引入。

與基于接收器的方法(即方法1)相比,該方法具有以下優(yōu)點(diǎn)。

  • 簡(jiǎn)化的并行性:?無(wú)需創(chuàng)建多個(gè)輸入Kafka流并將它們聯(lián)合起來(lái)。使用directStream,Spark Streaming將創(chuàng)建與要使用的Kafka分區(qū)一樣多的RDD分區(qū),這些分區(qū)將并行地從Kafka讀取數(shù)據(jù)。因此,Kafka和RDD分區(qū)之間存在一對(duì)一的映射,這更容易理解和調(diào)整。

  • 效率:?在第一種方法中實(shí)現(xiàn)零數(shù)據(jù)丟失需要將數(shù)據(jù)存儲(chǔ)在預(yù)寫(xiě)日志中,這會(huì)進(jìn)一步復(fù)制數(shù)據(jù)。這實(shí)際上是低效的,因?yàn)閿?shù)據(jù)有效地被復(fù)制兩次 - 一次由Kafka復(fù)制,第二次由Write-Ahead Log復(fù)制。第二種方法消除了問(wèn)題,因?yàn)闆](méi)有接收器,因此不需要預(yù)寫(xiě)日志。只要您有足夠的Kafka保留,就可以從Kafka恢復(fù)消息。

  • 完全一次的語(yǔ)義:?第一種方法使用Kafka的高級(jí)API在Zookeeper中存儲(chǔ)消耗的偏移量。傳統(tǒng)上,這是從Kafka使用數(shù)據(jù)的方式。雖然這種方法(與預(yù)寫(xiě)日志結(jié)合使用)可以確保零數(shù)據(jù)丟失(即至少一次語(yǔ)義),但某些記錄在某些故障下可能會(huì)被消耗兩次。這是因?yàn)镾park Streaming可靠接收的數(shù)據(jù)與Zookeeper跟蹤的偏移之間存在不一致。因此,在第二種方法中,我們使用不使用Zookeeper的簡(jiǎn)單Kafka API。Spark Streaming在其檢查點(diǎn)內(nèi)跟蹤偏移量。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,因此盡管出現(xiàn)故障,Spark Streaming也會(huì)有效地接收每條記錄一次。為了獲得結(jié)果輸出的精確一次語(yǔ)義,主編程指南中輸出操作的語(yǔ)義以獲取更多信息)。

請(qǐng)注意,此方法的一個(gè)缺點(diǎn)是它不會(huì)更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka監(jiān)視工具將不會(huì)顯示進(jìn)度。但是,您可以在每個(gè)批次中訪(fǎng)問(wèn)此方法處理的偏移量,并自行更新Zookeeper。

目前使用較多的是方法二,因?yàn)楸容^方便。但是方法二中存在一個(gè)問(wèn)題,就是需要自己來(lái)維護(hù)offset信息,才能實(shí)現(xiàn)服務(wù)重新部署之后能從之前讀取的位置繼續(xù)讀取kafka的數(shù)據(jù)。

保存kafka的offset信息有兩種方案

方案一:通過(guò)一個(gè)文件路勁來(lái)保存,存儲(chǔ)的是checkpoint的信息。實(shí)際上就是把類(lèi)序列化之后保存到文件系統(tǒng)里面,再次啟動(dòng)的時(shí)候去反序列化回來(lái),從反序列化之后的類(lèi)里面獲取offset信息。這種方案的弊端是,當(dāng)修改streaming程序之后,類(lèi)的反序列化會(huì)失敗,因?yàn)轭?lèi)里面的代碼改變導(dǎo)致反序列化失敗。這種方案解決這個(gè)問(wèn)題的策略是同時(shí)啟動(dòng)兩個(gè)流程,新老流程共存一段時(shí)間,來(lái)保證數(shù)據(jù)的不丟失,但是如果沒(méi)有在后面做去重流程,處理完畢的數(shù)據(jù)會(huì)有很多重復(fù)數(shù)據(jù),需要清洗。

方案二:通過(guò)zookeeper來(lái)保存offset信息,這樣每次啟動(dòng)流程只需要再次去讀取zookeeper上的offset信息就好。

首先寫(xiě)一個(gè)通用的父類(lèi)實(shí)現(xiàn)基本的streaming流程,然后通過(guò)子類(lèi)繼承父類(lèi),然后重寫(xiě)父類(lèi)處理業(yè)務(wù)的方法,實(shí)現(xiàn)個(gè)性化需求(主要是實(shí)現(xiàn)不同的轉(zhuǎn)換和解析)

通用的父類(lèi)如下:

然后寫(xiě)一個(gè)帶main函數(shù)的類(lèi)來(lái)啟動(dòng)steam流程。

具體的需求的時(shí)候只需要個(gè)性化繼承KafkaSparkStreaming,然后重寫(xiě)doTask方法就能實(shí)現(xiàn)自己的個(gè)性化需求了。

Streaming 讀取Kafka 保存OFFSET到kafka的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
昭苏县| 万安县| 女性| 西青区| 宝兴县| 武安市| 肇庆市| 雷山县| 高清| 沙坪坝区| 华池县| 镇赉县| 黑山县| 闽侯县| 余庆县| 北票市| 涞源县| 清远市| 大安市| 馆陶县| 石台县| 巫溪县| 郯城县| 牙克石市| 肃宁县| 勐海县| 玛纳斯县| 河曲县| 黄浦区| 巴马| 武山县| 吴川市| 安化县| 岱山县| 临沂市| 科技| 广东省| 静安区| 永昌县| 岳阳县| 托克托县|