How to Diagnose and Fix Spark Data Skew: Practical Optimization Techniques
This article explains the causes of Spark data skew, how to locate skewed tasks using the Web UI, and presents six optimization methods: increasing shuffle parallelism, filtering abnormal keys, two‑stage aggregation, map‑join, key sampling with split joins, and random‑prefix joins, plus a real‑world case study.
1. Overview
Data skew in Spark occurs when different partitions (tasks) receive vastly different amounts of data, causing most tasks to finish quickly while a few run extremely slowly or trigger OOM errors during reduce operations.
2. Origin of Data Skew
During shuffle, keys with a large amount of data are pulled to a single task for processing. For example, most keys may have 10 records, but a few keys may have 1,000,000 records, causing the corresponding tasks to run many times longer than others.
In the example, the key hello appears 7 times across three nodes and is processed by a single task, while the keys world and you each appear only once and are handled by separate tasks, making the first task up to seven times slower.
3. Locating Data Skew
Data skew only appears during shuffle operations. Operators that may trigger it include distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition.
3.1 Very Slow Task
Using the Spark Web UI, you can view the data volume of each task in a stage and identify imbalanced distribution.
Metrics such as Shuffle Read Size / Records show a Max value far exceeding the 75th percentile; in the case examined here, a single task handled 585 times more data than the others, a clear sign of skew.
3.2 Task OOM
Examining the exception stack can pinpoint the line of code causing OOM, often related to a shuffle operator. The following Scala snippet illustrates a pattern that may lead to OOM under skew conditions:
<code>val mainRelation = mainRddAll
  .map { row =>
    // ... derive mainCol and relationStr from row ...
    (mainCol, Set(relationStr))
  }
  .reduceByKey(_ ++ _) // merging all sets of a hot key in one task can OOM
  .map { case (key, t) =>
    (key, (t.size, t.mkString(",")))
  }</code>
4. Common Optimization Methods
4.1 Increase Shuffle Parallelism
When to use: A moderate number of keys cause skew.
How: Increase the number of shuffle read tasks, e.g., pass an explicit partition count to the operator, as in reduceByKey(func, 1000), or raise spark.sql.shuffle.partitions (default 200).
Principle: More shuffle tasks distribute keys across more tasks, reducing per‑task data volume.
Pros: Simple, no code changes.
Cons: Only mitigates skew; effectiveness limited in extreme cases.
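Both ways of raising parallelism can be sketched as follows (a non‑runnable configuration sketch; rdd, func, and spark stand in for an existing RDD, its reduce function, and a SparkSession):

```
// 1) Pass an explicit shuffle partition count to the operator:
rdd.reduceByKey(func, 1000) // 1000 shuffle read tasks instead of the inherited default

// 2) For Spark SQL, raise the shuffle partition setting (default is 200):
spark.conf.set("spark.sql.shuffle.partitions", "1000")
```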
4.2 Filter Abnormal Keys
When to use: Only a few keys have extremely large data volumes and they are not critical.
How: Use where in Spark SQL or filter on RDDs to drop those keys, optionally after sampling to identify them.
Pros: Completely removes the skew source.
Cons: Not applicable when many keys cause skew.
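The sample‑then‑filter idea can be illustrated with plain Scala collections standing in for RDDs (a minimal sketch; the object name, the sampling fraction, and the cutoff are hypothetical choices, not part of the original article):

```scala
import scala.util.Random

object FilterSkewKeys {
  // Sample the data, count keys in the sample, and drop any key whose
  // sampled frequency exceeds a threshold. In Spark this would be
  // rdd.sample(...) followed by countByKey and a filter on the full RDD.
  def dropHeavyKeys[V](data: Seq[(String, V)], sampleFraction: Double,
                       maxSampleCount: Int, seed: Long = 42L): Seq[(String, V)] = {
    val rnd = new Random(seed)
    val sample = data.filter(_ => rnd.nextDouble() < sampleFraction)
    val heavy: Set[String] = sample.groupBy(_._1)
      .collect { case (k, vs) if vs.size > maxSampleCount => k }
      .toSet
    data.filterNot { case (k, _) => heavy.contains(k) }
  }
}
```

This only makes sense when the heavy keys are genuinely disposable (e.g., test or junk data); otherwise one of the later join/aggregation techniques applies.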
4.3 Two‑Stage Aggregation (Partial + Global)
When to use: Aggregation‑type shuffles (e.g., reduceByKey, groupBy).
How: Add a random prefix to each key, perform a local aggregation, then remove the prefix and run a global aggregation.
Principle: The random prefix spreads the same original key across multiple tasks, reducing per‑task load.
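The salt‑aggregate‑unsalt flow can be shown with plain Scala collections in place of RDDs (a sketch under assumed names; in Spark the two groupBy/sum stages would each be a reduceByKey):

```scala
import scala.util.Random

object TwoStageAgg {
  // Stage 1: attach a random prefix (e.g. "3_hello") and aggregate per
  // salted key, so one hot key is spread over saltBuckets partial sums.
  // Stage 2: strip the prefix and aggregate the partial sums globally.
  def twoStageSum(records: Seq[(String, Int)], saltBuckets: Int): Map[String, Int] = {
    val rnd = new Random(42)
    val salted = records.map { case (k, v) => (s"${rnd.nextInt(saltBuckets)}_$k", v) }
    val partial = salted.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
    partial.toSeq
      .map { case (k, v) => (k.split("_", 2)(1), v) } // drop "<salt>_" prefix
      .groupBy(_._1)
      .map { case (k, vs) => (k, vs.map(_._2).sum) }
  }
}
```

The global stage sees at most saltBuckets records per original key, so it cannot be skewed no matter how hot the key was.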
4.4 Convert Reduce Join to Map Join
When to use: One side of the join is small enough to broadcast.
How: Collect the small RDD, broadcast it, and perform the join using a map‑side lookup, eliminating shuffle.
Pros: Removes shuffle entirely, avoiding skew.
Cons: Requires the small dataset to fit in memory; not suitable for two large tables.
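The broadcast lookup can be simulated with a plain Map standing in for the broadcast variable (a sketch; in Spark the small side would be collected and wrapped with sc.broadcast, and the flatMap would run map‑side on each partition of the large RDD):

```scala
object MapJoin {
  // Turn the small side into an in-memory lookup map, then join the large
  // side against it locally -- no shuffle, hence no shuffle skew.
  def mapJoin[K, V, W](large: Seq[(K, V)], small: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val broadcast: Map[K, W] = small.toMap // sc.broadcast(small.collect().toMap) in Spark
    large.flatMap { case (k, v) => broadcast.get(k).map(w => (k, (v, w))) }
  }
}
```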
4.5 Sample Skewed Keys and Split Join
When to use: A few keys cause skew, but both tables are large.
How: Sample to find heavy keys, split them into a separate RDD, add random prefixes, and join with the counterpart RDD that has been similarly expanded; finally union the results.
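The split‑salt‑union flow can be sketched with plain Scala collections (hypothetical names; in Spark each step would be a filter, map/flatMap, join, and union on RDDs, with the heavy keys found via sample + countByKey):

```scala
import scala.util.Random

object SplitSkewJoin {
  // Heavy keys: random prefix 0..n-1 on the left, full n-way replication on
  // the right, so one hot key spreads over n join tasks. Remaining keys join
  // normally; the two results are unioned.
  def splitJoin[V, W](left: Seq[(String, V)], right: Seq[(String, W)],
                      heavyKeys: Set[String], n: Int, seed: Long = 1L): Seq[(String, (V, W))] = {
    def join[A, B](l: Seq[(String, A)], r: Seq[(String, B)]): Seq[(String, (A, B))] = {
      val rm = r.groupBy(_._1)
      l.flatMap { case (k, a) => rm.getOrElse(k, Nil).map { case (_, b) => (k, (a, b)) } }
    }
    val rnd = new Random(seed)
    val skewL = left.filter(p => heavyKeys(p._1))
      .map { case (k, v) => (s"${rnd.nextInt(n)}_$k", v) }
    val skewR = right.filter(p => heavyKeys(p._1))
      .flatMap { case (k, w) => (0 until n).map(i => (s"${i}_$k", w)) }
    val skewJoined = join(skewL, skewR).map { case (k, vw) => (k.split("_", 2)(1), vw) }
    val rest = join(left.filterNot(p => heavyKeys(p._1)),
                    right.filterNot(p => heavyKeys(p._1)))
    skewJoined ++ rest
  }
}
```

Only the few heavy keys pay the n‑fold replication cost, which is why this beats blanket expansion when skew is concentrated.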
4.6 Random Prefix Join for Massive Skew
When to use: Many keys are skewed, making selective splitting ineffective.
How: Add a random prefix to every key in the large RDD, expand the other RDD by replicating each record n times with matching prefixes, then join.
Pros: Handles widespread skew and can dramatically improve performance.
Cons: Increases memory usage because the entire dataset is expanded.
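The same salting idea applied to every key can be sketched as follows (plain Scala standing in for RDDs; object and parameter names are hypothetical):

```scala
import scala.util.Random

object PrefixJoin {
  // Salt every key on the large side with a random prefix in [0, n),
  // replicate every record of the other side n times with matching prefixes,
  // join on the salted key, then strip the prefix. Correct because each
  // salted left key "i_k" finds exactly one replicated copy of each right
  // record for k.
  def prefixJoin[V, W](large: Seq[(String, V)], other: Seq[(String, W)],
                       n: Int, seed: Long = 7L): Seq[(String, (V, W))] = {
    val rnd = new Random(seed)
    val saltedL = large.map { case (k, v) => (s"${rnd.nextInt(n)}_$k", v) }
    val expandedR = other.flatMap { case (k, w) => (0 until n).map(i => (s"${i}_$k", w)) }
    val rm = expandedR.groupBy(_._1)
    saltedL.flatMap { case (k, v) =>
      rm.getOrElse(k, Nil).map { case (_, w) => (k.split("_", 2)(1), (v, w)) }
    }
  }
}
```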
5. Case Study
5.1 Problem Manifestation
A production Spark job processing a complex network calculation showed one task (task42) handling over 1,300 times more data than others, leading to severe skew.
5.2 Investigation
Using the Spark SQL UI, the execution plan revealed a large left table (≈1.2 billion rows) joined against much smaller right tables. The left join on deviceid was skewed because many rows had NULL join keys, which all hash to the same shuffle partition. Filtering out NULL keys reduced execution time from 24 minutes to 4.5 minutes.
5.3 Resolution
Two approaches were applied:
Filter out records with NULL join keys.
If removal is not possible, split the dataset into joinable (non‑null) and non‑joinable (null) parts, join the former, and union with the latter.
<code>val data = ...
val notJoinable = data.filter('keyToJoin.isNull)
val joinable = data.filter('keyToJoin.isNotNull)
joinable.join(...) union notJoinable</code>
6. Summary
Data skew typically stems from abnormal or test data; validating data quality early can prevent it. For aggregation‑heavy jobs, preserving detailed records may exacerbate skew, so consider whether such detail is necessary. The presented methods—especially increasing shuffle parallelism, filtering abnormal keys, two‑stage aggregation, map‑join, and random‑prefix techniques—provide a toolbox to diagnose and mitigate Spark data skew in production.
References:
Spark official tuning guide: http://spark.apache.org/docs/2.4.3/tuning.html#memory-tuning
Spark performance optimization – data skew: https://www.iteblog.com/archives/1671.html
Stack Overflow discussion on NULL join keys: https://stackoverflow.com/questions/52339872/spark-dataset-dataframe-join-null-skew-key
Data Thinking Notes
Sharing insights on data architecture, governance, and middle platforms, exploring AI in data, and linking data with business scenarios.