學(xué)習(xí)日志 220107 elasticsearch sink
elasticsearch sink + 上傳到flink集群
================================
上接220105 (6號(hào)有事未做學(xué)習(xí))
# elasticsearch sink
- 參考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/elasticsearch/
## 加依賴
- 分為兩步
- 1, 加maven依賴 注意使用maven central里的最新版 注意使用provided
- 2, dockerfile里加wget 下載該依賴到系統(tǒng)lib
## 使用table api
- 參考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
- 注意之前看的是stream api的, 依賴是一樣的
- 用port-forward 把9200端口拉出來(lái)
- 走h(yuǎn)ttp還是https協(xié)議?
? - 見(jiàn)下一節(jié)
- 問(wèn)題 1
? - Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath
? - 參考 https://blog.csdn.net/pop_xiaohao/article/details/110525357
? - 引入依賴 flink-json
? - 是否 dockerfile是否 需要copy?
- 問(wèn)題2
? - ssl問(wèn)題 見(jiàn)下 關(guān)閉eck中elasticsearch的tls
- 開(kāi)發(fā)機(jī)測(cè)試
? - 運(yùn)行中
? - 從web端搜索test
? - 可以搜到2條數(shù)據(jù)了
? - 但沒(méi)有第3條, 因?yàn)槲覀冎槐O(jiān)聽(tīng)了一個(gè)庫(kù)
? - 修改代碼 增加SQL, 把第二個(gè)庫(kù)也監(jiān)聽(tīng)起來(lái)
? - 重新搜索test 現(xiàn)在可以搜到所有3條數(shù)據(jù)了(參考之前shardingSphere部分)
- 問(wèn)題3
? - flink metric warning 暫時(shí)不管
### elasticsearch特性
- primary key
? - 在create table時(shí)指令primary key即可
- ssl
? - org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.AuthRestClientFactory#configureRestClientBuilder
? - org.apache.http.impl.nio.client.CloseableHttpAsyncClient
? - 參考 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html
? - https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/
? - https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/elasticsearch/
? - 以上方案都不行 原因是elastic search table api目前沒(méi)有設(shè)置tls的功能
? ? - 或者我沒(méi)找到
? ? - 同時(shí)我又不想用datastream級(jí)別的api 會(huì)有其它問(wèn)題
? - https://www.elastic.co/guide/en/cloud-on-k8s/1.2/k8s-tls-certificates.html#k8s-disable-tls
? - 參考以上文檔關(guān)閉eck的tls k8s描述文件增加如下配置
? ? ```
? ? spec:
? ? ? http:
? ? ? ? tls:
? ? ? ? ? selfSignedCertificate:
? ? ? ? ? ? disabled: true
? ? ```
? - 以上spec是指根的spec kibana同樣
? - 重新布署elasticsearch kibana
? ? - 重新部署不影響用戶密碼和已有數(shù)據(jù)
? - 驗(yàn)證kibana可以工作 注意portforward后 kibana改為http://localhost:5601訪問(wèn)(不是https)
? - java應(yīng)用springbootdemo中elasticsearch的地址改為http協(xié)議
? - 重新打包發(fā)布
? - 驗(yàn)證springbootdemo
- 可擴(kuò)展
? - TODO
# 將flink應(yīng)用發(fā)布到k8s上
- 修改鏈接 按env選擇數(shù)據(jù)庫(kù)連接和elasticsearch連接
? - 參考springbootdemo
? - 這也是用代碼實(shí)現(xiàn)的好處之一, 這些細(xì)節(jié)可以自定義
- 發(fā)布flink集群和任務(wù)
? - 有兩種方式
? - session方式
? ? - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/?
? - native cluster方式
? ? - https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/
? - 參考第二種方式
- 搭建bash環(huán)境
? - 由于native cluster方式需要執(zhí)行.sh(沒(méi)有bat選項(xiàng))
? - 所以需要在win10開(kāi)發(fā)環(huán)境下搭建bash
? - win10目前已安裝好ubuntu, 但是是基于WSL2的
? - 需要重新安裝JAVA和kubectl
- 安裝JAVA
? ```
? sudo apt update
? sudo apt install openjdk-11-jdk
? sudo nano /etc/environment
? JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
? ```
? - 測(cè)試 java --version
? - echo $JAVA_HOME
- 安裝kubectl 配置
? - 試了下kubectl命令是有的, 可能之前裝過(guò)
? - 配置 參考 https://v1-19.docs.kubernetes.io/zh/docs/tasks/tools/install-kubectl/
? - 直接復(fù)制windows的配置過(guò)來(lái)
? - `cp -r /mnt/c/Users/Administrator/.kube/ ~/`
? - 執(zhí)行 `kubectl cluster-info`
? - 報(bào)錯(cuò) crt不存在
? - 修改 ~/.kube/config
? - 把 E:\minikube\.minikube\ 改為 /mnt/e/minikube/.minikube
? - 另外路徑里的所有 \ 改為 /
? - 重新 kubectl cluster-info 成功
- 嘗試布署 flink cluster
? - 下載 13.5的flink(和之前的java應(yīng)用一致)
? - `./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster`
? - 結(jié)果?
? - `Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://192.168.2.15:8081`
? - 測(cè)試運(yùn)行
? ? ```
? ? ./bin/flink run \
? ? --target kubernetes-session \
? ? -Dkubernetes.cluster-id=my-first-flink-cluster \
? ? ./examples/streaming/TopSpeedWindowing.jar
? ? ```
? ? - 這里掛了 192.168.2.15的8081端口沒(méi)打開(kāi)
? ? - 看了下svc, 外部端口是 30504
? ? - 先不管了 把deployment刪了
- 以application cluster方式布署
- 問(wèn)題 cpu不夠了
? - 用 kubectl get pods 列出所有pods
? - 發(fā)現(xiàn)新的這個(gè)在pending
? - 通過(guò) kubectl describe 查看pod部署日志
? - 報(bào)如下問(wèn)題
? - `0/1 nodes are available: 1 Insufficient cpu.`
? - 查看上面的資源要求 發(fā)現(xiàn)需要1 CPU + 1600M 內(nèi)存
? - 先撤銷部署 flink的指令用不了
? - 用kubectl get deployment 查出對(duì)應(yīng)的deployment再delete掉
? - 驗(yàn)證所有影響已刪除 pod svc
? - 查文檔 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
? - 提到 cpu和內(nèi)存設(shè)置無(wú)法在pod-template里設(shè)置 會(huì)被參數(shù)覆蓋
? - 用-D設(shè)置kubernetes.jobmanager.cpu和kubernetes.taskmanager.cpu
- 問(wèn)題 log里說(shuō)jar包不存在
? - 還是先刪了deployment 消除影響
? - 換docker內(nèi)的路徑再試下
- 問(wèn)題 找不到入口類
? - Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
? - 找到 pom中用于打包的 maven-shade-plugin
? - 里面有注釋 <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
? - 修改入口類
? - 還要 加 -Dkubernetes.container.image.pull-policy=Always
? - 默認(rèn)是 不存在才拉取 我們要改成 總是拉取
- 問(wèn)題 缺少類
? - Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase
? - 間接依賴沒(méi)wget
? - 還是別用wget了 把自己加的幾個(gè)依賴的provided刪除
? - dockerfile里的wget也不要了
? - 檢查打出來(lái)的包還是沒(méi)有依賴的這些庫(kù) 包大小看著就不對(duì) 才6k
? - 再檢查pom 發(fā)現(xiàn)是放在build pluginManagements下面了
? - 去掉pluginManagements這一級(jí)
- 問(wèn)題 flink需要k8s權(quán)限
? ```
? io.fabric8.kubernetes.client.KubernetesClientException: pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
? ```
? - 參考 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#rbac
? - `kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default`
? - 對(duì)默認(rèn)的service account進(jìn)行授權(quán)
? - 重新deployment
? - `kubectl rollout restart deployment/my-first-application-cluster`
- 問(wèn)題 flink第二個(gè)pod創(chuàng)建時(shí)pending
? - 查看第二個(gè)pod的describe 發(fā)現(xiàn)內(nèi)存不足
? - 增加如下配置 降低每個(gè)pod的內(nèi)存占用
? ? ```
? ? -Djobmanager.memory.process.size=1g \
? ? -Dtaskmanager.memory.process.size=1g \
? ? ```
? - 默認(rèn)是1600m 改為 1g
? - 注意單位 flink的單位和k8s不一樣
- 問(wèn)題 mysql-cdc 驅(qū)動(dòng)類找不到
? - `Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.`
? - 檢查運(yùn)行中的任務(wù) http://192.168.2.15:32234/#/overview
? - 端口是隨機(jī)的 看kubectl get svc
? - 時(shí)靈時(shí)不靈的 可能后臺(tái)一直在重啟
? - 看日志 env是生效的, 連接hostname用的是集群的
? - 問(wèn)題原因
? - https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
? - 其中SQL Client JAR 一節(jié) 有要求我們下載一個(gè)jar包打進(jìn)docker鏡像
? - `wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar;`
? - 還可以參考 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
? - 有提到帶-sql的是胖包 大家自己看一下
? - 另外 kubectl delete deployment my-first-application-cluster
? - 和 kubectl delete deployment/my-first-application-cluster
? - 可以混用
? - kubectl logs -f 可以一直盯著日志看 直到pods死掉被重啟
? - pod重啟后日志會(huì)中斷 不過(guò)反正重啟后的錯(cuò)誤往往和重啟前是一樣的
- 問(wèn)題 elasticsearch 驅(qū)動(dòng)類找不到
? - 根據(jù)上個(gè)問(wèn)題的解決經(jīng)驗(yàn) 我們知道驅(qū)動(dòng)類必須放在lib下
? - 先把mysql的驅(qū)動(dòng)類從usrlib里拿掉(改為provided)
? - 再找一下elasticsearch驅(qū)動(dòng)類的胖包
? - 翻看 https://repo1.maven.org/maven2/org/apache/flink/
? - 搜elasticsearch 果然有帶-sql的版本 進(jìn)去找到1.13.5(匹配flink版本)
? - https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.5/flink-sql-connector-elasticsearch7_2.12-1.13.5.jar
? - 在dockerfile里加到wget里
? - 重新打包發(fā)布
- 問(wèn)題? java.lang.IllegalStateException: Unable to instantiate java compiler
? - org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile
? - flink-table-blink_2.12-1.13.5.jar:1.13.5
? - java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
? - org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory
? - flink-table-blink_2.12-1.13.5.jar:1.13.5
? - 這個(gè)包改為provided
? - 居然可以。。
- 問(wèn)題 flink native 的master(jobmanager)想要啟動(dòng)多個(gè)worker(taskmanager) 導(dǎo)致內(nèi)存不足了
? - 看能不能降到1個(gè)worker?