Apache Celeborn (Incubating): Design, Performance, Stability, and Elasticity of a Remote Shuffle Service
This article reviews the limitations of traditional Spark shuffle, introduces Apache Celeborn (Incubating) as a remote shuffle service, and details its design for performance, stability, and elasticity, including push shuffle, partition splitting, columnar shuffle, multi‑layer storage, congestion control, and real‑world evaluation.
Introduction: This article is compiled from a talk by Zhou Keyong, an R&D engineer on the Alibaba Cloud EMR Spark team.
Table of Contents:
Problems of Traditional Shuffle
Apache Celeborn (Incubating) Overview
Celeborn's Performance, Stability, and Elasticity
01 Problems of Traditional Shuffle
Apache Spark is a widely used big‑data processing engine supporting Spark SQL, batch, streaming, MLlib, GraphX, etc. All components share the unified RDD abstraction, whose lineage is expressed via narrow and wide dependencies. Wide dependencies are implemented through the Shuffle mechanism.
Traditional Shuffle works as follows: each mapper sorts its output by partition ID, writes the sorted data plus an index file to local or cloud disks, and every reducer then reads its partition from every mapper's file. This approach suffers from several drawbacks:
It relies on large local or cloud disks to hold shuffle data until consumption, which hinders compute‑storage separation.
Mapper‑side sorting consumes significant memory and may trigger external sorting, adding extra disk I/O.
Shuffle read creates O(m×n) network connections.
Random small reads (e.g., 64 KB) cause severe IOPS pressure when a mapper’s 128 MB shuffle file is read by thousands of reducers.
Data is stored with a single replica, limiting fault tolerance.
These five issues lead to inefficiency, instability, and poor elasticity.
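To make the connection-count and small-read problems concrete, here is a back-of-envelope calculation; the cluster sizes below are hypothetical, chosen only to match the 128 MB file and ~64 KB read figures mentioned above.

```python
# Back-of-envelope numbers for traditional shuffle read
# (m = mappers, n = reducers; hypothetical cluster sizes).
m, n = 5000, 2000

# Every reducer opens a connection to every mapper's output.
connections = m * n                  # O(m*n) total connections

# Each reducer fetches its slice of every 128 MB mapper file.
file_mb = 128
read_kb = file_mb * 1024 / n         # average bytes fetched per connection

print(connections)  # 10,000,000 connections
print(read_kb)      # ~65.5 KB -> random small reads, heavy IOPS pressure
```

Even at this moderate scale, ten million tiny random reads is what makes HDD-backed shuffle so fragile.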
02 Apache Celeborn (Incubating)
Apache Celeborn (Incubating) is a Remote Shuffle Service originally developed by the Alibaba Cloud EMR Spark team to address the problems above. Donated to the Apache Software Foundation's Incubator in October 2022, Celeborn aims to provide a unified intermediate-data service for big-data engines, independent of any particular execution engine, with plans to eventually handle spilled data as well.
Historically, Celeborn started in 2020 as a Remote Shuffle Service, was open-sourced in December 2021, and has attracted contributors from Xiaomi, Shopee, NetEase, and other companies. At the time of the talk it had over 600 commits, 32 contributors, and more than 330 GitHub stars.
03 Celeborn's Performance, Stability, and Elasticity
Celeborn improves performance through a push‑shuffle + partition‑aggregation design, supports columnar shuffle, and offers multi‑layer storage.
1. Performance
Each mapper maintains an in‑memory buffer. When the buffer exceeds a threshold, the mapper pushes the data belonging to the same partition to a pre‑assigned worker. Workers store data per partition, producing a single file per partition. During shuffle read, a reducer reads its partition from only one worker, eliminating the need for local‑disk writes, sorting, and reducing network connections from O(m×n) to O(m+n).
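The mapper-side flow can be sketched as follows; this is a toy model, not Celeborn's real client API, and the class and threshold below are illustrative.

```python
# Sketch of push shuffle on the mapper side: records are buffered per
# partition and pushed to that partition's pre-assigned worker once the
# buffer crosses a threshold. (Illustrative, not the Celeborn API.)
from collections import defaultdict

class PushShuffleWriter:
    def __init__(self, partition_workers, threshold_bytes=64 * 1024):
        self.partition_workers = partition_workers  # partition id -> worker
        self.threshold = threshold_bytes
        self.buffers = defaultdict(bytearray)
        self.pushed = defaultdict(bytearray)        # stands in for the network

    def write(self, partition_id, record: bytes):
        buf = self.buffers[partition_id]
        buf.extend(record)
        if len(buf) >= self.threshold:
            self._push(partition_id)

    def _push(self, partition_id):
        worker = self.partition_workers[partition_id]
        # All mappers push partition p to the same worker, which appends to
        # a single file per partition -> one sequential read per reducer.
        self.pushed[(worker, partition_id)].extend(self.buffers[partition_id])
        self.buffers[partition_id].clear()

    def close(self):
        for pid in list(self.buffers):
            if self.buffers[pid]:
                self._push(pid)
```

Because every mapper talks only to the workers hosting its partitions, and every reducer talks only to the one worker holding its partition file, the connection count drops from O(m×n) to O(m+n).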
To avoid oversized partitions, Celeborn implements dynamic partition splitting: when a partition file grows beyond a limit, the worker returns a split marker, the client obtains a new worker, and continues pushing data, ensuring no single file becomes too large.
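The split protocol can be illustrated with a toy client/worker pair; the message names, callback, and size limit here are made up for the sketch and do not match Celeborn's actual wire protocol.

```python
# Sketch of dynamic partition splitting: the worker answers each push with
# OK or SPLIT; on SPLIT the client obtains a fresh worker (here via a
# callback standing in for the master) and keeps pushing there.
SPLIT_LIMIT = 1024  # bytes per partition file in this toy example

class Worker:
    def __init__(self):
        self.size = 0

    def push(self, data: bytes) -> str:
        self.size += len(data)
        return "SPLIT" if self.size >= SPLIT_LIMIT else "OK"

class Client:
    def __init__(self, new_worker):
        self.new_worker = new_worker   # stands in for asking the master
        self.worker = new_worker()
        self.files = 1                 # number of split files so far

    def push(self, data: bytes):
        if self.worker.push(data) == "SPLIT":
            self.worker = self.new_worker()  # later pushes open a new file
            self.files += 1
```

No single file grows past the limit, and no push is lost: the marker arrives as the response to a push that was still accepted.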
For Spark Adaptive Query Execution (AQE), Celeborn supports partition‑range reads and mapper‑range reads. The three‑step process is:
Split: Skewed data triggers partition splitting.
Sort‑On‑Read: The first read of a split file sorts its blocks by mapper ID, turning subsequent random reads into sequential reads.
Range Read: Sub‑reducers read their mapper ranges sequentially, and split files record the mapper list to prune unnecessary reads.
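The three steps can be sketched together; the block layout below is illustrative, assuming each block in a split file is tagged with the mapper that produced it so a mapper range maps to a contiguous span after sorting.

```python
# Sketch of Sort-On-Read + Range Read for AQE (illustrative data layout).
def sort_on_read(blocks):
    # blocks: list of (mapper_id, payload). Done once on first read and
    # cached, so later range reads are sequential instead of random.
    return sorted(blocks, key=lambda b: b[0])

def range_read(sorted_blocks, lo, hi):
    # A sub-reducer handling mapper range [lo, hi] reads one contiguous
    # span; a linear scan stands in for seeking via the recorded index.
    return [payload for mid, payload in sorted_blocks if lo <= mid <= hi]
```

In the real system the split file also records which mappers it contains, so sub-reducers can skip files whose mapper list does not overlap their range at all.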
Celeborn also introduces columnar shuffle. Data is converted from row to column format during write and back to row format during read, using code generation to minimize conversion overhead. In a 3 TB TPC‑DS benchmark, columnar shuffle reduces shuffle size by ~40% with less than 5% conversion cost.
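The row/column conversion itself is simple to illustrate; Celeborn generates specialized code per schema, but plain Python conveys the idea.

```python
# Toy illustration of row <-> column conversion around a shuffle.
def rows_to_columns(rows):
    # [(1, "a"), (2, "b")] -> [(1, 2), ("a", "b")]: column-major layout
    # groups same-typed values together, which compresses much better —
    # that is where the ~40% shuffle-size reduction comes from.
    return list(zip(*rows))

def columns_to_rows(cols):
    # The inverse transform, applied on shuffle read.
    return list(zip(*cols))
```

The conversion is a pure transpose, so the round trip is lossless; code generation keeps its per-record overhead below the ~5% measured in the TPC-DS benchmark.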
Multi‑layer storage lets Celeborn flexibly use memory, local disks, and distributed storage (OSS/HDFS). Data is first cached in a Push Data Region; when a partition exceeds a threshold it is flushed to the configured target layer (e.g., local disk or memory cache). Larger partitions are evicted to the next layer, and when the lowest layer fills, data can be evicted to OSS or directly flushed from memory to OSS. This design enables both high performance for small shuffles and the ability to handle petabyte‑scale shuffles without relying on local disks.
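A minimal sketch of the tiering decision, assuming three layers with the capacities and names below chosen purely for illustration (they are not Celeborn configuration values):

```python
# Sketch of multi-layer storage placement: memory -> local disk -> OSS.
# Small partitions stay in fast tiers; larger ones fall through to the
# next layer, and OSS absorbs whatever the lower layers cannot hold.
TIERS = ["memory", "disk", "oss"]
CAPACITY = {"memory": 256, "disk": 1024, "oss": float("inf")}

class TieredStore:
    def __init__(self):
        self.used = {t: 0 for t in TIERS}
        self.placement = {}

    def put(self, partition, size):
        for tier in TIERS:
            if self.used[tier] + size <= CAPACITY[tier]:
                self.used[tier] += size
                self.placement[partition] = tier
                return tier
```

Small shuffles thus stay entirely in memory for speed, while petabyte-scale shuffles spill through to object storage without depending on local disks.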
2. Stability
Celeborn ensures service stability through in‑place upgrades, congestion control, and load balancing.
In‑place upgrades combine protocol forward‑compatibility (via protobuf) and graceful restarts. Workers mark themselves for graceful shutdown, stop receiving new slots, and finish pending partitions before persisting state to LevelDB and restarting. The client then resumes pushing to new workers, achieving second‑level upgrade times without job interruption.
During shuffle write, Celeborn adopts TCP-like congestion control (slow start and congestion avoidance) to prevent workers from running out of memory under bursty workloads. When a worker's memory use crosses a threshold, it signals clients to fall back to slow start, throttling their push rate.
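The client-side rate logic mirrors a TCP congestion window; the sketch below uses made-up numbers and names, but the three behaviors — exponential slow start, linear congestion avoidance, and reset on backpressure — are the TCP-like mechanism described above.

```python
# Sketch of TCP-style rate control on the push path (values illustrative).
class PushRateController:
    def __init__(self):
        self.window = 1          # pushes allowed per tick
        self.threshold = 32      # slow-start threshold

    def on_ack(self):
        if self.window < self.threshold:
            self.window *= 2     # slow start: exponential growth
        else:
            self.window += 1     # congestion avoidance: linear growth

    def on_backpressure(self):
        # Worker memory crossed its threshold: halve the slow-start
        # threshold and drop back to slow start, throttling the sender.
        self.threshold = max(self.window // 2, 1)
        self.window = 1
```

Because every client reacts to the same signal, a bursty workload backs off collectively before the worker exhausts its memory.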
Celeborn also supports credit‑based flow control for Flink shuffle reads, reserving memory credits before data is pushed.
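Credit-based flow control inverts the responsibility: the receiver grants permission before any data moves. A simplified sketch (the class names and one-credit-per-batch rule are assumptions for illustration):

```python
# Sketch of credit-based flow control for Flink-style shuffle reads.
class CreditReader:
    def __init__(self):
        self.credits = 0

    def grant(self, n):
        # The reader reserves buffer memory first, then advertises credits.
        self.credits += n

class CreditSender:
    def send(self, reader, batches):
        sent = []
        for batch in batches:
            if reader.credits == 0:
                break            # no credit -> no push, so the reader's
            reader.credits -= 1  # memory can never be overrun
            sent.append(batch)
        return sent
```

This is stricter than the TCP-like scheme on the write path: data is held back until memory for it already exists, which suits Flink's managed-memory model.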
Load balancing focuses on disk health and capacity. Workers report per‑disk metrics (health, write speed, projected usage) to the master, which groups disks and preferentially assigns load to healthier, larger‑capacity disks, improving stability in heterogeneous environments.
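The master's disk-aware assignment can be sketched as a ranking over reported metrics; the scoring below is invented for illustration and Celeborn's real grouping logic differs in detail.

```python
# Sketch of disk-aware slot assignment: drop unhealthy disks, then
# prefer faster and emptier ones. (Scoring is illustrative only.)
def rank_disks(disks):
    # disks: list of dicts with a health flag, observed write speed,
    # and remaining capacity, as reported by workers to the master.
    healthy = [d for d in disks if d["healthy"]]
    return sorted(healthy,
                  key=lambda d: (d["speed_mb_s"], d["free_gb"]),
                  reverse=True)
```

Ranking rather than round-robin is what makes heterogeneous clusters stable: a slow or nearly full disk simply receives fewer new partitions.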
3. Elasticity
In Yarn, the External Shuffle Service (ESS) enables Spark dynamic allocation. In Kubernetes, ESS is absent, so idle pods cannot be released. Celeborn fills this gap: shuffle data is hosted by the Celeborn cluster, allowing Spark pods on K8s to terminate immediately after completing work, achieving true elasticity.
4. Typical Scenarios
Fully mixed deployment: HDFS, Yarn, and Celeborn share the same cluster, improving performance and stability.
Dedicated Celeborn deployment with HDFS/Yarn still mixed: adds resource isolation between source-table I/O and shuffle I/O, enhancing stability and providing partial elasticity.
Compute‑storage separation: source data resides in object storage (OSS), compute runs on Yarn or K8s, and Celeborn is deployed independently, giving both compute and shuffle clusters full elasticity.
5. Evaluation
Case 1 – Mixed deployment: a user runs a workload of 4 PB of compressed shuffle data, with more than 80,000 concurrent tasks and a single job shuffling 16 TB. Even on HDDs the workload runs stably, whereas it previously failed.
Case 2 – Compute‑storage separation: a user runs tens of thousands of Spark pods on K8s with OSS as source storage. With Celeborn, dynamic allocation works flawlessly, delivering noticeable gains in performance and stability.
In a standard TPC‑DS 3 TB benchmark on a mixed deployment, Celeborn improves single‑replica shuffle performance by 20% and dual‑replica performance by 13% compared with the External Shuffle Service, without additional machine resources.
Thank you for reading.
DataFunTalk