Spark Adaptive Execution: Dynamic Shuffle Partition, Broadcast Join, and Skew Handling
The article explains the limitations of static shuffle partitions, execution‑plan estimation, and data skew in Spark SQL, and describes how Spark Adaptive Execution can automatically adjust shuffle partition numbers, switch join strategies, and mitigate skew through configurable parameters and code examples.
This article shares a detailed discussion on Spark SQL Adaptive Execution, focusing on three main problems: sub‑optimal shuffle partition numbers, insufficient execution‑plan optimization (especially for join strategies), and data skew.
Problem 1 – Shuffle partition count: Spark SQL uses spark.sql.shuffle.partitions (default 200) to set the number of reduce tasks. Too many partitions cause excessive task-scheduling overhead, many small files on HDFS, and pressure on the NameNode; too few lead to large per-task data sizes, possible OOM, and poor performance. The optimal value varies per job, making manual tuning difficult.
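To make the trade-off concrete, here is a toy calculation (plain Python, not Spark code; the 100 GB shuffle size is an illustrative assumption) of the average per-task input for different partition counts:

```python
# Toy illustration: per-task input size for a fixed 100 GB shuffle
# under different values of spark.sql.shuffle.partitions.
GB = 1024 ** 3

def per_task_bytes(total_shuffle_bytes: int, num_partitions: int) -> int:
    """Average bytes each reduce task must process (assumes no skew)."""
    return total_shuffle_bytes // num_partitions

total = 100 * GB
for n in (20, 200, 20000):
    print(f"{n:>6} partitions -> {per_task_bytes(total, n) / GB:.3f} GB per task")
```

With 20 partitions each task handles 5 GB (OOM risk); with 20,000 partitions each task handles about 5 MB, so scheduling overhead and small output files dominate. No single static value works for every job.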
Problem 2 – Execution-plan shortcomings: Shuffle is expensive, so avoiding it via BroadcastHashJoin (MapJoin) is preferred when one side of a join is small. Spark controls this through spark.sql.autoBroadcastJoinThreshold (default 10 MB). However, for complex SQL the optimizer often cannot accurately estimate intermediate data sizes, so it may still choose SortMergeJoin and miss the performance gain.
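The planner's size-based rule can be sketched as follows (a simplified model of how spark.sql.autoBroadcastJoinThreshold drives the choice, not Spark's actual planner code):

```python
# Sketch of the size-based join choice (simplified model, not Spark code).
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB, Spark's default

def choose_join(estimated_smaller_side_bytes: int,
                threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Pick BroadcastHashJoin when the smaller side fits under the threshold."""
    if estimated_smaller_side_bytes <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"

print(choose_join(5 * 1024 * 1024))    # small side -> BroadcastHashJoin
print(choose_join(500 * 1024 * 1024))  # large side -> SortMergeJoin
```

The catch described above is that the input to this rule is only a planning-time estimate; Adaptive Execution's contribution is to re-run the decision with the actual post-shuffle size.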
Problem 3 – Data skew: Skew occurs when some partitions contain far more data than others, slowing the whole job. Common remedies include balancing input splits, adjusting shuffle partition numbers, adding prefixes/suffixes to hot keys, or using BroadcastHashJoin for small tables, but none address all skew scenarios.
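As a minimal sketch of one of these manual remedies, key salting spreads a hot key across N sub-keys so its rows land in N partitions instead of one (illustrative Python, not Spark API; for a join, the other side must be expanded with all N salt values):

```python
# Minimal key-salting sketch (illustrative, not Spark API).
import random

def salt_key(key: str, num_salts: int) -> str:
    """Append a random suffix so a hot key hashes to several partitions."""
    return f"{key}_{random.randrange(num_salts)}"

def unsalt_key(salted: str) -> str:
    """Strip the suffix after the skewed aggregation/join stage."""
    return salted.rsplit("_", 1)[0]

random.seed(0)
salted = [salt_key("hot_user", 4) for _ in range(8)]
assert {unsalt_key(s) for s in salted} == {"hot_user"}
print(sorted(set(salted)))
```

The downside, as noted above, is that this must be hand-crafted per hot key and does not cover every skew scenario.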
Spark Adaptive Execution solutions:
1. Automatic shuffle‑partition sizing
After Shuffle Write, Spark can compute the size of each partition and merge small ones, allowing a single task to process multiple partitions. This reduces task count and resource overhead. The merge threshold is controlled by spark.sql.adaptive.shuffle.targetPostShuffleInputSize (default 64 MB). Only consecutive small partitions are merged to preserve read order.
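The merge rule above can be sketched in a few lines (a simplified model, under the assumption that it packs consecutive partitions until the target is exceeded; the actual coordinator logic may differ in detail):

```python
# Sketch of the merge rule: pack *consecutive* post-shuffle partitions
# together until adding the next one would exceed the target size.
# Simplified model, not Spark's actual coordinator code.
from typing import List

TARGET = 64 * 1024 * 1024  # spark.sql.adaptive.shuffle.targetPostShuffleInputSize

def coalesce_partitions(sizes: List[int], target: int = TARGET) -> List[List[int]]:
    """Group consecutive partition indices so each group stays near `target`."""
    groups, current, current_bytes = [], [], 0
    for idx, size in enumerate(sizes):
        if current and current_bytes + size > target:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += size
    if current:
        groups.append(current)
    return groups

MB = 1024 * 1024
sizes = [30 * MB, 20 * MB, 10 * MB, 70 * MB, 5 * MB]
print(coalesce_partitions(sizes))  # [[0, 1, 2], [3], [4]]
```

Note that partitions 0–2 (60 MB combined) become one task, while the oversized partition 3 stays alone; only consecutive partitions are grouped, matching the read-order constraint described above.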
spark.sql.adaptive.enabled=true
2. Dynamic execution-plan adjustment
Post‑Shuffle Write, Spark can re‑evaluate stage output sizes; if a side of a join is small, it can switch the downstream stage to use BroadcastHashJoin, avoiding another shuffle read. This reduces network I/O and speeds up the job.
spark.sql.adaptive.enabled=true
spark.sql.adaptive.join.enabled=true
spark.sql.adaptiveBroadcastJoinThreshold=... // sets the threshold for converting SortMergeJoin to BroadcastJoin
3. Automatic skew handling
When a partition exceeds skew thresholds, Spark splits it into multiple tasks and later unions the results. Configuration parameters include:
spark.sql.adaptive.skewedJoin.enabled=true
spark.sql.adaptive.skewedPartitionMaxSplits (default 5) – max tasks per skewed partition
spark.sql.adaptive.skewedPartitionRowCountThreshold (default 10,000,000 rows)
spark.sql.adaptive.skewedPartitionSizeThreshold (default 64 MB)
spark.sql.adaptive.skewedPartitionFactor – factor to decide skew based on median size/row count
These settings enable Spark to automatically detect and mitigate skew during execution.
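Putting the parameters together, the detection-and-split rule can be sketched as follows (a simplified model of the behavior these settings describe, not the fork's actual code; the exact combination of thresholds is an assumption):

```python
# Sketch of skew detection + splitting (simplified model, not actual code).
# A partition counts as skewed when it exceeds both the absolute size
# threshold and `factor` times the median partition size; it is then split
# into at most `max_splits` tasks whose results are later unioned.
import statistics
from typing import List

MB = 1024 * 1024

def plan_skewed_tasks(sizes: List[int],
                      size_threshold: int = 64 * MB,  # skewedPartitionSizeThreshold
                      factor: float = 10.0,           # skewedPartitionFactor
                      max_splits: int = 5             # skewedPartitionMaxSplits
                      ) -> List[int]:
    """Return the number of tasks to launch for each post-shuffle partition."""
    median = statistics.median(sizes)
    tasks = []
    for size in sizes:
        if size > size_threshold and size > factor * median:
            # Enough splits to bring each task near the threshold, capped.
            tasks.append(min(max_splits, -(-size // size_threshold)))
        else:
            tasks.append(1)
    return tasks

sizes = [10 * MB, 12 * MB, 11 * MB, 900 * MB]  # one heavily skewed partition
print(plan_skewed_tasks(sizes))  # [1, 1, 1, 5]
```

Here the 900 MB partition exceeds both thresholds and is capped at 5 splits by skewedPartitionMaxSplits, while the others run as single tasks.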
Conclusion: Adaptive Execution is an enhancement contributed by Intel's big-data team and Baidu engineers, available at https://github.com/Intel-bigdata/spark-adaptive. Some features are already present in Spark 2.3.1 (automatic shuffle-partition sizing), while others are still under development (dynamic join adjustment, skew handling).
References include the original blog post and related JIRA issue SPARK‑23128.