Streaming 讀取Kafka 保存OFFSET到kafka
Streaming 讀取Kafka 實(shí)現(xiàn)斷點(diǎn)續(xù)讀功能
老版本的kafka比較麻煩,streaming提供的只有checkpoint方法實(shí)現(xiàn)斷點(diǎn)續(xù)讀功能,但是當(dāng)修改程序之后就沒(méi)法平滑部署。
因?yàn)閏heckpoint存儲(chǔ)的是整個(gè)streaming啟動(dòng)類(lèi)的序列化文件,當(dāng)文件改動(dòng)之后沒(méi)法反序列化了。所以需要更好的方法來(lái)實(shí)現(xiàn)讀取Kafka 實(shí)現(xiàn)斷點(diǎn)續(xù)讀功能。
本文主要講解的就是通過(guò)zookeeper保存offset信息來(lái)實(shí)現(xiàn)的功能。
這里我們解釋如何配置Spark Streaming已從Kafka接收數(shù)據(jù)。有兩種方法 - 使用Receivers和Kafka的高級(jí)API的舊方法,以及不使用Receiver的新方法(在Spark 1.3中引入)。它們具有不同的編程模型,性能特征和語(yǔ)義保證,因此請(qǐng)繼續(xù)閱讀以獲取更多詳細(xì)信息。從當(dāng)前版本的Spark開(kāi)始,這兩種方法都被認(rèn)為是穩(wěn)定的API。
方法1: Receiver-based Approach
此方法使用Receiver接收數(shù)據(jù)。Receiver是使用Kafka高級(jí)消費(fèi)者API實(shí)現(xiàn)的。與所有接收器一樣,從Kafka通過(guò)Receiver接收的數(shù)據(jù)存儲(chǔ)在Spark執(zhí)行器中,然后由Spark Streaming啟動(dòng)的作業(yè)處理數(shù)據(jù)。
但是,在默認(rèn)配置下,此方法可能會(huì)在失敗時(shí)丟失數(shù)據(jù)(請(qǐng)參閱接收器可靠性。為確保零數(shù)據(jù)丟失,您必須在Spark Streaming中另外啟用預(yù)寫(xiě)日志(在Spark 1.2中引入)。這將同步保存所有收到的Kafka將數(shù)據(jù)轉(zhuǎn)換為分布式文件系統(tǒng)(例如HDFS)上的預(yù)寫(xiě)日志,以便在發(fā)生故障時(shí)可以恢復(fù)所有數(shù)據(jù)。有關(guān)預(yù)寫(xiě)日志的更多詳細(xì)信息,請(qǐng)參閱流式編程指南中的“ 部署”部分。
方法2: Direct Approach (No Receivers)
Spark 1.3中引入了這種新的無(wú)接收器“直接”方法,以確保更強(qiáng)大的端到端保證。這種方法不是使用接收器來(lái)接收數(shù)據(jù),而是定期向Kafka查詢(xún)每個(gè)主題+分區(qū)中的最新偏移量,并相應(yīng)地定義要在每個(gè)批次中處理的偏移量范圍。當(dāng)啟動(dòng)處理數(shù)據(jù)的作業(yè)時(shí),Kafka的簡(jiǎn)單消費(fèi)者API用于從Kafka讀取定義的偏移范圍(類(lèi)似于從文件系統(tǒng)讀取的文件)。請(qǐng)注意,此功能是在Spark 1.3中為Scala和Java API引入的,在Python 1.4中為Python API引入。
與基于接收器的方法(即方法1)相比,該方法具有以下優(yōu)點(diǎn)。
簡(jiǎn)化的并行性:?無(wú)需創(chuàng)建多個(gè)輸入Kafka流并將它們聯(lián)合起來(lái)。使用directStream,Spark Streaming將創(chuàng)建與要使用的Kafka分區(qū)一樣多的RDD分區(qū),這些分區(qū)將并行地從Kafka讀取數(shù)據(jù)。因此,Kafka和RDD分區(qū)之間存在一對(duì)一的映射,這更容易理解和調(diào)整。
效率:?在第一種方法中實(shí)現(xiàn)零數(shù)據(jù)丟失需要將數(shù)據(jù)存儲(chǔ)在預(yù)寫(xiě)日志中,這會(huì)進(jìn)一步復(fù)制數(shù)據(jù)。這實(shí)際上是低效的,因?yàn)閿?shù)據(jù)有效地被復(fù)制兩次 - 一次由Kafka復(fù)制,第二次由Write-Ahead Log復(fù)制。第二種方法消除了問(wèn)題,因?yàn)闆](méi)有接收器,因此不需要預(yù)寫(xiě)日志。只要您有足夠的Kafka保留,就可以從Kafka恢復(fù)消息。
完全一次的語(yǔ)義:?第一種方法使用Kafka的高級(jí)API在Zookeeper中存儲(chǔ)消耗的偏移量。傳統(tǒng)上,這是從Kafka使用數(shù)據(jù)的方式。雖然這種方法(與預(yù)寫(xiě)日志結(jié)合使用)可以確保零數(shù)據(jù)丟失(即至少一次語(yǔ)義),但某些記錄在某些故障下可能會(huì)被消耗兩次。這是因?yàn)镾park Streaming可靠接收的數(shù)據(jù)與Zookeeper跟蹤的偏移之間存在不一致。因此,在第二種方法中,我們使用不使用Zookeeper的簡(jiǎn)單Kafka API。Spark Streaming在其檢查點(diǎn)內(nèi)跟蹤偏移量。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,因此盡管出現(xiàn)故障,Spark Streaming也會(huì)有效地接收每條記錄一次。為了獲得結(jié)果輸出的精確一次語(yǔ)義,主編程指南中輸出操作的語(yǔ)義以獲取更多信息)。
請(qǐng)注意,此方法的一個(gè)缺點(diǎn)是它不會(huì)更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka監(jiān)視工具將不會(huì)顯示進(jìn)度。但是,您可以在每個(gè)批次中訪(fǎng)問(wèn)此方法處理的偏移量,并自行更新Zookeeper。
目前使用較多的是方法二,因?yàn)楸容^方便。但是方法二中存在一個(gè)問(wèn)題,就是需要自己來(lái)維護(hù)offset信息,才能實(shí)現(xiàn)服務(wù)重新部署之后能從之前讀取的位置繼續(xù)讀取kafka的數(shù)據(jù)。
保存kafka的offset信息有兩種方案
方案一:通過(guò)一個(gè)文件路勁來(lái)保存,存儲(chǔ)的是checkpoint的信息。實(shí)際上就是把類(lèi)序列化之后保存到文件系統(tǒng)里面,再次啟動(dòng)的時(shí)候去反序列化回來(lái),從反序列化之后的類(lèi)里面獲取offset信息。這種方案的弊端是,當(dāng)修改streaming程序之后,類(lèi)的反序列化會(huì)失敗,因?yàn)轭?lèi)里面的代碼改變導(dǎo)致反序列化失敗。這種方案解決這個(gè)問(wèn)題的策略是同時(shí)啟動(dòng)兩個(gè)流程,新老流程共存一段時(shí)間,來(lái)保證數(shù)據(jù)的不丟失,但是如果沒(méi)有在后面做去重流程,處理完畢的數(shù)據(jù)會(huì)有很多重復(fù)數(shù)據(jù),需要清洗。
方案二:通過(guò)zookeeper來(lái)保存offset信息,這樣每次啟動(dòng)流程只需要再次去讀取zookeeper上的offset信息就好。
首先寫(xiě)一個(gè)通用的父類(lèi)實(shí)現(xiàn)基本的streaming流程,然后通過(guò)子類(lèi)繼承父類(lèi),然后重寫(xiě)父類(lèi)處理業(yè)務(wù)的方法,實(shí)現(xiàn)個(gè)性化需求(主要是實(shí)現(xiàn)不同的轉(zhuǎn)換和解析)
通用的父類(lèi)如下:
然后寫(xiě)一個(gè)帶main函數(shù)的類(lèi)來(lái)啟動(dòng)steam流程。
具體的需求的時(shí)候只需要個(gè)性化繼承KafkaSparkStreaming,然后重寫(xiě)doTask方法就能實(shí)現(xiàn)自己的個(gè)性化需求了。