唯品會億級數(shù)據(jù)服務平臺落地實踐
數(shù)據(jù)服務是數(shù)據(jù)中臺體系中的關鍵組成部分。作為數(shù)倉對接上層應用的統(tǒng)一出入口,數(shù)據(jù)服務將數(shù)倉當作一個統(tǒng)一的 DB 來訪問,提供統(tǒng)一的 API 接口控制數(shù)據(jù)的流入及流出,能夠滿足用戶對不同類型數(shù)據(jù)的訪問需求。
電商平臺唯品會的數(shù)據(jù)服務自 2019 年開始建設,在公司內經(jīng)歷了從無到有落地,再到為超過 30 業(yè)務方提供 to B、to C 的數(shù)據(jù)服務的過程。本文主要介紹唯品會自研數(shù)據(jù)服務 Hera 的相關背景、架構設計和核心功能。
背景
在統(tǒng)一數(shù)倉數(shù)據(jù)服務之前,數(shù)倉提供的訪問接入方式往往存在效率問題低、數(shù)據(jù)指標難統(tǒng)一等問題,具體而言有以下幾個比較突出的情況:- 廣告人群 USP、DMP 系統(tǒng)每天需要通過 HiveServer 以流的方式從數(shù)倉導出數(shù)據(jù)到本地,每個人群的數(shù)據(jù)量從幾十萬到幾個億,人群數(shù)量 2w ,每個人群運行時間在 30min ,部分大人群的運行直接超過 1h,在資源緊張的情況下,人群延遲情況嚴重。
- 數(shù)倉的數(shù)據(jù)在被數(shù)據(jù)產(chǎn)品使用時,需要為每個表新生成一個單獨的接口,應用端需要為每一種訪問方式(如 Presto、ClickHouse)區(qū)分使用不同的接口,導致數(shù)據(jù)產(chǎn)品接口暴漲,不方便維護,影響開發(fā)及維護效率。數(shù)據(jù)在不同的存儲時,需要包含 clickhouse-client,presto-client 等等第三方 jar 包。
- 不同數(shù)據(jù)產(chǎn)品中都需要使用一些常用的數(shù)據(jù)指標,如銷售額、訂單數(shù)、PV、UV 等,而這些數(shù)據(jù)在不同數(shù)據(jù)產(chǎn)品的實現(xiàn)口徑、實現(xiàn)方式都不一樣,無法形成數(shù)據(jù)共享,每個數(shù)據(jù)產(chǎn)品都重復進行相同的指標建設。因此,在不同數(shù)據(jù)產(chǎn)品查看相同指標卻發(fā)現(xiàn)數(shù)值不同的情況下,難以判斷哪個數(shù)據(jù)產(chǎn)品提供的數(shù)據(jù)是準確的。

為解決以上問題,數(shù)據(jù)服務應運而生。目前數(shù)據(jù)服務的主要優(yōu)勢有:屏蔽底層的存儲引擎、計算引擎,使用同一個 API(one service),數(shù)倉數(shù)據(jù)分層存儲,不同 engine 的 SQL 生成能力,自適應 SQL 執(zhí)行以及統(tǒng)一緩存架構保障業(yè)務 SLA,支持數(shù)據(jù)注冊并授權給任何調用方進行使用,提高數(shù)據(jù)交付效率。
通過唯一的 ID 標識,數(shù)據(jù)產(chǎn)品可通過 ID 查閱數(shù)據(jù),而非直接訪問對應的數(shù)倉表。一方面,指標服務統(tǒng)一了指標的口徑,同時也支持快速構建新的數(shù)據(jù)產(chǎn)品。
架構設計
數(shù)據(jù)服務能給業(yè)務帶來運營和商業(yè)價值,核心在于給用戶提供自助分析數(shù)據(jù)能力。Hera 整體架構基于典型的 Master/slave 模型,數(shù)據(jù)流與控制流單獨鏈路,從而保障系統(tǒng)的高可用性。數(shù)據(jù)服務系統(tǒng)主要分為三層:- 應用接入層:業(yè)務申請接入時,可以根據(jù)業(yè)務要求選擇數(shù)據(jù)服務 API(TCP Client), HTTP 以及 OSP 服務接口(公司內部 RPC 框架)。
- 數(shù)據(jù)服務層:主要執(zhí)行業(yè)務提交的任務,并返回結果。主要功能點包括:路由策略,多引擎支持,引擎資源配置,引擎參數(shù)動態(tài)組裝,SQLLispengine 生成,SQL 自適應執(zhí)行,統(tǒng)一數(shù)據(jù)查詢緩存,F(xiàn)reeMaker SQL 動態(tài)生成等功能。
- 數(shù)據(jù)層:業(yè)務查詢的數(shù)據(jù)無論在數(shù)倉、Clickhouse、MySQL 還是 Redis 中,都可以很好地得到支持,用戶都使用同一套 API。

調度系統(tǒng)的整體流程大致包含以下模塊:
- Master:負責管理所有的 Worker、TransferServer、AdhocWorker 節(jié)點,同時負責調度分發(fā)作業(yè);
- Worker:負責執(zhí)行 ETL 和數(shù)據(jù)文件導出類型的作業(yè),拉起 AdhocWorker 進程(Adhoc 任務在 AdhocWorker 進程中的線程池中執(zhí)行),ETL 類型的作業(yè)通過子進程的方式完成;
- Client:客戶端,用于編程式地提交 SQL 作業(yè);
- ConfigCenter:負責向集群推送統(tǒng)一配置信息及其它運行時相關的配置和 SQLParser (根據(jù)給定的規(guī)則解析、替換、生成改寫 SQL 語句,以支持不同計算引擎的執(zhí)行);
- TransferServer:文件傳輸服務。

主要功能
Hera 數(shù)據(jù)服務的主要功能有:多隊列調度策略、多引擎查詢、多任務類型、文件導出、資源隔離、引擎參數(shù)動態(tài)組裝、自適應 Engine 執(zhí)行和 SQL 構建。多隊列調度策略
數(shù)據(jù)服務支持按照不同用戶、不同任務類型并根據(jù)權重劃分不同調度隊列,以滿足不同任務類型的 SLA。多引擎查詢
數(shù)據(jù)服務支持目前公司內部所有 OLAP 和數(shù)據(jù)庫類型,包括 Spark、Presto、Clickhouse、Hive 、MySQL、Redis。會根據(jù)業(yè)務具體場景和要求,選擇當前最佳的查詢引擎。多任務類型
數(shù)據(jù)服務支持的任務類型有:ETL、Adhoc、文件導出、數(shù)據(jù)導入。加上多引擎功能,實現(xiàn)多種功能組合,如 Spark adhoc 和 Presto adhoc。文件導出
主要是支持大量的數(shù)據(jù)從數(shù)據(jù)倉庫中導出,便于業(yè)務分析和處理,比如供應商發(fā)券和信息推送等。具體執(zhí)行過程如下:用戶提交需要導出數(shù)據(jù)的 SQL,通過分布式 engine 執(zhí)行完成后,落地文件到 hdfs/alluxio. 客戶端通過 TCP 拉取文件到本地。千萬億級的數(shù)據(jù)導出耗時最多 10min。數(shù)據(jù)導出在人群數(shù)據(jù)導出上性能由原來的 30min ,提升到最多不超過 3min,性能提升 10~30 倍。具體流程如下:

資源隔離(Worker 資源和計算資源)
業(yè)務一般分為核心和非核心,在資源分配和調度上也不同。主要是從執(zhí)行任務 Worker 和引擎資源,都可以實現(xiàn)物理級別的隔離,最大化減少不同業(yè)務之間相互影響。引擎參數(shù)動態(tài)組裝
線上業(yè)務執(zhí)行需要根據(jù)業(yè)務情況進行調優(yōu),動態(tài)限制用戶資源使用,集群整體切換等操作,這個時候就需要對用戶作業(yè)參數(shù)動態(tài)修改,如 OLAP 引擎執(zhí)行任務時,經(jīng)常都要根據(jù)任務調優(yōu),設置不同參數(shù)。針對這類問題,數(shù)據(jù)服務提供了根據(jù)引擎類型自動組裝引擎參數(shù),并且引擎參數(shù)支持動態(tài)調整,也可以針對特定任務、執(zhí)行賬號、業(yè)務類型來設定 OLAP 引擎執(zhí)行參數(shù)。自適應 Engine 執(zhí)行
業(yè)務方在查詢時,有可能因為引擎資源不足或者查詢條件數(shù)據(jù)類型不匹配從而導致執(zhí)行失敗。為了提高查詢成功率和服務 SLA 保障,設計了 Ad Hoc 自適應引擎執(zhí)行,當一個引擎執(zhí)行報錯后,會切換到另外一個引擎繼續(xù)執(zhí)行。具體自適應執(zhí)行邏輯如下圖所示:
SQL 構建
數(shù)據(jù)服務 SQL 構建基于維度事實建模,支持單表模型、星型模型和雪花模型。- 單表模型:一張事實表,一般為 DWS 或者 ADS 的匯總事實表。
- 星型模型:1 張事實表(如 DWD 明細事實表) N 張維表,例如訂單明細表 (事實表 FK=商品 ID) ? 商品維表 (維度表 PK=商品 ID)?。
- 雪花模型:1 張事實表(如 DWD 明細事實表) N 張維表 M 張沒有直接連接到事實表的維表,例如訂單明細表 (事實表 FK=商品 ID) ? 商品維表 (維度表 PK=商品 ID,F(xiàn)K=品類 ID)? ? 品類維表(維度表 PK=品類 ID)。

自定義語法(Lisp)描述指標的計算公式
Lisp 是一套自定義的語法,用戶可以使用 Lisp 來描述指標的計算公式。其目標是為了構建統(tǒng)一的指標計算公式處理范式,屏蔽底層的執(zhí)行引擎的語法細節(jié),最大化優(yōu)化業(yè)務配置和生成指標的效率。Lisp 總體格式 (oprator param1 param2 ...) param 可以是一個參數(shù),也可以是一個 Lisp 表達式。目前已經(jīng)實現(xiàn)的功能:
- 聚合表達式
在 Presto 中的實現(xiàn)是 approx_distinct(x,e) over (partition by y,z),在 Spark 中的實現(xiàn)是 approx_count_distinct(x,e) over (partition by y,z)。y,z 只在開窗函數(shù)模式下才生效。目前也支持嵌套的聚合表達式(sum (sum (max x)))。
- 條件表達式
簡單模式 ?(case value val1 then1?[val2 then2] ...?[elseVal])eg:(case subject_id (int 2) (int 1)) ->? case subject_id when 2 then 1 end)
查找模式 ?(case when1 then1 [when2 then2] ... [elseVal])eg:(case (= subject_id (string goods_base)) (int 2) (int 1)) ->? case when subject_id = 'goods_base' then 2 else 1 end
- 類型標識表達式
- 類型轉換表達式
- 聚合通用表達式
- 非聚合通用表達式
例如:(func_none json_extract_scalar 40079 '$.m_name' )
Lisp 語法的解析
Lisp 的解析和翻譯是基于 antlr4 來實現(xiàn)的,處理流程如下:
- 將 Lisp(count x y)表達式通過 antlr 翻譯成語法樹,如下圖所示:

- 通過自定義的 Listener 遍歷語法樹
- 在遍歷語法樹的過程中,結合指標的 query engine(presto/spark/clickhouse/mysql)元數(shù)據(jù)生成對應的查詢引擎的 SQL 代碼(approx_distinct(x,e) over (partition by y))
任務調度
基于 Netty 庫收發(fā)集群消息,系統(tǒng)僅僅使用同一個線程池對象 EventLoopGroup 來收發(fā)消息,而用戶的業(yè)務邏輯,則交由一個單獨的線程池。選用 Netty 的另外一個原因是“零拷貝”的能力,在大量數(shù)據(jù)返回時,通過文件的形式直接將結果送給調用者。
多隊列 多用戶調度
業(yè)務需求通常包含時間敏感與不敏感作業(yè),為了提高作業(yè)的穩(wěn)定性和系統(tǒng)的可配置性,Hera 提供了多隊列作業(yè)調度的功能。用戶在提交作業(yè)時可以顯式地指定一個作業(yè)隊列名,當這個作業(yè)在提交到集群時,如果相應的隊列有空閑,則就會被添加進相應的隊列中,否則返回具體的錯誤給客戶端,如任務隊列滿、隊列名不存在、隊列已經(jīng)關閉等,客戶端可以選擇“是否重試提交”。
當一個作業(yè)被添加進隊列之后,Master 就會立即嘗試調度這個隊列中的作業(yè),基于以下條件選擇合適的作業(yè)運行:
- 每個隊列都有自己的權重,同時會設置占用整個集群的資源總量,如最多使用多少內存、最多運行的任務數(shù)量等。
- 隊列中的任務也有自己的權重,同時會記錄這個作業(yè)入隊的時間,在排序當前隊列的作業(yè)時,利用入隊的時間偏移量和總的超時時間,計算得到一個最終的評分。
- 除了調度系統(tǒng)本身的調度策略外,還需要考慮外部計算集群的負載,在從某個隊列中拿出一個作業(yè)后,再進行一次過濾,或者是先過濾,再進行作業(yè)的評分計算。
一個可用的計算作業(yè)評分模型如下:
隊列動態(tài)因子 = 隊列大小 / 隊列容量 * (1 - 作業(yè)運行數(shù) / 隊列并行度)
這個等式表示的意義是:如果某個隊列正在等待的作業(yè)的占比比較大,同時并行運行的作業(yè)數(shù)占比也比較大時,這個隊列的作業(yè)就擁有一個更大的因子,也就意味著在隊列權重相同時,這個隊列中的作業(yè)應該被優(yōu)先調度。
作業(yè)權重 = 1 - (當前時間-入隊時間) / 超時時間
這個等式表示的意義是:在同一個隊列中,如果一個作業(yè)的剩余超時時間越少,則意味著此作業(yè)將更快達到超時,因此它應該獲得更大的選擇機會。
score = 作業(yè)權重 隊列動態(tài)因子 隊列權重
這個等式表示的意義是:對于所有的隊列中的所有任務,首先決定一個作業(yè)是否優(yōu)先被調度的因子是設置的隊列權重,例如權重為 10 的隊列的作業(yè),應該比權重為 1 的隊列中的作業(yè)被優(yōu)先調度,而不管作業(yè)本身的權重(是否會有很大的機率超時);其次影響作業(yè)調度優(yōu)先級的因子是隊列動態(tài)因子,例如有兩個相同權重的隊列時,如果一個隊列的動態(tài)因子為 0.5,另外一個隊列的動態(tài)因子是 0.3,那么應該優(yōu)先選擇動態(tài)因子為 0.5 的隊列作業(yè)進行調度,而不管作業(yè)本身的權重;最后影響作業(yè)調度優(yōu)先級的因子是作業(yè)權重,例如在同一個隊列中,有兩個權重分別為 0.2 和 0.5 的作業(yè),那么為了避免更多的作業(yè)超時,權重為 0.2 的作業(yè)應該被優(yōu)先調度。
簡單描述作業(yè)的排序過程就是,首先按隊列權重排序所有的隊列;對于有重復的隊列,則會計算每個隊列的動態(tài)因子,并按此因子排序;對于每一個隊列,作業(yè)的排序規(guī)則按作業(yè)的超時比率來排序;最終依次按序遍歷每一個隊列,嘗試從中選擇足夠多的作業(yè)運行,直到作業(yè)都被運行或是達到集群限制條件。這里說足夠多,是指每一個隊列都會有一個最大的并行度和最大資源占比,這兩個限制隊列的參數(shù)組合,是為了避免因某一個隊列的容量和并行度被設置的過大,可能超過了整個集群,導致其它隊列被“餓死”的情況。
SQL 作業(yè)流程
用戶通過 Client 提交原始 SQL,這里以 Presto SQL 為例,Client 在提交作業(yè)時,指定了 SQL 路由,則會首先通過訪問 SQLParser 服務,在發(fā)送給 Master 之前,會首先提交 SQL 語句到 SQLParser 服務器,將 SQL 解析成后端計算集群可以支持的 SQL 語句,如 Spark、Presto、ClickHouse 等,為了能夠減少 RPC 交互次數(shù),SQLParser 會一次返回所有可能被改寫的 SQL 語句。在接收到 SQLParser 服務返回的多個可能 SQL 語句后,就會填充當前的作業(yè)對象,真正開始向 Master 提交運行。
Master 在收到用戶提交的作業(yè)后,會根據(jù)一定的調度策略,最終將任務分發(fā)到合適的 Worker 上,開始執(zhí)行。Worker 會首先采用 SQL 作業(yè)默認的執(zhí)行引擎,比如 Presto,提交到對應的計算集群運行,但如果因為某種原因不能得到結果,則會嘗試使用其它的計算引擎進行計算。當然這里也可以同時向多個計算集群提交作業(yè),一旦某個集群首先返回結果時,就取消所有其它的作業(yè),不過這需要其它計算集群的入口能夠支持取消操作。
當 SQL 作業(yè)完成后,將結果返回到 Worker 端,為了能夠更加高效地將查詢結果返回給 Client 端,Worker 會從 Master 發(fā)送的任務對象中提取 Client 側信息,并將結果直接發(fā)送給 Client,直到收到確認信息,至此整個任務才算執(zhí)行完畢。
在整個作業(yè)的流轉過程中,會以任務的概念在調度系統(tǒng)中進行傳播,并經(jīng)歷幾個狀態(tài)的更新,分別標識 new、waiting、running、succeed、failed 階段。

Metrics 采集
數(shù)據(jù)服務搜集兩類 metrics,一類靜態(tài)的,用于描述 master/worker/client 的基本信息;一類是動態(tài)的,描述 master/worker 的運行時信息。這里主要說明一下有關集群動態(tài)信息的采集過程及作用。以 worker 為例,當 worker 成功注冊到 master 時,就會開啟定時心跳匯報動作,并借道心跳請求,將自己的運行時信息匯報給 master。這里主要是內存使用情況,例如當前 worker 通過估算方法,統(tǒng)計目前運行的任務占據(jù)了多少內存,以便 master 能夠在后續(xù)的任務分發(fā)過程中,能夠根據(jù)內存信息進行決策。master 會統(tǒng)計它所管理的集群整個情況,例如每個任務隊列的快照信息、worker 的快照信息、集群的運行時配置信息等,并通過參數(shù)控制是否打印這些信息,以便調試。調用情況
目前數(shù)據(jù)服務每天調用量:toC: 9000W /每天。toB:150W /每天(透傳到執(zhí)行 engine 端調用量)。- ETL 任務執(zhí)行時間基本在 3 分鐘左右完成;
- adhoc 查詢目前主要有 Spark Thrift Server,Presto,Clickhouse 3 種引擎,大部分 SQL 90% 2s 左右完成,Clickhouse 查詢 99%在 1s 左右完成,Presto 調用量 50W /日, Clickhouse 調用量 44W /日。
解決的性能問題
數(shù)據(jù)服務主要解決 SLA 方面的問題。如人群計算、數(shù)據(jù)無縫遷移、數(shù)據(jù)產(chǎn)品 SLA 等,這里用人群舉例說明如下:人群計算遇到的問題:
- 人群計算任務的數(shù)據(jù)本地性不好;
- HDFS 存在數(shù)據(jù)熱點問題;
- HDFS 讀寫本身存在長尾現(xiàn)象。
數(shù)據(jù)服務改造新的架構方案:
- 計算與存儲同置,這樣數(shù)據(jù)就不需通過網(wǎng)絡反復讀取,造成網(wǎng)絡流量浪費。
- 減少 HDFS 讀寫長尾對人群計算造成的額外影響,同時減少人群計算對于 HDFS 穩(wěn)定性的影響。
- 廣告人群計算介于線上生產(chǎn)任務跟離線任務之間的任務類型。這里我們希望能保證這類應用的可靠性和穩(wěn)定性,從而更好地為公司業(yè)務賦能
- 通過數(shù)據(jù)服務執(zhí)行人群計算。

基于 Alluxio 的緩存表同步
將 Hive 表的 location 從 HDFS 路徑替換為 Alluxio 路徑,即表示該表的數(shù)據(jù)存儲于 Alluxio 中。我們使用的方案不是直接寫通過 ETL 任務寫 Alluxio 表的數(shù)據(jù),而是由 Alluxio 主動去拉取同樣 Hive 表結構的 HDFS 中的數(shù)據(jù),即我們創(chuàng)建了一個 HDFS 表的 Alluxio 緩存表。基于 HDFS 的人群計算底表的表結構如下:
CREATE TABLE `hdfs.ads_tags_table`(
`oaid_md5` string,
`mid` string,
`user_id` bigint,
.........
)
PARTITIONED BY (
`dt` string)
LOCATION
'hdfs://xxxx/hdfs.db/ads_tags_table'
基于 Alluxio 的人群計算底表的表結構如下:
CREATE TABLE `alluxio.ads_tags_table`(
`oaid_md5` string,
`mid` string,
`user_id` bigint,
.........
)
PARTITIONED BY (
`dt` string COMMENT '????')
LOCATION
'alluxio://zk@IP1:2181,IP2:2181/alluxio.db/ads_tags_table'
?兩個表結構的字段和分區(qū)定義完全相同。只有兩處不同點:通過不同的庫名區(qū)分了是 HDFS 的表還是 Alluxio 的表;location 具體確認了數(shù)據(jù)存儲的路徑是 HDFS 還是 Alluxio。
由于 Alluxio 不能感知到分區(qū)表的變化,我們開發(fā)了一個定時任務去自動感知源表的分區(qū)變化,使得 Hive 表的數(shù)據(jù)能夠同步到 Alluxio 中。
具體步驟如下:
- 定時任務發(fā)起輪詢,檢測源表是否有新增分區(qū)。
- 發(fā)起一個 SYN2ALLUXIO 的任務由數(shù)據(jù)服務執(zhí)行。
- 任務執(zhí)行腳本為將 Alluxio 表添加與 HDFS 表相同的分區(qū)。
- 分區(qū)添加完成之后,Alluxio 會自動從 mount 的 HDFS 路徑完成數(shù)據(jù)同步。

人群計算任務
上小節(jié)介紹了如何讓 Alluxio 和 HDFS 的 Hive 表保持數(shù)據(jù)同步,接下來需要做的就是讓任務計算的 Spark 任務跑在 Spark 與 Alluxio 混部的集群上,充分利用數(shù)據(jù)的本地性以及計算資源的隔離性,提高人群計算效率。人群標簽計算的 SQL 樣例如下:
INSERT INTO hive_advert.cd0000127760_full
SELECT result_id, '20210703'
FROM
(SELECT oaid_md5 AS result_id
FROM hdfs.ads_tags_table AS ta
WHERE ta.dt = '20210702' and xxxxxxx) AS t
上面是一個 Spark SQL 的 ETL,此處的 hdfs.ads_tags_table 即為人群計算依賴的底表,此表為一個 HDFS location 的表。
人群服務通過調用數(shù)據(jù)服務執(zhí)行。數(shù)據(jù)服務根據(jù)底表分區(qū)是否同步到 Alluxio 決定是否需要下推是用 Alluxio 表來完成計算。如果底表數(shù)據(jù)已經(jīng)同步到 Alluxio,則使用 Alluxio 表來做為底表計算人群。
下推邏輯是用 Alluxio 的表名替換原表,假設此處緩存的 Alluxio 表名為 alluxio.ads_tags_table,那么原 SQL 就會被改寫成如下:
INSERT INTO hive_advert.cd0000127760_full
SELECT result_id, '20210703'
FROM
(SELECT oaid_md5 AS result_id
FROM alluxio.ads_tags_table AS ta
WHERE ta.dt = '20210702' and xxxxxxx) AS t
依靠數(shù)據(jù)服務調度系統(tǒng),通過用戶 SQL 改寫以及 Alluxio 和 Spark 計算結點混部模式,人群計算任務提速了 10%~30%小結
雖然截至今天,Hera 數(shù)據(jù)服務已經(jīng)支持了很多生產(chǎn)業(yè)務,但目前仍有很多需要完善的地方:- 不同 engine 存在同一個含義函數(shù)寫法不一致的情況。這種情況在 Presto 跟 ClickHouse 的函數(shù)比較時尤為突出,如 Presto 的 strpos(string,substring)函數(shù),在 Clickhouse 中為 position(haystack, needle[, start_pos]),且這些函數(shù)的參數(shù)順序存在不一致的情況,如何更優(yōu)雅地支持不同 engine 的差異情況還需要進一步思考。
- 人群計算采用業(yè)界通用的 ClickHouse BitMap 解決方案落地,提升人群的計算效率同時擴展數(shù)據(jù)服務的業(yè)務邊界。
- 數(shù)據(jù)服務支持調度的 HA 和災備完善,更好地在 K8s 上進行部署。