最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

學(xué)習(xí)日志 220107 elasticsearch sink

2022-01-10 17:23 作者:mayoiwill  | 我要投稿

elasticsearch sink + 上傳到flink集群

================================

上接220105 (6號(hào)有事未做學(xué)習(xí))


# elasticsearch sink

- 參考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/elasticsearch/


## 加依賴

- 分為兩步

- 1, 加maven依賴 注意使用maven central里的最新版 注意使用provided

- 2, dockerfile里加wget 下載該依賴到系統(tǒng)lib


## 使用table api

- 參考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

- 注意之前看的是stream api的, 依賴是一樣的

- 用port-forward 把9200端口拉出來(lái)

- 走h(yuǎn)ttp還是https協(xié)議?

? - 見(jiàn)下一節(jié)

- 問(wèn)題 1

? - Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath

? - 參考 https://blog.csdn.net/pop_xiaohao/article/details/110525357

? - 引入依賴 flink-json

? - 是否 dockerfile是否 需要copy?

- 問(wèn)題2

? - ssl問(wèn)題 見(jiàn)下 關(guān)閉eck中elasticsearch的tls

- 開(kāi)發(fā)機(jī)測(cè)試

? - 運(yùn)行中

? - 從web端搜索test

? - 可以搜到2條數(shù)據(jù)了

? - 但沒(méi)有第3條, 因?yàn)槲覀冎槐O(jiān)聽(tīng)了一個(gè)庫(kù)

? - 修改代碼 增加SQL, 把第二個(gè)庫(kù)也監(jiān)聽(tīng)起來(lái)

? - 重新搜索test 現(xiàn)在可以搜到所有3條數(shù)據(jù)了(參考之前shardingSphere部分)

- 問(wèn)題3

? - flink metric warning 暫時(shí)不管


### elasticsearch特性

- primary key

? - 在create table時(shí)指令primary key即可

- ssl

? - org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.AuthRestClientFactory#configureRestClientBuilder

? - org.apache.http.impl.nio.client.CloseableHttpAsyncClient

? - 參考 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html

? - https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/

? - https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/elasticsearch/

? - 以上方案都不行 原因是elastic search table api目前沒(méi)有設(shè)置tls的功能

? ? - 或者我沒(méi)找到

? ? - 同時(shí)我又不想用datastream級(jí)別的api 會(huì)有其它問(wèn)題

? - https://www.elastic.co/guide/en/cloud-on-k8s/1.2/k8s-tls-certificates.html#k8s-disable-tls

? - 參考以上文檔關(guān)閉eck的tls k8s描述文件增加如下配置

? ? ```

? ? spec:

? ? ? http:

? ? ? ? tls:

? ? ? ? ? selfSignedCertificate:

? ? ? ? ? ? disabled: true

? ? ```

? - 以上spec是指根的spec kibana同樣

? - 重新布署elasticsearch kibana

? ? - 重新部署不影響用戶密碼和已有數(shù)據(jù)

? - 驗(yàn)證kibana可以工作 注意portforward后 kibana改為http://localhost:5601訪問(wèn)(不是https)

? - java應(yīng)用springbootdemo中elasticsearch的地址改為http協(xié)議

? - 重新打包發(fā)布

? - 驗(yàn)證springbootdemo

- 可擴(kuò)展

? - TODO


# 將flink應(yīng)用發(fā)布到k8s上

- 修改鏈接 按env選擇數(shù)據(jù)庫(kù)連接和elasticsearch連接

? - 參考springbootdemo

? - 這也是用代碼實(shí)現(xiàn)的好處之一, 這些細(xì)節(jié)可以自定義

- 發(fā)布flink集群和任務(wù)

? - 有兩種方式

? - session方式

? ? - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/?

? - native cluster方式

? ? - https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/

? - 參考第二種方式

- 搭建bash環(huán)境

? - 由于native cluster方式需要執(zhí)行.sh(沒(méi)有bat選項(xiàng))

? - 所以需要在win10開(kāi)發(fā)環(huán)境下搭建bash

? - win10目前已安裝好ubuntu, 但是是基于WSL2的

? - 需要重新安裝JAVA和kubectl

- 安裝JAVA

? ```

? sudo apt update

? sudo apt install openjdk-11-jdk

? sudo nano /etc/environment

? JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"

? ```

? - 測(cè)試 java --version

? - echo $JAVA_HOME

- 安裝kubectl 配置

? - 試了下kubectl命令是有的, 可能之前裝過(guò)

? - 配置 參考 https://v1-19.docs.kubernetes.io/zh/docs/tasks/tools/install-kubectl/

? - 直接復(fù)制windows的配置過(guò)來(lái)

? - `cp -r /mnt/c/Users/Administrator/.kube/ ~/`

? - 執(zhí)行 `kubectl cluster-info`

? - 報(bào)錯(cuò) crt不存在

? - 修改 ~/.kube/config

? - 把 E:\minikube\.minikube\ 改為 /mnt/e/minikube/.minikube

? - 另外路徑里的所有 \ 改為 /

? - 重新 kubectl cluster-info 成功

- 嘗試布署 flink cluster

? - 下載 13.5的flink(和之前的java應(yīng)用一致)

? - `./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster`

? - 結(jié)果?

? - `Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://192.168.2.15:8081`

? - 測(cè)試運(yùn)行

? ? ```

? ? ./bin/flink run \

? ? --target kubernetes-session \

? ? -Dkubernetes.cluster-id=my-first-flink-cluster \

? ? ./examples/streaming/TopSpeedWindowing.jar

? ? ```

? ? - 這里掛了 192.168.2.15的8081端口沒(méi)打開(kāi)

? ? - 看了下svc, 外部端口是 30504

? ? - 先不管了 把deployment刪了

- 以application cluster方式布署

- 問(wèn)題 cpu不夠了

? - 用 kubectl get pods 列出所有pods

? - 發(fā)現(xiàn)新的這個(gè)在pending

? - 通過(guò) kubectl describe 查看pod部署日志

? - 報(bào)如下問(wèn)題

? - `0/1 nodes are available: 1 Insufficient cpu.`

? - 查看上面的資源要求 發(fā)現(xiàn)需要1 CPU + 1600M 內(nèi)存

? - 先撤銷部署 flink的指令用不了

? - 用kubectl get deployment 查出對(duì)應(yīng)的deployment再delete掉

? - 驗(yàn)證所有影響已刪除 pod svc

? - 查文檔 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template

? - 提到 cpu和內(nèi)存設(shè)置無(wú)法在pod-template里設(shè)置 會(huì)被參數(shù)覆蓋

? - 用-D設(shè)置kubernetes.jobmanager.cpu和kubernetes.taskmanager.cpu

- 問(wèn)題 log里說(shuō)jar包不存在

? - 還是先刪了deployment 消除影響

? - 換docker內(nèi)的路徑再試下

- 問(wèn)題 找不到入口類

? - Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.

? - 找到 pom中用于打包的 maven-shade-plugin

? - 里面有注釋 <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->

? - 修改入口類

? - 還要 加 -Dkubernetes.container.image.pull-policy=Always

? - 默認(rèn)是 不存在才拉取 我們要改成 總是拉取

- 問(wèn)題 缺少類

? - Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase

? - 間接依賴沒(méi)wget

? - 還是別用wget了 把自己加的幾個(gè)依賴的provided刪除

? - dockerfile里的wget也不要了

? - 檢查打出來(lái)的包還是沒(méi)有依賴的這些庫(kù) 包大小看著就不對(duì) 才6k

? - 再檢查pom 發(fā)現(xiàn)是放在build pluginManagements下面了

? - 去掉pluginManagements這一級(jí)

- 問(wèn)題 flink需要k8s權(quán)限

? ```

? io.fabric8.kubernetes.client.KubernetesClientException: pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"

? ```

? - 參考 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#rbac

? - `kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default`

? - 對(duì)默認(rèn)的service account進(jìn)行授權(quán)

? - 重新deployment

? - `kubectl rollout restart deployment/my-first-application-cluster`

- 問(wèn)題 flink第二個(gè)pod創(chuàng)建時(shí)pending

? - 查看第二個(gè)pod的describe 發(fā)現(xiàn)內(nèi)存不足

? - 增加如下配置 降低每個(gè)pod的內(nèi)存占用

? ? ```

? ? -Djobmanager.memory.process.size=1g \

? ? -Dtaskmanager.memory.process.size=1g \

? ? ```

? - 默認(rèn)是1600m 改為 1g

? - 注意單位 flink的單位和k8s不一樣

- 問(wèn)題 mysql-cdc 驅(qū)動(dòng)類找不到

? - `Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.`

? - 檢查運(yùn)行中的任務(wù) http://192.168.2.15:32234/#/overview

? - 端口是隨機(jī)的 看kubectl get svc

? - 時(shí)靈時(shí)不靈的 可能后臺(tái)一直在重啟

? - 看日志 env是生效的, 連接hostname用的是集群的

? - 問(wèn)題原因

? - https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

? - 其中SQL Client JAR 一節(jié) 有要求我們下載一個(gè)jar包打進(jìn)docker鏡像

? - `wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar;`

? - 還可以參考 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

? - 有提到帶-sql的是胖包 大家自己看一下

? - 另外 kubectl delete deployment my-first-application-cluster

? - 和 kubectl delete deployment/my-first-application-cluster

? - 可以混用

? - kubectl logs -f 可以一直盯著日志看 直到pods死掉被重啟

? - pod重啟后日志會(huì)中斷 不過(guò)反正重啟后的錯(cuò)誤往往和重啟前是一樣的

- 問(wèn)題 elasticsearch 驅(qū)動(dòng)類找不到

? - 根據(jù)上個(gè)問(wèn)題的解決經(jīng)驗(yàn) 我們知道驅(qū)動(dòng)類必須放在lib下

? - 先把mysql的驅(qū)動(dòng)類從usrlib里拿掉(改為provided)

? - 再找一下elasticsearch驅(qū)動(dòng)類的胖包

? - 翻看 https://repo1.maven.org/maven2/org/apache/flink/

? - 搜elasticsearch 果然有帶-sql的版本 進(jìn)去找到1.13.5(匹配flink版本)

? - https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.5/flink-sql-connector-elasticsearch7_2.12-1.13.5.jar

? - 在dockerfile里加到wget里

? - 重新打包發(fā)布

- 問(wèn)題? java.lang.IllegalStateException: Unable to instantiate java compiler

? - org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile

? - flink-table-blink_2.12-1.13.5.jar:1.13.5

? - java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory

? - org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory

? - flink-table-blink_2.12-1.13.5.jar:1.13.5

? - 這個(gè)包改為provided

? - 居然可以。。

- 問(wèn)題 flink native 的master(jobmanager)想要啟動(dòng)多個(gè)worker(taskmanager) 導(dǎo)致內(nèi)存不足了

? - 看能不能降到1個(gè)worker?


學(xué)習(xí)日志 220107 elasticsearch sink的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
霞浦县| 大同县| 旬邑县| 肥西县| 都江堰市| 永仁县| 长岭县| 周口市| 商丘市| 宜兰市| 阿克| 怀柔区| 金寨县| 松溪县| 山阴县| 综艺| 全州县| 蒲城县| 新乐市| 孝感市| 丘北县| 南部县| 万山特区| 曲靖市| 佛冈县| 汾阳市| 简阳市| 宁化县| 儋州市| 太湖县| 怀远县| 中阳县| 文化| 元朗区| 崇明县| 南雄市| 商丘市| 游戏| 张家口市| 衡阳市| 新竹县|