學(xué)習(xí)日志 220104 FLink試用
FLink試用
===============
# 背景/需求
- 數(shù)據(jù)的事務(wù)處理和分析處理對數(shù)據(jù)結(jié)構(gòu)的要求不同
? - 事務(wù)處理關(guān)注一致性 范式
? - 分析處理 比如搜索/報表 往往需要進行多表聚合 統(tǒng)計 以及機器學(xué)習(xí)等
- 需要一種可靠的數(shù)據(jù)處理中間件
? - 大數(shù)據(jù)量
? - 準(zhǔn)實時
? - exactly once 統(tǒng)計一致性
? - 易用
- flink
? - https://flink.apache.org/
- 目標(biāo)
? - 從db讀取數(shù)據(jù)送入elastic search
# 使用
- 采用 k8s + docker
## Getting start
- docker pull flink
- https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
- Flink使用k8s watch監(jiān)控configmap里的leader情況, 一旦leader變更, flink會立該響應(yīng)
- 這個示例提到了復(fù)制my-flink-job.jar的過程
- 所以我們需要先制作這個jar
### 基礎(chǔ)教程 制作my-flink-job.jar
- 我們需要對flink的鏡像進行定制
? - 加入實際的工作內(nèi)容 例如my-flink-job.jar
? - 加入其它插件 例如mysql插件
- 先按first step的形式操作一遍
? - https://nightlies.apache.org/flink/flink-docs-release-1.14//docs/try-flink/local_installation/
- 下載
? - 1.14.2 2.12
? - 發(fā)現(xiàn)沒有bat 不能在windows上跑 跳過
- 編寫一個job
? - https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/datastream_api/
? - 采用stream api
? - https://github.com/apache/flink-training/tree/release-1.14/
? - 使用gradle 設(shè)置GRADLE_USER_HOME 修改gradle下載的jar包本地存儲的位置
? ? - 因為默認(rèn)在C盤 C盤空間不足
? - 報類找不到的錯誤
? ? - 嘗試reload gradle project 不行
? ? - 嘗試 rebuild project 不行
? - 使用idea重新open project
? ? - 觸發(fā)idea重新import gradle dependencies
? ? - 成功解決 找不到類的錯誤
? - 嘗試運行一個job
- 總結(jié)
? - 一個flink job可以是一個java程序(可執(zhí)行jar包)
? - 入口是main方法
? - 一般構(gòu)造一個graph, 再執(zhí)行execute
? - 打包?
? - 排除flink本身的包(這些包由flink運行集群提供)
### table api
- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/
- https://github.com/apache/flink-playgrounds
## 創(chuàng)建一個自己的任務(wù)
- 用maven創(chuàng)建一個一般java工程
- `mvn -B archetype:generate "-DgroupId=org.kaien.mycluster" "-DartifactId=flink-db-es" "-DarchetypeArtifactId=maven-archetype-quickstart" "-DarchetypeVersion=1.4"`
- 所有-D參數(shù)全部要用"括起來
- 引入依賴
? - 參考上述table api項目
- 制作docker鏡像
? - 參考 springbootdemo 的docker化方案
? - 采用maven-shade-plugin代替原方案的spring打包插件即可
### 使用mysql cdc connector讀取mysql數(shù)據(jù)
- 參考
? - https://www.sohu.com/a/439533649_411876
? - https://www.bilibili.com/video/BV1zt4y1D7kt/
? - https://github.com/ververica/flink-cdc-connectors
? - 基于 Debezium https://github.com/debezium/debezium
- 引入依賴
? ```
? <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
? <dependency>
? ? ? <groupId>com.alibaba.ververica</groupId>
? ? ? <artifactId>flink-connector-mysql-cdc</artifactId>
? ? ? <version>1.4.0</version>
? </dependency>
? ```
- 源表
? - 參考文檔
? - connector 寫 mysql-cdc
? - 其它參數(shù)依實際mysql數(shù)據(jù)庫連接信息修改
- 目標(biāo)表(sink)
? - 采用print
? - 參考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/print/
- 中間的處理過程
? - 直接用tEnv.executeSql
? - 執(zhí)行 insert into ... select 語句
- 嘗試執(zhí)行
- 問題1 運行時報找不到類 EnvironmentSettings
? - 這個是因為 idea 運行默認(rèn)不引入 provided的 jar
? - 參考 https://stackoverflow.com/questions/65135298/how-to-include-provided-dependencies-with-the-new-application-run-configuration
? - 在 run configuration 中 點擊 modify options
? - 找到 java 段 里面找 add dependencies "provided" ... 選項 勾上即可
- 問題2
? - Could not find a suitable table factory ...
? - TODO