Implementing and Registering a Custom SparkListener in Apache Spark
This article explains how to create a custom SparkListener in Apache Spark, provides Scala code examples for the listener and a main application, and details two registration approaches (via Spark configuration or via the SparkContext), along with a reference list of the listener's event methods.
Overview
Spark provides an event-listener mechanism that fires callbacks at various stages of a job's lifecycle, allowing developers to run custom logic. The SparkListener abstract class defines these callbacks; by overriding its methods you can react to events such as stage start, task end, and application termination.
Custom Listener Example
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/** Created by silent on 2019/1/11. */
class MySparkAppListener extends SparkListener with Logging {

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    // appId is an Option[String]; avoid .get, which throws if the id is absent
    val appId = applicationStart.appId
    logInfo("*************************************************** " + appId.getOrElse("unknown"))
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("************************ app end time ************************ " + applicationEnd.time)
  }
}

Main Function Example
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("main")
    .master("local[2]")
    // Method 1: register the listener by its fully qualified class name
    .config("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
    .getOrCreate()

  // Method 2: register an instance at runtime
  // spark.sparkContext.addSparkListener(new MySparkAppListener)

  spark.stop()
}

There are two ways to register a custom listener. Method 1 sets the listener's fully qualified class name in the Spark configuration via spark.extraListeners: before Spark 2.0 this is done on a SparkConf, and from Spark 2.0 onward it can be set directly on SparkSession.builder(). Method 2 adds a listener instance to the SparkContext at runtime via addSparkListener.
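The pre-2.0 flavor of Method 1 can be sketched as follows. This is a minimal illustration, assuming spark-core is on the classpath; the app name "listener-demo" is arbitrary, and the listener class name reuses the one from this article:

```scala
import org.apache.spark.SparkConf

// Method 1, SparkConf style: set the same spark.extraListeners key
// that SparkSession.builder().config(...) sets in the example above.
val conf = new SparkConf()
  .setAppName("listener-demo")
  .setMaster("local[2]")
  .set("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")

// Passing this conf to a SparkContext (or SparkSession) makes Spark
// instantiate the listener reflectively at startup, so the class must
// have a zero-argument (or SparkConf-argument) constructor.
```

Because configuration-based registration happens before any events fire, it also catches onApplicationStart, which a listener added later via addSparkListener may miss.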
SparkListener Method Reference
// The abstract SparkListener class defines a no-op method for each event.
// Override only the methods you need.
abstract class SparkListener extends SparkListenerInterface {

  // Stage completed
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  // Stage submitted
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  // Task start
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  // Task getting result
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  // Task end
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  // Job start
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  // Job end
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  // Environment update
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  // Block manager added
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  // Block manager removed
  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  // Unpersist RDD
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  // Application start
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  // Application end
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

  // Executor metrics update
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  // Executor added / removed / blacklisted / unblacklisted
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
  override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
  override def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

  // Node blacklisted / unblacklisted
  override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
  override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

  // Block updated
  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

  // Other events
  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

The content is reproduced from the original blog post.
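Since every method in the abstract class is a no-op, a concrete listener only overrides the callbacks it cares about. The following hypothetical TaskCountListener (not part of the original article) illustrates this by counting finished tasks and jobs; counters are AtomicInteger because the listener bus delivers events on its own thread:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerTaskEnd}

// Overrides only two of the many SparkListener callbacks; all other
// events fall through to the inherited no-op implementations.
class TaskCountListener extends SparkListener {
  val tasksEnded = new AtomicInteger(0)
  val jobsEnded  = new AtomicInteger(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    tasksEnded.incrementAndGet()

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    jobsEnded.incrementAndGet()
}
```

Registered via spark.sparkContext.addSparkListener(new TaskCountListener), it would record one task-end event per task and one job-end event per action (for example, a count() on an RDD with four partitions produces at least four task-end events).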