Flink在快手實(shí)時多維分析場景的應(yīng)用

Flink 在快手應(yīng)用場景及規(guī)模
快手實(shí)時多維分析平臺
SlimBase-更省 IO、嵌入式共享 state 存儲
首先看 Flink 在快手的應(yīng)用場景和規(guī)模。
1. 快手應(yīng)用場景
快手計(jì)算鏈路是從 DB/Binlog 以及 WebService Log 實(shí)時入到 Kafka 中,然后接入 Flink 做實(shí)時計(jì)算,其中包括實(shí)時數(shù)倉、實(shí)時分析以及實(shí)時訓(xùn)練,最后的結(jié)果存到 Druid、Kudu、HBase 或者 ClickHouse 里面;同時 Kafka 數(shù)據(jù)實(shí)時 Dump 一份到 Hadoop 集群,然后通過 Hive、MapReduce 或者 Spark 來做離線計(jì)算;最終實(shí)時計(jì)算和離線計(jì)算的結(jié)果數(shù)據(jù)會用內(nèi)部自研 BI 工具 KwaiBI 來展現(xiàn)出來。
Flink 在快手典型的應(yīng)用場景主要分為三大類:
80% 統(tǒng)計(jì)監(jiān)控:實(shí)時統(tǒng)計(jì),包括各項(xiàng)數(shù)據(jù)的指標(biāo),監(jiān)控項(xiàng)報警,用于輔助業(yè)務(wù)進(jìn)行實(shí)時分析和監(jiān)控;
15% 數(shù)據(jù)處理:對數(shù)據(jù)的清洗、拆分、Join 等邏輯處理,例如大 Topic 的數(shù)據(jù)拆分、清洗;
5% 數(shù)據(jù)處理:實(shí)時業(yè)務(wù)處理,針對特定業(yè)務(wù)邏輯的實(shí)時處理,例如實(shí)時調(diào)度。
Flink 在快手應(yīng)用的典型場景案例包括:
快手是分享短視頻跟直播的平臺,快手短視頻、直播的質(zhì)量監(jiān)控是通過 Flink 進(jìn)行實(shí)時統(tǒng)計(jì),比如直播觀眾端、主播端的播放量、卡頓率、開播失敗率等跟直播質(zhì)量相關(guān)的多種監(jiān)控指標(biāo);
用戶增長分析,實(shí)時統(tǒng)計(jì)各投放渠道拉新情況,根據(jù)效果實(shí)時調(diào)整各渠道的投放量;
實(shí)時數(shù)據(jù)處理,廣告展現(xiàn)流、點(diǎn)擊流實(shí)時 Join,客戶端日志的拆分等;
直播 CDN 調(diào)度,實(shí)時監(jiān)控各 CDN 廠商質(zhì)量,通過 Flink 實(shí)時訓(xùn)練調(diào)整各個 CDN 廠商流量配比。
2. Flink 集群規(guī)模
快手目前集群規(guī)模有 1500 臺左右,日處理?xiàng)l目數(shù)總共有3萬億,峰值處理?xiàng)l目數(shù)大約是 3億/s 左右。集群部署都是 On Yarn 模式,實(shí)時集群和離線集群混合部署,通過 Yarn 標(biāo)簽進(jìn)行物理隔離,實(shí)時集群是 Flink 專用集群,針對隔離性、穩(wěn)定性要求極高的業(yè)務(wù)部署。注:本文所涉及數(shù)據(jù)僅代表嘉賓分享時的數(shù)據(jù)。
此處重點(diǎn)和大家分享下快手的實(shí)時多維分析平臺。
1. 快手實(shí)時多維分析場景
快手內(nèi)部有這樣的應(yīng)用場景,每天的數(shù)據(jù)量在百億級別,業(yè)務(wù)方需要在數(shù)據(jù)中任選五個以內(nèi)的維度組合進(jìn)行全維的建模進(jìn)而計(jì)算累計(jì)的 PV ( Page View 訪問量 )、UV ( Unique Visitor 獨(dú)立訪客 )、新增或者留存等這樣的指標(biāo),然后指標(biāo)的計(jì)算結(jié)果要實(shí)時進(jìn)行圖形化報表展示供給業(yè)務(wù)分析人員進(jìn)行分析。
2. 方案選型
現(xiàn)在社區(qū)已經(jīng)有一些 OLAP 實(shí)時分析的工具,像 Druid 和 ClickHouse;目前快手采用的是 Flink+Kudu 的方案,在前期調(diào)研階段對這三種方案從計(jì)算能力、分組聚合能力、查詢并發(fā)以及查詢延遲四個方面結(jié)合實(shí)時多維查詢業(yè)務(wù)場景進(jìn)行對比分析:
計(jì)算能力方面:多維查詢這種業(yè)務(wù)場景需要支持 Sum、Count 和 count distinct 等能力,而 Druid 社區(qū)版本不支持 count distinct,快手內(nèi)部版本支持?jǐn)?shù)值類型、但不支持字符類型的 count distinct;ClickHouse 本身全都支持這些計(jì)算能力;Flink 是一個實(shí)時計(jì)算引擎,這些能力也都具備。
分組聚合能力方面:Druid 的分組聚合能力一般,ClickHouse 和 Flink 都支持較強(qiáng)的分組聚合能力。
查詢并發(fā)方面:ClickHouse 的索引比較弱,不能支持較高的查詢并發(fā),Druid 和 Flink 都支持較高的并發(fā)度,存儲系統(tǒng) Kudu,它也支持強(qiáng)索引以及很高的并發(fā)。
查詢延遲方面:Druid 和 ClickHouse 都是在查詢時進(jìn)行現(xiàn)計(jì)算,而 Flink+Kudu 方案,通過 Flink 實(shí)時計(jì)算后將指標(biāo)結(jié)果直接存儲到 Kudu 中,查詢直接從 Kudu 中查詢結(jié)果而不需要進(jìn)行計(jì)算,所以查詢延遲比較低。
采用 Flink+Kudu 的方案主要思想是借鑒了 Kylin 的思路,Kylin 可以指定很多維度和指標(biāo)進(jìn)行離線的預(yù)計(jì)算然后將預(yù)計(jì)算結(jié)果存儲到 HBase 中;快手的方案是通過 Flink 實(shí)時計(jì)算指標(biāo),再實(shí)時地寫到 Kudu 里面。
3. 方案設(shè)計(jì)
實(shí)時多維分析的整體的流程為:用戶在快手自研的 BI 分析工具 KwaiBI 上配置 Cube 數(shù)據(jù)立方體模型,指定維度列和指標(biāo)列以及基于指標(biāo)做什么樣的計(jì)算;配置過程中選擇的數(shù)據(jù)表是經(jīng)過處理過后存儲在實(shí)時數(shù)倉平臺中的數(shù)據(jù)表;然后根據(jù)配置的計(jì)算規(guī)則通過 Flink 任務(wù)進(jìn)行建模指標(biāo)的預(yù)計(jì)算,結(jié)果存儲到 Kudu 中;最后 KwaiBI 從 Kudu 中查詢數(shù)據(jù)進(jìn)行實(shí)時看板展示。
接下來詳細(xì)介紹一下實(shí)時多維分析的主要模塊。
① 數(shù)據(jù)預(yù)處理
KwaiBI 配置維度建模時選擇的數(shù)據(jù)表,是經(jīng)過提前預(yù)處理的:
首先內(nèi)部有一個元信息系統(tǒng),在元信息系統(tǒng)中提供統(tǒng)一的 schema 服務(wù),所有的信息都被抽象為邏輯表;
例如 Kafka 的 topic、Redis、HBase 表等元數(shù)據(jù)信息都抽取成 schema 存儲起來;
快手 Kafka 的物理數(shù)據(jù)格式大部分是 Protobuf 和 Json 格式,schema 服務(wù)平臺也支持將其映射為邏輯表;
用戶只需要將邏輯表建好之后,就可以在實(shí)時數(shù)倉對數(shù)據(jù)進(jìn)行清洗和過濾。
② 建模計(jì)算指標(biāo)
數(shù)據(jù)預(yù)處理完成后,最重要的步驟是進(jìn)行建模指標(biāo)計(jì)算,此處支持 Cube、GroupingSet 方式維度組合來計(jì)算小時或者天累計(jì)的 UV ( Unique Visitor )、新增和留存等指標(biāo),可以根據(jù)用戶配置按固定時間間隔定期輸出結(jié)果;維度聚合邏輯中,通過逐層降維計(jì)算的方式會讓 DAG 作業(yè)圖十分復(fù)雜,如上圖右上角模型所示;因此快手設(shè)計(jì)了兩層降維計(jì)算模型,分為全維度層和剩余維度層,這樣既利用了全維度層的聚合結(jié)果又簡化了 DAG 作業(yè)圖。
以 UV 類指標(biāo)計(jì)算舉例,兩個黃色虛線框分別對應(yīng)兩層計(jì)算模塊:全維計(jì)算和降維計(jì)算。
全維計(jì)算分為兩個步驟,為避免數(shù)據(jù)傾斜問題,首先是維度打散預(yù)聚合,將相同的維度值先哈希打散一下。因?yàn)?UV 指標(biāo)需要做到精確去重,所以采用 Bitmap 進(jìn)行去重操作,每分鐘一個窗口計(jì)算出增量窗口內(nèi)數(shù)據(jù)的 Bitmap 發(fā)送給第二步按維度全量聚合;在全量聚合中,將增量的 Bitmap 合并到全量 Bitmap 中最終得出準(zhǔn)確的 UV 值。然而有人會有問題,針對用戶 id 這種的數(shù)值類型的可以采用此種方案,但是對于 deviceid 這種字符類型的數(shù)據(jù)應(yīng)該如何處理?實(shí)際上在源頭,數(shù)據(jù)進(jìn)行維度聚合之前,會通過字典服務(wù)將字符類型的變量轉(zhuǎn)換為唯一的 Long 類型值,進(jìn)而通過 Bitmap 進(jìn)行去重計(jì)算 UV。
降維計(jì)算中,通過全維計(jì)算得出的結(jié)果進(jìn)行預(yù)聚合然后進(jìn)行全量聚合,最終將結(jié)果進(jìn)行輸出。
再重點(diǎn)介紹下,建模指標(biāo)計(jì)算中的幾個關(guān)鍵點(diǎn)。在建模指標(biāo)計(jì)算中,為了避免維度數(shù)據(jù)傾斜問題,通過預(yù)聚合 ( 相同維度 hash 打散 ) 和全量聚合 ( 相同維度打散后聚合 ) 兩種方式來解決;為了解決 UV 精確去重問題,前文有提到,使用 Bitmap 進(jìn)行精確去重,通過字典服務(wù)將 String 類型數(shù)據(jù)轉(zhuǎn)換成 Long 類型數(shù)據(jù)進(jìn)而便于存儲到 Bitmap 中,因?yàn)榻y(tǒng)計(jì) UV 要統(tǒng)計(jì)歷史的數(shù)據(jù),比如說按天累計(jì),隨著時間的推移,Bitmap 會越來越大,在 Rocksdb 狀態(tài)存儲下,讀寫過大的 KV 會比較耗性能,所以內(nèi)部自定義了一個 BitmapState,將 Bitmap 進(jìn)行分塊存儲,一個 blockid 對應(yīng)一個局部的 bitmap,這樣在 RocksDB 中存儲時,一個 KV 會比較小,更新的時候也只需要根據(jù) blockid 更新局部的 bitmap 就可以而不需要全量更新。
接下來,看新增類的指標(biāo)計(jì)算,和剛剛 UV 的不同點(diǎn)是需要判斷是否為新增用戶,通過異步地訪問外部的歷史用戶服務(wù)進(jìn)行新增用戶判斷,再根據(jù)新增用戶流計(jì)算新增 UV,這塊計(jì)算邏輯和 UV 計(jì)算一致。
然后,再來看留存類指標(biāo)計(jì)算,與 UV 計(jì)算不同的時候,不僅需要當(dāng)天的數(shù)據(jù)還需要前一天的歷史數(shù)據(jù),這樣才能計(jì)算出留存率,內(nèi)部實(shí)現(xiàn)的時候是采用雙 buffer state 存儲,在計(jì)算的時候?qū)㈦p buffer 數(shù)據(jù)相除就可以計(jì)算出留存率。
③ Kudu 存儲
最后經(jīng)過上面的計(jì)算邏輯后,會將結(jié)果存儲到 Kudu 里面,其本身具有低延遲隨機(jī)讀寫以及快速列掃描等特點(diǎn),很適合實(shí)時交互分析場景;在存儲方式上,首先對維度進(jìn)行編碼,然后按時間+維度組合+維度值組合作為主鍵,最終按維度組合、維度值組合、時間進(jìn)行分區(qū),這樣有利于提高查詢的效率快速獲取到數(shù)據(jù)。
4. KwaiBI 展示
界面為配置 Cube 模型的截圖,配置一些列并指定類型,再通過一個 SQL 語句來描述指標(biāo)計(jì)算的邏輯,最終結(jié)果也會通過 KwaiBI 展示出來。
SlimBase
更省 IO、嵌入式共享 state 存儲
接下來介紹一種比 RocksDB 更省 IO、嵌入式的共享 state 存儲引擎:SlimBase。
1. 面臨的挑戰(zhàn)
首先看一下 Flink 使用 RocksDB 遇到的問題,先闡述一下快手的應(yīng)用場景、廣告展現(xiàn)點(diǎn)擊流實(shí)時 Join 場景:打開快手 App 可能會收到廣告服務(wù)推薦的廣告視頻,用戶可能會點(diǎn)擊展現(xiàn)的廣告視頻。這樣的行為在后端會形成兩份數(shù)據(jù)流,一份是廣告展現(xiàn)日志,一份是客戶端點(diǎn)擊日志。這兩份數(shù)據(jù)進(jìn)行實(shí)時 Join,并將 Join 結(jié)果作為樣本數(shù)據(jù)用于模型訓(xùn)練,訓(xùn)練出的模型會被推送到線上的廣告服務(wù)。該場景下展現(xiàn)以后20分鐘的點(diǎn)擊被認(rèn)為是有效點(diǎn)擊,實(shí)時 Join 邏輯則是點(diǎn)擊數(shù)據(jù) Join 過去20分鐘內(nèi)的展現(xiàn)。其中,展現(xiàn)流的數(shù)據(jù)量相對比較大,20分鐘數(shù)據(jù)在 1TB 以上。檢查點(diǎn)設(shè)置為五分鐘,Backend 選擇 RocksDB。
在這樣的場景下,面臨著磁盤 IO 開銷70%,其中50%開銷來自于 Compaction;在 Checkpoint 期間,磁盤 IO 開銷達(dá)到了100%,耗時在1~5分鐘,甚至?xí)L于 Checkpoint 間隔,業(yè)務(wù)能明顯感覺到反壓。經(jīng)過分析找出問題:
首先,在 Checkpoint 期間會產(chǎn)生四倍的大規(guī)模數(shù)據(jù)拷貝,即:從 RocksDB 中全量讀取出來然后以三副本形式寫入到 HDFS 中;
其次,對于大規(guī)模數(shù)據(jù)寫入,RocksDB 的默認(rèn) Level Compaction 會有嚴(yán)重的 IO 放大開銷。
2. 解決方案
由于出現(xiàn)上文闡述的問題,開始尋找解決方案,整體思路是在數(shù)據(jù)寫入時直接落地到共享存儲中,避免 Checkpoint 帶來的數(shù)據(jù)拷貝問題。手段是嘗試使用更省 IO 的 Compaction,例如使用 SizeTieredCompation 方式,或者利用時序數(shù)據(jù)的特點(diǎn)使用并改造 FIFOCompaction。綜合比較共享存儲、SizeTieredCompation、基于事件時間的 FIFOCompaction 以及技術(shù)棧四個方面得出共識:HBase 代替 RocksDB 方案。
-
共享存儲方面,HBase 支持, RocksDB 不支持 -
SizeTieredCompation 方面,RocksDB 默認(rèn)不支持,但 HBase 默認(rèn)支持,開發(fā)起來比較簡單 -
基于事件時間下推的 FIFOCompaction 方面,RocksDB 不支持,但 HBase 開發(fā)起來比較簡單 -
技術(shù)棧方面,RocksDB 使用 C++,HBase 使用 java,HBase 改造起來更方便
但是 HBase 有些方面相比 RocksDB 較差:
HBase 是一個依賴 zookeeper、包含 Master 和 RegionServer 的重量級分布式系統(tǒng);而 RocksDB 僅是一個嵌入式的 Lib 庫,很輕量級。
在資源隔離方面,HBase 比較困難,內(nèi)存和 cpu 被多個 Container 共享;而 RocksDB 比較容易,內(nèi)存和 cpu 伴隨 Container 天生隔離。
網(wǎng)絡(luò)開銷方面,因?yàn)?HBase 是分布式的,所有比嵌入式的 RocksDB 開銷要大很多。
綜合上面幾點(diǎn)原因,快手達(dá)成了第二個共識,將 HBase 瘦身,改造為嵌入式共享存儲系統(tǒng)。
3. 實(shí)現(xiàn)方案
接下來介紹一下將 HBase 改造成 SlimBase 的實(shí)現(xiàn)方案,主要是分為兩層:
一層是 SlimBase 本身,包含三層結(jié)構(gòu):Slim HBase、適配器以及接口層;
另一層是 SlimBaseStateBackend,主要包含 ListState、MapState、ValueState 和 ReduceState。
后面將從 HBase 瘦身、適配并實(shí)現(xiàn)操作接口以及實(shí)現(xiàn) SlimBaseStateBackend 三個步驟分別進(jìn)行詳細(xì)介紹。
① HBase 瘦身
先講 HBase 瘦身,主要從減肥和增瘦兩個步驟,在減肥方面:
先對 HBase 進(jìn)行減裁,去除 client、zookeeper 和 master,僅保留 RegionServer
再對 RegionServer 進(jìn)行剪裁,去除 ZK Listener、Master Tracker、Rpc、WAL 和 MetaTable
僅保留 RegionServer 中的 Cache、Memstore、Compaction、Fluster 和 Fs
在增瘦方面:
將原來 Master 上用于清理 Hfile 的 HFileCleaner 遷移到 RegionServer 上
RocksDB 支持讀放大寫的 merge 接口,但是 SlimBase 是不支持的,所以要實(shí)現(xiàn) merge 的接口
接口層主要有以下三點(diǎn)實(shí)現(xiàn):
仿照 RocksDB,邏輯視圖分為兩級:DB 和 ColumnFamily
支持一些基本的接口:put/get/delete/merge 和 snapshot
額外支持了 restore 接口,用于從 snapshot 中恢復(fù)
適配層主要有以下兩個概念:
一個 SlimBase 適配為 Hbase 的 namespace
一個 SlimBase 的 ColumnFamily 適配為 HBase 的 table
SlimBaseStateBackend 實(shí)現(xiàn)上主要體現(xiàn)在兩個方面:
一是多種 States 實(shí)現(xiàn),支持多種數(shù)據(jù)結(jié)構(gòu),ListState、MapState、ValueState 和 ReduceState
二是改造 Snapshot 和 Restore 的流程,從下面的兩幅圖可以看出,SlimBase 在磁盤 IO 上節(jié)省了大量的資源,避免了多次的 IO 的問題。
4. 測試結(jié)論
上線對比測試后,得出測試結(jié)論:
Checkpoint 和 Restore 的時延從分鐘級別降到秒級。
磁盤 IO 下降了66%
磁盤寫吞吐下降50%
CPU 開銷下降了33%
5. 后期優(yōu)化
目前用的 Compaction 策略是 SizeTieredCompaction,后期要實(shí)現(xiàn)基于 OldestUnexpiredTime 的 FiFOCompaction 策略,目標(biāo)是做到無磁盤 IO 開銷。
FiFOCompaction 是一種基于 TTL 的無 IO 的 Compaction 策略;OldestUnexpiredTime 是指例如設(shè)置 OldestUnexpiredTime=t2,表示 t2 時刻前的數(shù)據(jù)全部過期,可以被 Compaction 清理,基于時間點(diǎn)的 FIFOCompaction 理論上可以做到無磁盤 IO 開銷。
后續(xù)還有四點(diǎn)優(yōu)化,前三點(diǎn)是基于 HBase 的優(yōu)化,最后是針對 HDFS 做的優(yōu)化:
SlimBase 使用 InMemoryCompaction,降低內(nèi)存 Flush 和 Compaction 開銷
SlimBase 支持 prefixBloomFilter,提高 Scan 性能
SlimBase 支持短路讀
HDFS 副本落盤改造:非本地副本使用 DirectIO 直接落盤,提高本地讀 pagecache 命中率;此條主要是在測試使用時發(fā)現(xiàn)單副本比多副本讀寫效率高這一問題
6. 未來規(guī)劃
從語言、存儲、壓縮策略、事件事件下推、垃圾回收、檢查點(diǎn)時間、重加載時間七個方面來看,SlimBase 都比 RocksDB 更適合快手實(shí)時計(jì)算任務(wù)的開發(fā),未來的規(guī)劃是對 SlimBase 的性能做進(jìn)一步優(yōu)化,愿景是將快手 Flink 上的所有業(yè)務(wù)場景全部用 SlimBase 替代掉 RocksDB。
分享嘉賓:
董亭亭,快手實(shí)時計(jì)算引擎團(tuán)隊(duì)負(fù)責(zé)人。
特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:
長按訂閱更多精彩▼
如有收獲,點(diǎn)個在看,誠摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點(diǎn),不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!