Boost Spark Performance: Proven Code Optimizations & Tuning Tips
This article outlines practical Spark job optimization techniques—from code-level improvements and resource tuning to data skew handling, persistence strategies, shuffle reduction, broadcast variables, Kryo serialization, and efficient data structures—demonstrating how each can dramatically cut execution time.
1. Overview
During Spark job development, from code writing to deployment and maintenance, several optimization aspects need to be considered: code optimization, resource allocation, data skew handling, and GC tuning.
Code‑level optimization is the most effective and forms the foundation of an efficient Spark task. The example below shows a job whose execution time dropped from 37 min to 8 min after only code adjustments.
2. Case Studies
2.1 Count after saving data to HBase
When results are saved to HBase and then counted, placing the count after the save avoids a second full scan because Spark maintains a write counter.
<code>// save to HBase
saveDataHBase.save(outputtableName, save13RddPo, f = convert)

def convert(hbaseSaveRddPo: HbaseSaveRddPo): (ImmutableBytesWritable, Put) = {
  val p = new Put(Bytes.toBytes(hbaseSaveRddPo.getRowKey))
  hbaseSaveRddPo.getHbaseSaveRddResultPoList.foreach { x =>
    p.addColumn(Bytes.toBytes(x.getFamily), Bytes.toBytes(x.getColumn), Bytes.toBytes(x.getValue))
  }
  (new ImmutableBytesWritable, p)
}

// count placed after the save
val count = save13RddPo.count
</code>
The writer keeps a recordsWritten counter, so the subsequent count returns instantly.
<code>var recordsWritten = 0L
while (iterator.hasNext) {
  val pair = iterator.next()
  config.write(pair)
  recordsWritten += 1
}
outputMetrics.setRecordsWritten(recordsWritten)
</code>
2.2 Persist frequently used RDDs
Repeatedly computing the same RDD causes the whole pipeline to be re‑executed each time. Persisting the RDD (memory, disk, or serialized) stores intermediate results and avoids recomputation.
<code>val hBaseRDD = rdd.filter { … }
hBaseRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
val rdd1 = hBaseRDD.filter { … }
val rdd2 = hBaseRDD.filter { … }
</code>
Persistence levels and their meanings:
MEMORY_ONLY : keep deserialized objects in memory; partitions that do not fit are not cached and are recomputed on each use.
MEMORY_AND_DISK : keep data in memory, spill to disk when needed.
MEMORY_ONLY_SER : serialize objects before storing in memory, reducing memory usage at the cost of CPU for (de)serialization.
MEMORY_AND_DISK_SER : serialized storage with memory‑first spill to disk.
DISK_ONLY : store data only on disk.
*_2 : replicate persisted data to another node for fault tolerance.
Choosing a strategy:
① MEMORY_ONLY offers highest performance when the dataset fits comfortably in memory.
② If MEMORY_ONLY causes OOM, switch to MEMORY_ONLY_SER to reduce memory pressure.
③ When memory is still insufficient, use MEMORY_AND_DISK_SER or MEMORY_AND_DISK .
④ Avoid DISK_ONLY and the *_2 levels unless high availability is required.
Note: Persist only when the same RDD is used multiple times.
2.3 Minimize shuffle operations
Shuffle is the most expensive phase because it writes keys to local disk and transfers them across the network. Avoiding shuffle-heavy operators such as reduceByKey, join, and groupByKey where possible can greatly improve performance.
For joins, broadcast the smaller RDD and perform a map‑side join:
<code>// broadcast-join: collect the small RDD and ship one copy to every executor
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map { case (key, value) =>
  // look up matching keys in the broadcast copy instead of shuffling rdd1
  val matches = rdd2DataBroadcast.value.filter(_._1 == key)
  (key, value, matches.map(_._2))
}
</code>
2.4 Use map-side combine for shuffle
If shuffle cannot be avoided, prefer operators that perform map-side aggregation, such as reduceByKey or aggregateByKey, instead of groupByKey. The diagram (illustrative) shows how reduceByKey aggregates locally before shuffling, while groupByKey transfers every record across the network.
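The effect of map-side combine can be mimicked with plain Scala collections (a sketch only, no Spark API involved; each inner Seq stands in for one map-side partition of (key, 1) records):

```scala
// Each inner Seq stands in for one map-side partition of (key, 1) records.
val partitions = Seq(
  Seq(("a", 1), ("a", 1), ("b", 1)),
  Seq(("a", 1), ("b", 1), ("b", 1))
)

// groupByKey-style: every record crosses the shuffle boundary unchanged.
val shuffledWithoutCombine = partitions.flatten

// reduceByKey-style: each partition pre-aggregates its own keys first,
// so at most one record per key per partition is shuffled.
val shuffledWithCombine = partitions.flatMap { part =>
  part.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
}

println(shuffledWithoutCombine.size) // 6 records would be shuffled
println(shuffledWithCombine.size)    // 4 records (2 keys x 2 partitions)
```

Both variants reduce to the same final result (a -> 3, b -> 3); the combined version simply moves fewer records across the wire.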
<code>// both paths funnel into this method: reduceByKey passes
// mapSideCombine = true, while groupByKey passes false
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null) = {
  // implementation omitted for brevity
}
</code>
2.5 Use batch operators
① Replace map with mapPartitions to process an entire partition at once, reducing function-call overhead (beware of possible OOM when a single partition is large).
② Replace foreach with foreachPartition to create a single database connection or Kafka producer per partition instead of per record.
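The per-partition setup pattern behind these operators can be sketched with plain collections (illustrative only, no Spark involved; a Seq of Seqs stands in for the partitions of an RDD):

```scala
// Each inner Seq stands in for one RDD partition.
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))

var connectionsOpened = 0
// Stand-in for an expensive resource (DB connection, Kafka producer, ...).
def openConnection(): Int => Int = {
  connectionsOpened += 1
  x => x * 2 // pretend "send" that returns an acknowledgement
}

// mapPartitions-style: the setup cost is paid once per partition,
// not once per record as a plain map/foreach would pay it.
val result = partitions.flatMap { partition =>
  val send = openConnection()
  partition.map(send)
}

println(result)            // List(2, 4, 6, 8, 10)
println(connectionsOpened) // 2 -- one per partition
```

With five records and two partitions, per-record setup would have opened five connections; per-partition setup opens only two.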
<code>sendData.foreachPartition { partition =>
  // one producer per partition, not one per record
  val producer = new Producer[String, String](new ProducerConfig(CommonUtils.getProducerConfig(brokerList)))
  partition.foreach { msgs =>
    // each partition element is itself a batch of messages
    msgs.foreach { msg =>
      if (msg != null) producer.send(msg)
    }
  }
  producer.close()
}
</code>
2.6 Use broadcast variables for large read-only data
Broadcasting a large collection (e.g., >100 MB) ensures only one copy per executor, reducing network traffic and memory pressure.
<code>val list1 = …
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast.value …)
</code>
2.7 Enable Kryo serialization
Kryo is roughly ten times faster than Java serialization and should be enabled for better performance, especially when using serialized persistence levels.
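A minimal configuration sketch (the class names MyKey and MyValue are placeholders for your own types; registering them lets Kryo write a short numeric ID instead of the full class name):

```scala
import org.apache.spark.SparkConf

case class MyKey(id: Long)          // placeholder type
case class MyValue(payload: String) // placeholder type

val conf = new SparkConf()
  .setAppName("kryo-example")
  // switch from the default Java serializer to Kryo
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // register application classes so Kryo can encode them compactly
  .registerKryoClasses(Array(classOf[MyKey], classOf[MyValue]))
```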
2.8 Choose memory‑efficient data structures
Avoid heavy objects, long strings, and collection types (HashMap, LinkedList) inside operator functions. Prefer primitive types, arrays, or compact string representations, while balancing code maintainability.
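As a sketch of the idea (plain Scala, hypothetical data): replacing a pointer-heavy HashMap with parallel primitive arrays removes boxing and per-entry node objects while staying functionally equivalent:

```scala
// Pointer-heavy representation: a Map boxes every Int value
// and allocates a node object per entry.
val boxedScores: Map[String, Int] = Map("u1" -> 3, "u2" -> 5)

// Compact representation: parallel arrays, with scores as unboxed Ints.
val userIds: Array[String] = Array("u1", "u2")
val scores:  Array[Int]    = Array(3, 5)

def scoreOf(user: String): Int = {
  // linear scan; acceptable for small, read-mostly lookup data
  val i = userIds.indexOf(user)
  scores(i)
}

println(scoreOf("u2")) // 5
```

The trade-off named above applies: the array version saves memory and GC pressure but is less readable, so reserve it for hot paths.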
Data Thinking Notes
Sharing insights on data architecture, governance, and middle platforms, exploring AI in data, and linking data with business scenarios.