開源交流丨批流一體數(shù)據(jù)集成框架 ChunJun 數(shù)據(jù)傳輸模塊詳解分享
課件獲?。宏P(guān)注公眾號 “ChunJun”,后臺私信 “課件” 獲得直播課件
視頻回放:點擊這里
ChunJun 開源項目地址:github?丨?gitee?喜歡我們的項目給我們點個__ STAR!STAR!!STAR?。。。ㄖ匾氖虑檎f三遍)__
技術(shù)交流釘釘 qun:30537511
本期我們帶大家回顧一下六六同學(xué)的直播分享《ChunJun 數(shù)據(jù)傳輸模塊介紹》。
一、ChunJun 數(shù)據(jù)類型轉(zhuǎn)換
1、類型轉(zhuǎn)換解決的問題
大家一聽到「ChunJun 數(shù)據(jù)類型轉(zhuǎn)換」這個概念,可能會聯(lián)想到上下游之間進(jìn)行數(shù)據(jù)交互時會涉及到的隱式轉(zhuǎn)換。如果上游和下游數(shù)據(jù)類型一致,則不需要對數(shù)據(jù)進(jìn)行任何干預(yù),直接進(jìn)行下發(fā)即可。
但是大多數(shù)情況下會涉及到兩個問題,一是上游的數(shù)據(jù)源類型和下游的數(shù)據(jù)源類型不一致。比如 MySql 的 varchar 類型要寫到 HdfsOrc 文件里的 string 類型的話,在上游的表示是 varchar,在下游的表示是 string,但實際上中間段 java 的類型都是 string。
另外一種情況則是,上下游之間不止數(shù)據(jù)源類型不一樣,數(shù)據(jù)類型也不一樣,除了要做類型的映射之外,還需要對數(shù)據(jù)本身進(jìn)行改動。比如,MySql 的 date 類型要寫到下游 timestamp 類型,我們需要進(jìn)行的操作是把 date 中的毫秒級的時間戳拿出來,轉(zhuǎn)換成 timestamp 的類型,再往下游去寫。
這樣就引出了一個問題,如何建立所有數(shù)據(jù)源類型之間的映射 / 轉(zhuǎn)換關(guān)系?下面將為大家解答這個問題。

2、類型映射概覽
?client 端:在 Factory 類中通過 RawConverter 類建立映射關(guān)系
?source 端:將數(shù)據(jù)封裝成 AbstractBaseColumn
?sink 端:通過 AbstractBaseColumn 中的轉(zhuǎn)換方法將數(shù)據(jù)轉(zhuǎn)換成對應(yīng)類型

ChunJun 目前支持的數(shù)據(jù)類型映射關(guān)系圖如下:

3、類型映射詳解
以 Timestamp 為例,如果要寫入到 Long 類型的話,根據(jù)上文展示的 ChunJun 數(shù)據(jù)類型映射關(guān)系圖,最終映射到 TimestampColumn 中,具體流程如下圖:

上面這個例子描述的是一個單獨的字段,正常情況下,會處理多個字段,這時的類型映射詳解情況如下圖:

?as 方法就是數(shù)據(jù)類型轉(zhuǎn)換的方法。使用這個機(jī)制之后,在下游可以只關(guān)心需要的數(shù)據(jù)類型,增加開發(fā)效率。
二、ChunJun 數(shù)據(jù)傳輸過程
了解完 ChunJun 數(shù)據(jù)類型轉(zhuǎn)換后,我們來為大家分享 ChunJun 的數(shù)據(jù)傳輸過程。
1、上下游數(shù)據(jù)傳輸方式
在 ChunJun 中進(jìn)行同步作業(yè),有兩種情況,一是算子鏈打開的情況,上游的 Source 和下游的 Sink 會被合并成一個 task,有同一個線程去做調(diào)度;二是把算子鏈進(jìn)行關(guān)閉,Source 和 Sink 各自形成一個 task,也有各自的線程去進(jìn)行調(diào)度。
在算子鏈打開的情況下,上下游數(shù)據(jù)傳輸方式可分為兩種,對象重用和拷貝。
● 對象重用
?上下游數(shù)據(jù)傳輸使用方法調(diào)用的形式,將上游產(chǎn)生的數(shù)據(jù)的對象引用直接交給下游
?上下游算子需要形成算子鏈,作業(yè)開啟對象重用
· env.getConfig().enableObjectReuse();
● 拷貝
?上游傳輸給下游的數(shù)據(jù),需要經(jīng)過一次深拷貝
?上下游算子需要形成算子鏈
算子鏈的好處是可以減少序列化的操作,那么為什么我們還要引入序列化呢?因為 ChunJun 的特殊性。ChunJun 同步作業(yè)的話,只有上下游兩個算子,且都對接了正式的數(shù)據(jù)源,讀寫的時候會導(dǎo)致線程堵塞。因此上限由網(wǎng)絡(luò) io 決定,如果斷開算子鏈,cpu 會在一端線程阻塞的時候切換到另外一端。在序列化的性能較高時,線程上下文切換帶來的性能下降完全可以被彌補(bǔ)。
經(jīng)過測試,序列化的性能比對象重用和拷貝高 30% 左右。
● 序列化
?上下游數(shù)據(jù)傳輸依賴于網(wǎng)絡(luò)傳輸。上游數(shù)據(jù)進(jìn)行序列化成 byte 數(shù)組后進(jìn)行網(wǎng)絡(luò)傳輸,下游收到數(shù)據(jù)后需要進(jìn)行反序列化
?上下游之間不形成算子鏈

知道要做序列化后,會產(chǎn)生一些思考,帶著這些疑問,接著往下看。
?序列化和反序列化在什么時候發(fā)生?
?Flink 支持哪些序列化?
?序列化是怎么做的?
?怎么找到適合的序列化方式?
?如何實現(xiàn)自定義的序列化?
2、序列化傳輸過程
下圖是 ChunJun 在進(jìn)行序列化操作時的數(shù)據(jù)傳輸鏈路圖:

3、DataOutView

4、TypeInformation 介紹

5、kryo 序列化 & BaseSerializer
同樣是序列化一個 int 對象,對 kryo 來說,首先需要知道它的類型,然后從高位到低位依次去寫入。
DataOutputView 則是直接調(diào)用一個 writeInt 的方法,寫一句關(guān)鍵代碼即可:
UNSAFE.putInt(
this.buffer,
BASE_OFFSET + this.position, v);

三、ChunJun 序列化實現(xiàn)
1、ColumnRowData 序列化過程
ColumnRowData 序列化過程采取標(biāo)志位 + 實際數(shù)據(jù)的方式,具體流程如下圖:

相對于 kryo 的序列化來說:
?實現(xiàn)了更密集的存儲
?兼容 null 值
?減少了不必要的數(shù)據(jù)傳輸
2、BinaryRowData 結(jié)構(gòu)

?因為數(shù)據(jù)區(qū)一格只占 8 個字節(jié),且每個 index 只能占到一位,所以肯定存在一些沒法存儲在 8 字節(jié)范圍之內(nèi)的數(shù)據(jù),可變長度部分就是用來存放數(shù)據(jù)區(qū)無法存放的數(shù)據(jù)。
3、BinaryRowData-setNull 操作
看到上文的 null 值判斷區(qū),有些同學(xué)可能會好奇這是什么,又是怎么進(jìn)行操作的。下圖將對一個下標(biāo)為 11 的數(shù)據(jù)去做 setnull 操作,進(jìn)行簡單介紹:

4、BinaryRowData 數(shù)據(jù)存儲方式

袋鼠云開源框架釘釘技術(shù)交流群(30537511),歡迎對大數(shù)據(jù)開源項目有興趣的同學(xué)加入交流最新技術(shù)信息,開源項目庫地址:https://github.com/DTStack