數(shù)棧產(chǎn)品分享:基于StreamWorks構(gòu)建實(shí)時(shí)大數(shù)據(jù)處理平臺(tái)
數(shù)棧是云原生—站式數(shù)據(jù)中臺(tái)PaaS,我們?cè)趃ithub和gitee上有一個(gè)有趣的開源項(xiàng)目:FlinkX,F(xiàn)linkX是一個(gè)基于Flink的批流統(tǒng)一的數(shù)據(jù)同步工具,既可以采集靜態(tài)的數(shù)據(jù),也可以采集實(shí)時(shí)變化的數(shù)據(jù),是全域、異構(gòu)、批流一體的數(shù)據(jù)同步引擎。大家喜歡的話請(qǐng)給我們點(diǎn)個(gè)star!star!star!
github開源項(xiàng)目:https://github.com/DTStack/flinkx
gitee開源項(xiàng)目:https://gitee.com/dtstack_dev_0/flinkx
2020年春節(jié)期間,一場(chǎng)突如其來的疫情在全國蔓延開來,打破了大家原有的工作生活節(jié)奏。疫情期間,大家宅在家就能隨時(shí)看到實(shí)時(shí)的大數(shù)據(jù)疫情地圖,可以隨時(shí)刷到自己當(dāng)前感興趣的抖音視頻,這一切背后依賴的最重要的技術(shù),就是實(shí)時(shí)大數(shù)據(jù)處理技術(shù)。
現(xiàn)在疫情即將過去,國家提出要加快大數(shù)據(jù)中心等新型基礎(chǔ)設(shè)施建設(shè),實(shí)時(shí)大數(shù)據(jù)處理平臺(tái)建設(shè)成為企業(yè)數(shù)智化轉(zhuǎn)型過程中越來越重要的部分。
一、什么是實(shí)時(shí)計(jì)算
在大數(shù)據(jù)處理領(lǐng)域,通常根據(jù)數(shù)據(jù)的不同性質(zhì),將任務(wù)劃分為實(shí)時(shí)計(jì)算與離線計(jì)算,以溫度傳感器的場(chǎng)景舉例:假設(shè)某城市安裝了大量的溫度傳感器,每個(gè)傳感器每隔1min上傳一次采集到的溫度信息,由氣象中心統(tǒng)一匯總,每隔5分鐘更新一次各個(gè)地區(qū)的溫度,這些數(shù)據(jù)是一直源源不斷的產(chǎn)生的,且不會(huì)停止。實(shí)時(shí)計(jì)算就主要用于“數(shù)據(jù)源源不斷的產(chǎn)生,而且不會(huì)停止,需要以最小的延遲獲得計(jì)算結(jié)果”的場(chǎng)景,這種最小的延遲通常為秒級(jí)或分鐘級(jí)。
為了滿足這種數(shù)據(jù)量很大,而且實(shí)時(shí)性要求又非常高的場(chǎng)景,通常會(huì)采用實(shí)時(shí)計(jì)算技術(shù),實(shí)時(shí)計(jì)算的“數(shù)據(jù)源源不斷”的特定決定了其數(shù)據(jù)處理方式與離線是截然不同的。

Figure 1 實(shí)時(shí)計(jì)算和離線計(jì)算的區(qū)別
離線計(jì)算的批量、高延時(shí)、主動(dòng)發(fā)起的計(jì)算特點(diǎn)不同,實(shí)時(shí)計(jì)算是一種持續(xù)、低延時(shí)、事件觸發(fā)的計(jì)算任務(wù)。離線計(jì)算需要先裝載數(shù)據(jù),然后提交離線任務(wù),最后任務(wù)計(jì)算返回結(jié)果;實(shí)時(shí)計(jì)算首先要提交流式任務(wù),然后等實(shí)時(shí)流數(shù)據(jù)接入,然后計(jì)算出實(shí)時(shí)結(jié)果流。

Figure 2 實(shí)時(shí)計(jì)算和離線計(jì)算的區(qū)別(形象圖)
形象點(diǎn)可以理解為離線計(jì)算是開著船去湖里(數(shù)據(jù)庫)打漁,實(shí)時(shí)計(jì)算為在河流(數(shù)據(jù)流)上建立大壩發(fā)電。進(jìn)一步發(fā)散,湖泊的形成依賴河流,河流確定上下邊界就是湖泊;其實(shí),離線計(jì)算可以理解為實(shí)時(shí)計(jì)算的一種特例。
二、實(shí)時(shí)計(jì)算能解決的問題

Figure 3 實(shí)時(shí)計(jì)算能解決的問題
從技術(shù)領(lǐng)域來看,實(shí)時(shí)計(jì)算主要用于以下場(chǎng)景:
基于Data Pipline的實(shí)時(shí)數(shù)據(jù)ETL:目的是實(shí)時(shí)地把數(shù)據(jù)從A點(diǎn)傳輸?shù)紹點(diǎn)。在傳輸?shù)倪^程中可能添加數(shù)據(jù)清洗和集成的工作,例如實(shí)時(shí)構(gòu)建搜索系統(tǒng)的索引、實(shí)時(shí)數(shù)倉中的ETL過程等。
基于Data Analysis的實(shí)時(shí)數(shù)據(jù)分析:根據(jù)業(yè)務(wù)目標(biāo),從原始數(shù)據(jù)中抽取對(duì)應(yīng)信息并整合的過程。例如,查看每天銷售額排行前10的商品、倉庫平均周轉(zhuǎn)時(shí)間、網(wǎng)頁平均點(diǎn)擊率、實(shí)時(shí)推送打開率等。實(shí)時(shí)數(shù)據(jù)分析則是上述過程的實(shí)時(shí)化,通常在終端體現(xiàn)為實(shí)時(shí)報(bào)表或?qū)崟r(shí)大屏。
基于Data Driven的事件驅(qū)動(dòng)應(yīng)用:對(duì)一系列訂閱事件進(jìn)行處理或作出響應(yīng)的系統(tǒng)。事件驅(qū)動(dòng)應(yīng)用通常需要依賴內(nèi)部狀態(tài),例如點(diǎn)擊欺詐檢測(cè)、風(fēng)控系統(tǒng)、運(yùn)維異常檢測(cè)系統(tǒng)等。當(dāng)用戶的行為觸發(fā)某些風(fēng)險(xiǎn)控制點(diǎn)時(shí),系統(tǒng)會(huì)捕獲這個(gè)事件,并根據(jù)用戶當(dāng)前和之前的行為進(jìn)行分析,決定是否對(duì)用戶進(jìn)行風(fēng)險(xiǎn)控制。
三、實(shí)時(shí)開發(fā)的全鏈路流程

Figure 4 實(shí)時(shí)開發(fā)的全鏈路流程
實(shí)時(shí)采集——使用流式數(shù)據(jù)采集工具將數(shù)據(jù)流式且實(shí)時(shí)地采集并傳輸?shù)酱髷?shù)據(jù)消息存儲(chǔ)(kafka等),流式數(shù)據(jù)存儲(chǔ)作為實(shí)時(shí)計(jì)算的上游,提供源源不斷的數(shù)據(jù)流去觸發(fā)流式計(jì)算作業(yè)的運(yùn)行。流數(shù)據(jù)作為實(shí)時(shí)計(jì)算的觸發(fā)源驅(qū)動(dòng)實(shí)時(shí)計(jì)算運(yùn)行。因此,一個(gè)實(shí)時(shí)計(jì)算作業(yè)必須至少使用一個(gè)流數(shù)據(jù)作為源。每一條進(jìn)入的流數(shù)據(jù)將直接觸發(fā)實(shí)時(shí)計(jì)算的一次流式計(jì)算處理。數(shù)據(jù)在實(shí)時(shí)計(jì)算系統(tǒng)中處理分析后隨機(jī)寫到下游數(shù)據(jù)存儲(chǔ),下游數(shù)據(jù)庫一般與業(yè)務(wù)相關(guān),可以用來做實(shí)時(shí)報(bào)表、實(shí)時(shí)大屏等數(shù)據(jù)消費(fèi)。
四、實(shí)時(shí)采集---全鏈路實(shí)時(shí)開發(fā)平臺(tái)的關(guān)鍵
整個(gè)全鏈路的實(shí)時(shí)開發(fā)中,實(shí)時(shí)采集是實(shí)時(shí)計(jì)算的上游。對(duì)于很對(duì)企業(yè)而言,本身已經(jīng)有數(shù)據(jù)存儲(chǔ)系統(tǒng),但是很大一部分都是離線的關(guān)系型數(shù)據(jù)庫。如何將這些離線的關(guān)系型數(shù)據(jù)庫的實(shí)時(shí)增量數(shù)據(jù),提供給實(shí)時(shí)計(jì)算去分析,是一個(gè)亟需解決的環(huán)節(jié)。如下圖所示,是袋鼠云實(shí)時(shí)數(shù)據(jù)采集工具的功能架構(gòu)。

Figure 5 實(shí)時(shí)數(shù)據(jù)采集工具FlinkX數(shù)據(jù)流程
袋鼠云實(shí)時(shí)數(shù)據(jù)采集作為StreamWorks平臺(tái)的一個(gè)模塊,有以下功能特點(diǎn)。
FlinkX支持批量數(shù)據(jù)抽取,同時(shí)支持實(shí)時(shí)捕捉MySQL、Oracle、SQLServer等變化數(shù)據(jù),實(shí)現(xiàn)批流統(tǒng)一采集。
底層基于Flink分布式架構(gòu),支持大容量、高并發(fā)同步,相比單點(diǎn)同步性能更好,穩(wěn)定性更高。
支持直接讀取數(shù)據(jù)庫Binlog的方式實(shí)時(shí)同步,也支持間隔輪詢方式實(shí)時(shí)同步。
支持?jǐn)帱c(diǎn)續(xù)傳和臟數(shù)據(jù)記錄,實(shí)時(shí)數(shù)據(jù)采集的metric曲線展示。
五、StreamWorks實(shí)時(shí)開發(fā)平臺(tái)介紹
袋鼠云實(shí)時(shí)開發(fā)平臺(tái)(StreamWorks)基于 Apache Flink 構(gòu)建的云原生一站式大數(shù)據(jù)流式計(jì)算平臺(tái),涵蓋從實(shí)時(shí)數(shù)據(jù)采集到實(shí)時(shí)數(shù)據(jù)ETL的全鏈路流程。亞秒級(jí)別的處理延時(shí), Datastream API 作業(yè)開發(fā),與已有大數(shù)據(jù)組件兼容,幫助企業(yè)實(shí)時(shí)數(shù)據(jù)智能化轉(zhuǎn)型,助力新型基礎(chǔ)設(shè)施建設(shè)。
在以往的數(shù)據(jù)開發(fā)技術(shù)棧中,SQL語言能解決大部分業(yè)務(wù)場(chǎng)景的問題。StreamWorks的核心功能是主打以SQL語義的流式數(shù)據(jù)分析能力(FlinkStreamSQL),降低開發(fā)門檻。提供Exactly-Once的處理語義保證,保證業(yè)務(wù)精確一致。

Figure 6 StreamWorks功能架構(gòu)
如上圖所示,StreamWorks包含如下幾個(gè)模塊:
實(shí)時(shí)采集:支持MySQL、SQLServer、Oracle、PolarDB、Kafka、EMQ等數(shù)據(jù)源實(shí)時(shí)數(shù)據(jù)采集,通過速率和并發(fā)數(shù)控制可幫助用戶更準(zhǔn)確的控制采集過程。
數(shù)據(jù)開發(fā):支持FlinkSQL、Flink任務(wù)類型,F(xiàn)linkSQL作業(yè)提供可視化存儲(chǔ)配置、作業(yè)開發(fā)、語法檢查等功能;Flink任務(wù)支持上傳jar包的方式運(yùn)行實(shí)時(shí)開發(fā)作業(yè)。
任務(wù)運(yùn)維:任務(wù)運(yùn)行情況監(jiān)控,數(shù)據(jù)曲線、運(yùn)行日志、數(shù)據(jù)延時(shí)、CkeckPoint、Failover、屬性參數(shù)、告警配置等功能。
項(xiàng)目管理: 用戶管理、角色管理、項(xiàng)目整體配置、項(xiàng)目成員管理等。
六、StreamWorks實(shí)時(shí)大數(shù)據(jù)開發(fā)平臺(tái)的優(yōu)勢(shì)

Figure 7 StreamWorks平臺(tái)層級(jí)
如上圖所示,StreamWorks實(shí)時(shí)大數(shù)據(jù)開發(fā)平臺(tái)基于Apache Flink計(jì)算引擎,做了一層SQL化的封裝,最上層有一個(gè)在線開發(fā)的IDE平臺(tái)。平臺(tái)有以下幾個(gè)優(yōu)勢(shì)點(diǎn):
簡(jiǎn)單易用: 提供在線IDE,定制化適配FlinkSQL的開發(fā)工具!
可視化DDL:提供可視化建表工具,配置參數(shù)即可完成DDL!
內(nèi)置函數(shù):提供豐富的FlinkSQL內(nèi)置函數(shù),簡(jiǎn)化開發(fā)工作!
高效運(yùn)維: 提供多達(dá)幾十個(gè)運(yùn)行指標(biāo),解決開源運(yùn)維難題!
實(shí)時(shí)采集:提供實(shí)時(shí)采集工具,支撐全鏈路實(shí)時(shí)開發(fā)平臺(tái)!
FlinkX:自研的批流一體的數(shù)據(jù)采集工具,已經(jīng)開源!

Figure 8 傳統(tǒng)開發(fā)模式 VS StreamWorks開發(fā)模式
七、十四行代碼搞定實(shí)時(shí)業(yè)務(wù)開發(fā)
講了這么多,我們的產(chǎn)品到底如何方便大家進(jìn)行實(shí)時(shí)業(yè)務(wù)邏輯開發(fā)的,我們還是拿最常見的網(wǎng)站流量分析的例子說明下。比如,某網(wǎng)站需要對(duì)訪問來源進(jìn)行分析:
如下圖所示,從日志服務(wù)讀取該站點(diǎn)訪問日志,解析日志中的來源并檢查來源是否在感興趣的網(wǎng)站列表中(類似來源網(wǎng)站的白名單,保存在MySQL中),統(tǒng)計(jì)來自各個(gè)網(wǎng)站的流量PV,最終結(jié)果寫出到MySQL。

Figure 9 業(yè)務(wù)邏輯流程圖
用StreamSQL代碼實(shí)現(xiàn)的話非常簡(jiǎn)單,只需要14行偽代碼即可搞定。
CREATE TABLE ? ?
log_source(dt STRING, …) ?
WITH (type=kafka);
CREATE TABLE ? ?
mysql_dim(url STRING, …, PRIMARY KEY(url))
WITH (type=mysql); ?
CREATE TABLE ? ?
mysql_result(url STRING, …, PRIMARY KEY(url))
WITH (type=mysql); ?
INSERT INTO mysql_result
SELECT ? ?
l.url, count(*) as pv …
FROM ?log_source l JOIN mysql_dim ?d ON l.url = d.url
group by l.url
八、基于StreamWorks構(gòu)建實(shí)時(shí)推薦系統(tǒng)
一般的推薦系統(tǒng)都是基于標(biāo)簽來實(shí)現(xiàn)的,基于標(biāo)簽的推薦其實(shí)應(yīng)用很普遍,比如頭條,比如抖音,都用到了大量的標(biāo)簽,這樣的推薦系統(tǒng)有很多優(yōu)點(diǎn),比如實(shí)現(xiàn)簡(jiǎn)單、可解釋性好等。如何通過標(biāo)簽來實(shí)現(xiàn)實(shí)時(shí)商品或者內(nèi)容的推薦呢?
首先一個(gè)新的用戶在注冊(cè)app賬號(hào)的時(shí)候會(huì)填寫一些比較固定的數(shù)據(jù),比如年齡、職業(yè)等信息,這些信息可以通過離線計(jì)算分析出長(zhǎng)期興趣標(biāo)簽的結(jié)果,存儲(chǔ)到長(zhǎng)期興趣標(biāo)簽庫。用戶在最近感興趣的內(nèi)容(比如最近10分鐘內(nèi)關(guān)注的信息點(diǎn))可以通過實(shí)時(shí)計(jì)算分析出短期的興趣標(biāo)簽結(jié)果,然后再通過實(shí)時(shí)開發(fā)的數(shù)據(jù)流關(guān)聯(lián)維表的功能,把短期的感興趣標(biāo)簽和長(zhǎng)期興趣標(biāo)簽庫做關(guān)聯(lián),最終生成新的推薦內(nèi)容給到客戶端,形成一個(gè)用戶數(shù)據(jù)流的閉環(huán),從而實(shí)現(xiàn)一個(gè)簡(jiǎn)單的實(shí)時(shí)推薦系統(tǒng)。具體流程如下圖所示。

Figure 10 基于StreamWorks構(gòu)建實(shí)時(shí)推薦系統(tǒng)
九、結(jié)語——把未來變成現(xiàn)在
疫情即將過去,生活還要繼續(xù)。隨著“新基建”建設(shè)不斷深化下去,越來越多的實(shí)時(shí)化場(chǎng)景會(huì)出現(xiàn)在我們生活中。袋鼠云作為新基建解決方案供應(yīng)商,我們的口號(hào)就是把未來變成現(xiàn)在,在未來會(huì)賦能更多的企業(yè)實(shí)時(shí)化轉(zhuǎn)型。