基于袋鼠云實(shí)時(shí)開發(fā)平臺(tái)開發(fā) FlinkSQL 任務(wù)的實(shí)踐探索
隨著業(yè)務(wù)的發(fā)展,實(shí)時(shí)場景在各個(gè)?業(yè)中變得越來越重要。?論是?融、電商還是物流,實(shí)時(shí)數(shù)據(jù)處理都成為了其中的關(guān)鍵環(huán)節(jié)。Flink 憑借其強(qiáng)?的流處理特性、窗?操作以及對各種數(shù)據(jù)源的?持,成為實(shí)時(shí)場景下的?選開發(fā)?具。
FlinkSQL 通過 SQL 語??向數(shù)據(jù)開發(fā)提供了更友好的交互?式,但是其開發(fā)?式和離線開發(fā) SparkSQL 仍然存在較?的差異。袋鼠云實(shí)時(shí)開發(fā)平臺(tái)StreamWorks,?直致?于降低 FlinkSQL 的開發(fā)門檻,讓更多的數(shù)據(jù)開發(fā)掌握實(shí)時(shí)開發(fā)能?,普及實(shí)時(shí)計(jì)算的應(yīng)?。
本文將為大家簡單介紹在袋鼠云實(shí)時(shí)開發(fā)平臺(tái)開發(fā) FlinkSQL 任務(wù)的四種?式。
腳本模式
該模式是最基礎(chǔ)的開發(fā)?式,數(shù)據(jù)開發(fā)人員在平臺(tái) IDE 中通過 FlinkSQL 代碼,完成 Flink 表定義和業(yè)務(wù)邏輯加?。代碼如下:
-- 定義數(shù)據(jù)源表
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector ' = 'faker ',
'fields .client_ip .expression ' = '#{Internet .publicIpV4Address} ',
'fields .client_identity .expression ' = ?'- ',
'fields .userid .expression ' = ?'- ',
'fields .user_agent .expression ' = '#{Internet .userAgentAny} ',
'fields .log_time .expression ' = ?'#{date .past ' '15 ' ', ' '5 ' ', ' 'SECONDS ' '} ',
'fields .request_line .expression ' = '#{regexify ' '(GET |POST |PUT |PATCH){1} ' '} #{regexify ' '(/search\ .html|/login\ .html|/prod\ .html|c
'fields .status_code .expression ' = '#{regexify ' '(200 |201 |204 |400 |401 |403 |301){1} ' '} ',
'fields .size .expression ' = '#{number .numberBetween ' '100 ' ', ' '10000000 ' '} '
);
-- 定義結(jié)果表, ?實(shí)際應(yīng)用中會(huì)選擇 ?Kafka、JDBC 等作為結(jié)果表
CREATE TABLE client_errors (
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector ' = 'stream-x '
);
-- 寫入數(shù)據(jù)到結(jié)果表
INSERT INTO client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM server_logs
WHERE status_code SIMILAR TO '4[0-9][0-9] ';
腳本模式的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):靈活性?。
缺點(diǎn):Flink表定義邏輯復(fù)雜,如果不熟悉數(shù)據(jù)源插件,很難記住需要維護(hù)哪些參數(shù);如果該任務(wù)涉及多張表,代碼塊中存在?段表定義代碼,不?便排查業(yè)務(wù)邏輯。
向?qū)J?/h1>
基于腳本模式存在的缺點(diǎn),袋鼠云實(shí)時(shí)開發(fā)平臺(tái)將 Flink 表定義邏輯抽象成了可視化配置的功能,引導(dǎo)數(shù)據(jù)開發(fā)?員通過??配置化的?式完成 Flink 表定義,讓數(shù)據(jù)開發(fā)更專注在業(yè)務(wù)邏輯的加?。
向?qū)J绞窃陂_發(fā)??的配置項(xiàng)中根據(jù)??引導(dǎo),完成 Flink 表的源表、維表、結(jié)果表的映射,然后在 IDE 中直接引?,讀寫對應(yīng)的 Flink 表,完成邏輯開發(fā)。
· 平臺(tái)默認(rèn)提供各類數(shù)據(jù)源的源表、維表、結(jié)果表常?配置項(xiàng);
· 對于各種?級參數(shù),平臺(tái)也提供了維護(hù)?定義參數(shù)的 key/value ?式來滿?靈活性要求。
Catalog 模式
在向?qū)J街?,我們可以借助配置化?式快速完成表映射,但同時(shí)也存在?個(gè)問題,這些映射表只能在當(dāng)前任務(wù)中被引?,?法在不同的任務(wù)中復(fù)?。
但是在真實(shí)的實(shí)時(shí)數(shù)倉建設(shè)過程中,我們常會(huì)遇到下?這種場景:某?個(gè) dws 層級的 kafka topic,會(huì)在多個(gè) ads 任務(wù)中被作為源表使?。?在每個(gè) ads 任務(wù)開發(fā)過程中,都需要為同?個(gè) dws topic 做?次相同的 Flink 映射。
為了解決這種重復(fù)映射的開發(fā)?作,我們可以借助 Flink Catalog 功能,將映射表的元數(shù)據(jù)信息進(jìn)?持久化存儲(chǔ),這樣就可以在不同的任務(wù)中重復(fù)引?。具體使??法如下(以平臺(tái)的 DT Catalog 為例):
Catalog ?錄維護(hù)
· 先在 DT Catalog 下創(chuàng)建?個(gè)名為 stream_warehouse 的 catalog
· 然后在該 catalog 下根據(jù)數(shù)倉層級或者業(yè)務(wù)域創(chuàng)建不同的 database

Flink 映射表創(chuàng)建
· ?式?:在?錄中 hover database,根據(jù)引導(dǎo)通過配置化?式完成 Flink 表映射

· ?式?:在 IDE 中,通過 Create DDL 完成創(chuàng)建,注意要指定對應(yīng)的 catalog.database 路徑
CREATE TABLE stream_warehouse .dws .orders (
order_uid ?BIGINT,
product_id BIGINT,
price ? ? ?DECIMAL(32, 2),
order_time TIMESTAMP(3)
) WITH (
'connector ' = 'datagen '
);
FlinkSQL 任務(wù)開發(fā)
完成上面兩個(gè)步驟,?張?jiān)獢?shù)據(jù)持久化存儲(chǔ)的 Flink 映射表就創(chuàng)建好了。我們在開發(fā)任務(wù)的時(shí)候,就可以直接通過 catalog.database.table 的?式,引?我們需要的表。
INSERT INTO stream_warehouse .ads_db .client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM stream_warehouse .dws_db .server_logs
Demo 模式
學(xué)會(huì)了上?三種開發(fā)?式后,如果你還對 FlinkSQL 的開發(fā)邏輯?較陌?,那么建議你可以通過袋鼠云實(shí)時(shí)開發(fā)平臺(tái)的代碼模版中?去完成?個(gè)完整的任務(wù)開發(fā)。
在模版中?,我們提供了??余種常?的業(yè)務(wù)場景及其對應(yīng)的 FlinkSQL 代碼邏輯,如各類窗?的寫法、各類 Join 的寫法等等,你可以根據(jù)真實(shí)的業(yè)務(wù)場景去套?模版,快速地完成任務(wù)開發(fā)。


總結(jié)
每種開發(fā)模式?jīng)]有絕對的好壞之分,需要根據(jù)不同企業(yè)的實(shí)時(shí)計(jì)算場景和階段,采?不同的開發(fā)模式,才能真正達(dá)到降本增效的目的。
· 當(dāng)企業(yè)剛接觸實(shí)時(shí)計(jì)算,數(shù)據(jù)開發(fā)?員對 FlinkSQL 熟悉度較低時(shí),DEMO 模式是最好的選擇;
· 當(dāng)企業(yè)已經(jīng)上?實(shí)時(shí)計(jì)算,但是任務(wù)量還不?時(shí),腳本模式或者向?qū)J绞遣诲e(cuò)的選擇;
· 當(dāng)企業(yè)實(shí)時(shí)計(jì)算達(dá)到?定規(guī)模,需要進(jìn)?類似離線數(shù)倉的管理?式時(shí),Catalog 模式是最優(yōu)的選擇。
《數(shù)據(jù)治理行業(yè)實(shí)踐白皮書》下載地址:https://fs80.cn/l134d5?
《數(shù)棧V6.0產(chǎn)品白皮書》下載地址:https://fs80.cn/cw0iw1
想了解或咨詢更多有關(guān)袋鼠云大數(shù)據(jù)產(chǎn)品、行業(yè)解決方案、客戶案例的朋友,瀏覽袋鼠云官網(wǎng):https://www.dtstack.com/?src=szbzhan
同時(shí),歡迎對大數(shù)據(jù)開源項(xiàng)目有興趣的同學(xué)加入「袋鼠云開源框架釘釘技術(shù) qun」,交流最新開源技術(shù)信息,qun 號碼:30537511,項(xiàng)目地址:https://github.com/DTStack