導讀:阿里云 EMR 團隊和趣頭條的大數據團隊共同研發了 RSS,解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎組件。
作者 | 王振華、曹佳清、范振
業務場景與現狀
趣頭條是一家依賴大數據的科技公司,在 2018~2019 年經歷了業務的高速發展,主 App 和其他創新 App 的日活增加了 10 倍以上,相應的大數據系統也從最初的 100 臺機器增加到了 1000 臺以上規模。多個業務線依賴于大數據平臺展開業務,大數據系統的高效和穩定成了公司業務發展的基石,在大數據的架構上我們使用了業界成熟的方案,存儲構建在 HDFS 上、計算資源調度依賴 Yarn、表元數據使用 Hive 管理、用 Spark 進行計算,具體如圖 1 所示:
圖 1 趣頭條離線大數據平臺架構圖
其中 Yarn 集群使用了單一大集群的方案,HDFS 使用了聯邦的方案,同時基于成本因素,HDFS 和 Yarn 服務在 ECS 上進行了 DataNode 和 NodeManager 的混部。
在趣頭條每天有 6W 的 Spark 任務跑在 Yarn 集群上,每天新增的 Spark 任務穩定在 100 左右,公司的迅速發展要求需求快速實現,積累了很多治理欠債,種種問題表現出來集群穩定性需要提升,其中 Shuffle 的穩定性越來越成為集群的桎梏,亟需解決。
當前大數據平臺的挑戰與思考
近半年大數據平臺主要的業務指標是降本增效,一方面業務方希望離線平臺每天能夠承載更多的作業,另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業務量對于每個技術人都是非常大地挑戰。熟悉 Spark 的同學應該非常清楚,在大規模集群場景下,Spark Shuffle 在實現上有比較大的缺陷,體現在以下的幾個方面:
- Spark Shuffle Fetch 過程存在大量的網絡小包,現有的 External Shuffle Service 設計并沒有非常細致的處理這些 RPC 請求,大規模場景下會有很多connection reset 發生,導致 FetchFailed,從而導致 stage 重算。
- Spark Shuffle Fetch 過程存在大量的隨機讀,大規模高負載集群條件下,磁盤 IO 負載高、CPU 滿載時常發生,極容易發生 FetchFailed,從而導致 stage 重算。
- 重算過程會放大集群的繁忙程度,搶占機器資源,導致惡性循環嚴重,SLA 完不成,需要運維人員手動將作業跑在空閑的Label集群。
- 計算和 Shuffle 過程架構不能拆開,不能把 Shuffle 限定在指定的集群內,不能利用部分 SSD 機器。
- M*N 次的 shuffle 過程:對于 10K mapper、5K reducer 級別的作業,基本跑不完。
- NodeManager 和 Spark Shuffle Service 是同一進程,Shuffle 過程太重,經常導致 NodeManager 重啟,從而影響 Yarn 調度穩定性。
以上的這些問題對于 Spark 研發同學是非常痛苦的,好多作業每天運行時長方差會非常大,而且總有一些無法完成的作業,要么業務進行拆分,要么跑到獨有的 Yarn 集群中。除了現有面臨的挑戰之外,我們也在積極構建下一代基礎架構設施,隨著云原生 Kubernetes 概念越來越火,Spark 社區也提供了 Spark on Kubernetes 版本,相比較于 Yarn 來說,Kubernetes 能夠更好的利用云原生的彈性,提供更加豐富的運維、部署、隔離等特性。但是 Spark on Kubernetes 目前還存在很多問題沒有解決,包括容器內的 Shuffle 方式、動態資源調度、調度性能有限等等。我們針對 Kubernetes 在趣頭條的落地,主要有以下幾個方面的需求:
- 實時集群、OLAP 集群和 Spark 集群之前都是相互獨立的,怎樣能夠將這些資源形成統一大數據資源池。通過 Kubernetes 的天生隔離特性,更好的實現離線業務與實時業務混部,達到降本增效目的。
- 公司的在線業務都運行在 Kubernetes 集群中,如何利用在線業務和大數據業務的不同特點進行錯峰調度,達成 ECS 的總資源量最少。
- 希望能夠基于 Kubernetes 來包容在線服務、大數據、AI 等基礎架構,做到運維體系統一化。
因為趣頭條的大數據業務目前全都部署在阿里云上,阿里云 EMR 團隊和趣頭條的大數據團隊進行了深入技術共創,共同研發了 Remote Shuffle Service(以下簡稱 RSS),旨在解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎組件。
Remote Shuffle Service 設計與實現
- Remote Shuffle Service 的背景
早在 2019 年初我們就關注到了社區已經有相應的討論,如 SPARK-25299。該 Issue 主要希望解決的問題是在云原生環境下,Spark 需要將 Shuffle 數據寫出到遠程的服務中。但是我們經過調研后發現 Spark 3.0(之前的 master 分支)只支持了部分的接口,而沒有對應的實現。該接口主要希望在現有的 Shuffle 代碼框架下,將數據寫到遠程服務中。如果基于這種方式實現,比如直接將 Shuffle 以流的方式寫入到 HDFS 或者 Alluxio 等高速內存系統,會有相當大的性能開銷,趣頭條也做了一些相應的工作,并進行了部分的 Poc,性能與原版 Spark Shuffle 實現相差特別多,最差性能可下降 3 倍以上。同時我們也調研了一部分其他公司的實現方案,例如 Facebook 的 Riffle 方案以及 LinkedIn 開源的 Magnet,這些實現方案是首先將 Shuffle 文件寫到本地,然后在進行 Merge 或者 Upload 到遠程的服務上,這和后續我們的Kubernetes架構是不兼容的,因為 Kubernetes 場景下,本地磁盤 Hostpath 或者 LocalPV 并不是一個必選項,而且也會存在隔離和權限的問題。
基于上述背景,我們與阿里云 EMR 團隊共同開發了 Remote Shuffle Service。RSS 可以提供以下的能力,完美的解決了 Spark Shuffle 面臨的技術挑戰,為我們集群的穩定性和容器化的落地提供了強有力的保證,主要體現在以下幾個方面:
- 高性能服務器的設計思路,不同于 Spark 原有 Shuffle Service,RPC 更輕量、通用和穩定。
- 兩副本機制,能夠保證的 Shuffle fetch 極小概率(低于 0.01%)失敗。
- 合并 shuffle 文件,從 M*N 次 shuffle 變成 N 次 shuffle,順序讀 HDD 磁盤會顯著提升 shuffle heavy 作業性能。
- 減少 Executor 計算時內存壓力,避免 map 過程中 Shuffle Spill。
- 計算與存儲分離架構,可以將 Shuffle Service 部署到特殊硬件環境中,例如 SSD 機器,可以保證 SLA 極高的作業。
- 完美解決 Spark on Kubernetes 方案中對于本地磁盤的依賴。
- Remote Shuffle Service 的實現
- 整體設計
Spark RSS 架構包含三個角色:Master、Worker、Client。Master 和 Worker 構成服務端,Client 以不侵入的方式集成到 Spark ShuffleManager 里(RssShuffleManager 實現了 ShuffleManager 接口)。
- Master 的主要職責是資源分配與狀態管理。
- Worker 的主要職責是處理和存儲 Shuffle 數據。
- Client 的主要職責是緩存和推送 Shuffle 數據。
整體流程如下所示(其中 ResourceManager 和 MetaService 是 Master 的組件),如圖 2。
圖 2 RSS 整體架構圖
- 實現流程
下面重點來講一下實現的流程:
- RSS 采用 Push Style 的 shuffle 模式,每個 Mapper 持有一個按 Partition 分界的緩存區,Shuffle 數據首先寫入緩存區,每當某個 Partition 的緩存滿了即觸發 PushData。
- Driver 先和 Master 發生 StageStart 的請求,Master 接受到該 RPC 后,會分配對應的 Worker Partition 并返回給 Driver,Shuffle Client 得到這些元信息后,進行后續的推送數據。
- Client 開始向主副本推送數據。主副本 Worker 收到請求后,把數據緩存到本地內存,同時把該請求以 Pipeline 的方式轉發給從副本,從而實現了 2 副本機制。
- 為了不阻塞 PushData 的請求,Worker 收到 PushData 請求后會以純異步的方式交由專有的線程池異步處理。根據該 Data 所屬的 Partition 拷貝到事先分配的 buffer 里,若 buffer 滿了則觸發 flush。RSS 支持多種存儲后端,包括 DFS 和 Local。若后端是 DFS,則主從副本只有一方會 flush,依靠 DFS 的雙副本保證容錯;若后端是 Local,則主從雙方都會 flush。
- 在所有的 Mapper 都結束后,Driver 會觸發 StageEnd 請求。Master 接收到該 RPC 后,會向所有 Worker 發送 CommitFiles 請求,Worker 收到后把屬于該 Stage buffer 里的數據 flush 到存儲層,close 文件,并釋放 buffer。Master 收到所有響應后,記錄每個 partition 對應的文件列表。若 CommitFiles 請求失敗,則 Master 標記此 Stage 為 DataLost。
- 在 Reduce 階段,reduce task 首先向 Master 請求該 Partition 對應的文件列表,若返回碼是 DataLost,則觸發 Stage 重算或直接 abort 作業。若返回正常,則直接讀取文件數據。
總體來講,RSS 的設計要點總結為 3 個層面:
- 采用 PushStyle 的方式做 shuffle,避免了本地存儲,從而適應了計算存儲分離架構。
- 按照 reduce 做聚合,避免了小文件隨機讀寫和小數據量網絡請求。
- 做了 2 副本,提高了系統穩定性。
- 容錯
對于 RSS 系統,容錯性是至關重要的,我們分為以下幾個維度來實現:
- PushData 失敗
當 PushData 失敗次數(Worker 掛了,網絡繁忙,CPU繁忙等)超過 MaxRetry 后,Client 會給 Master 發消息請求新的 Partition Location,此后本 Client 都會使用新的 Location 地址,該階段稱為 Revive。
若 Revive 是因為 Client 端而非 Worker 的問題導致,則會產生同一個 Partition 數據分布在不同 Worker 上的情況,Master 的 Meta 組件會正確處理這種情形。
若發生 WorkerLost,則會導致大量 PushData 同時失敗,此時會有大量同一 Partition 的 Revive 請求打到 Master。為了避免給同一個 Partition 分配過多的 Location,Master 保證僅有一個 Revive 請求真正得到處理,其余的請求塞到 pending queue 里,待 Revive 處理結束后返回同一個 Location。
- Worker 宕機
當發生 WorkerLost 時,對于該 Worker 上的副本數據,Master 向其 peer 發送 CommitFile 的請求,然后清理 peer 上的 buffer。若 Commit Files 失敗,則記錄該 Stage 為 DataLost;若成功,則后續的 PushData 通過 Revive 機制重新申請 Location。
- 數據去重
Speculation task 和 task 重算會導致數據重復。解決辦法是每個 PushData的數據片里編碼了所屬的 mapId、attemptId 和 batchId,并且 Master 為每個 map task 記錄成功 commit 的 attemtpId。read 端通過 attemptId 過濾不同的 attempt 數據,并通過 batchId 過濾同一個 attempt 的重復數據。
- 多副本
RSS 目前支持 DFS 和 Local 兩種存儲后端。
在 DFS 模式下,ReadPartition 失敗會直接導致 Stage 重算或 abort job。在 Local 模式,ReadPartition 失敗會觸發從 peer location 讀,若主從都失敗則觸發 Stage 重算或 abort job。
- 高可用
大家可以看到 RSS 的設計中 Master 是一個單點,雖然 Master 的負載很小,不會輕易地掛掉,但是這對于線上穩定性來說無疑是一個風險點。在項目的最初上線階段,我們希望可以通過 SubCluster 的方式進行 workaround,即通過部署多套 RSS 來承載不同的業務,這樣即使 RSS Master 宕機,也只會影響有限的一部分業務。但是隨著系統的深入使用,我們決定直面問題,引進高可用 Master。主要的實現如下:
首先,Master 目前的元數據比較多,我們可以將一部分與 ApplD ShuffleId 本身相關的元數據下沉到 Driver 的 ShuffleManager 中,由于元數據并不會很多,Driver 增加的內存開銷非常有限。
另外,關于全局負載均衡的元數據和調度相關的元數據,我們利用 Raft 實現了 Master 組件的高可用,這樣我們通過部署 3 或 5 臺 Master,真正的實現了大規模可擴展的需求。
實際效果與分析
- 性能與穩定性
團隊針對 TeraSort、TPC-DS 以及大量的內部作業進行了測試,在 Reduce 階段減少了隨機讀的開銷,任務的穩定性和性能都有了大幅度提升。
圖 3 是 TeraSort 的 benchmark,以 10T Terasort 為例,Shuffle 量壓縮后大約 5.6T。可以看出該量級的作業在 RSS 場景下,由于 Shuffle read 變為順序讀,性能會有大幅提升。
圖 3 TeraSort 性能測試(RSS 性能更好)
圖 4 是一個線上實際脫敏后的 Shuffle heavy 大作業,之前在混部集群中很小概率可以跑完,每天任務 SLA 不能按時達成,分析原因主要是由于大量的 FetchFailed 導致 stage 進行重算。使用 RSS 之后每天可以穩定的跑完,2.1T 的 shuffle 也不會出現任何 FetchFailed 的場景。在更大的數據集性能和SLA表現都更為顯著。
圖 4 實際業務的作業 stage 圖(使用 RSS 保障穩定性和性能)
- 業務效果
在大數據團隊和阿里云 EMR 團隊的共同努力下,經過近半年的上線、運營 RSS,以及和業務部門的長時間測試,業務價值主要體現在以下方面:
- 降本增效效果明顯,在集群規模小幅下降的基礎上,支撐了更多的計算任務,TCO 成本下降 20%。
- SLA 顯著提升,大規模 Spark Shuffle 任務從跑不完到能跑完,我們能夠將不同 SLA 級別作業合并到同一集群,減小集群節點數量,達到統一管理,縮小成本的目的。原本業務方有一部分 SLA比 較高的作業在一個獨有的 Yarn 集群 B 中運行,由于主 Yarn 集群 A 的負載非常高,如果跑到集群 A 中,會經常的掛掉。利用 RSS 之后可以放心的將作業跑到主集群 A 中,從而釋放掉獨有 Yarn 集群 B。
- 作業執行效率顯著提升,跑的慢→跑的快。我們比較了幾個典型的 Shuffle heavy 作業,一個重要的業務線作業原本需要 3 小時,RSS 版本需要 1.6 小時。抽取線上 5~10 個作業,大作業的性能提升相當明顯,不同作業平均下來有 30% 以上的性能提升,即使是 shuffle 量不大的作業,由于比較穩定不需要 stage 重算,長期運行平均時間也會減少 10%-20%。
- 架構靈活性顯著提升,升級為計算與存儲分離架構。Spark 在容器中運行的過程中,將 RSS 作為基礎組件,可以使得 Spark 容器化能夠大規模的落地,為離線在線統一資源、統一調度打下了基礎。
未來展望
趣頭條大數據平臺和阿里云 EMR 團隊后續會繼續保持深入共創,將探索更多的方向。主要有以下的一些思路:
- RSS 存儲能力優化,包括將云的對象存儲作為存儲后端。
- RSS 多引擎支持,例如 MapReduce、Tez 等,提升歷史任務執行效率。
- 加速大數據容器化落地,配合 RSS 能力,解決 K8s 調度器性能、調度策略等一系列挑戰。
- 持續優化成本,配合 EMR 的彈性伸縮功能,一方面 Spark 可以使用更多的阿里云 ECS/ECI 搶占式實例來進一步壓縮成本,另一方面將已有機器包括阿里云 ACK、ECI 等資源形成統一大池子,將大數據的計算組件和在線業務進行錯峰調度以及混部。
版權聲明:本文內容由互聯網用戶自發貢獻,該文觀點僅代表作者本人。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如發現本站有涉嫌抄襲侵權/違法違規的內容, 請發送郵件至 舉報,一經查實,本站將立刻刪除。