
Implementing and Registering a Custom SparkListener in Apache Spark

This article explains how to create a custom SparkListener in Apache Spark, provides Scala code examples for the listener and a main application, and details two registration approaches—via Spark configuration or SparkContext—along with a comprehensive list of listener event methods.


Overview

Spark provides an event-listener mechanism that fires callbacks at key points in a job's lifecycle, allowing developers to run custom actions. The abstract SparkListener class (which implements SparkListenerInterface) defines these callbacks; by overriding its methods you can handle events such as stage start, task end, and application termination.

Custom Listener Example

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/** Created by silent on 2019/1/11. */
class MySparkAppListener extends SparkListener with Logging {

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    // appId is an Option[String]; avoid .get in case the id has not been set
    val appId = applicationStart.appId.getOrElse("<unknown>")
    logInfo("*************************************************** " + appId)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("************************ app end time ************************ " + applicationEnd.time)
  }
}

Main Function Example

import org.apache.spark.sql.SparkSession

object Main extends App {
  val spark = SparkSession.builder()
    .appName("main")
    .master("local[2]")
    // Method 1: register the listener by its fully qualified class name
    .config("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
    .getOrCreate()
  // Method 2: register a listener instance on the SparkContext at runtime
  // spark.sparkContext.addSparkListener(new MySparkAppListener)
  spark.stop()
}

There are two ways to register a custom listener. Method 1 sets the listener's fully qualified class name in the spark.extraListeners configuration: with SparkConf before Spark 2.0, or directly on SparkSession.builder() in Spark 2.0 and later. Method 2 adds a listener instance to the SparkContext at runtime via addSparkListener. Note that a listener registered through spark.extraListeners is instantiated during context initialization and therefore sees the application-start event, whereas one added at runtime only receives events fired after the addSparkListener call.
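To make Method 2 concrete, here is a minimal sketch of runtime registration (assuming Spark is on the classpath and MySparkAppListener is the listener defined above; the object name RuntimeRegistration is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object RuntimeRegistration extends App {
  val spark = SparkSession.builder()
    .appName("runtime-registration")
    .master("local[2]")
    .getOrCreate()

  // The context already exists at this point, so events fired during
  // initialization (e.g. onApplicationStart) will not reach this listener.
  spark.sparkContext.addSparkListener(new MySparkAppListener)

  spark.range(100).count() // trigger a job so stage/task callbacks fire
  spark.stop()
}
```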

SparkListener Method Reference

// The abstract SparkListener class defines a method for each event.
// Implement the methods you need.
abstract class SparkListener extends SparkListenerInterface {
  // Stage completed
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  // Stage submitted
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  // Task start
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  // Task getting result
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  // Task end
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  // Job start
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  // Job end
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  // Environment update
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  // Block manager added
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  // Block manager removed
  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  // Unpersist RDD
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  // Application start
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  // Application end
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

  // Executor metrics update
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  // Executor added / removed / blacklisted / unblacklisted
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
  override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
  override def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

  // Node blacklisted / unblacklisted
  override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
  override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

  // Block updated
  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

  // Other events
  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
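As an illustration of wiring several of the callbacks above together, here is a small sketch of a listener that counts finished tasks and prints a summary when each job ends (TaskCountListener is a hypothetical name; it assumes Spark is on the classpath):

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerTaskEnd}

class TaskCountListener extends SparkListener {
  // Callbacks are delivered from the listener-bus thread,
  // so any mutable state should be thread-safe.
  private val taskCount = new AtomicLong(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    taskCount.incrementAndGet()

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished; tasks completed so far: ${taskCount.get()}")
}
```

It can be registered with either of the two methods described earlier; because it only overrides the methods it needs, all other events fall through to the empty default implementations.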

The content is reproduced from the original blog post.

Tags: Big Data, Spark, Apache Spark, Scala, Event Listener, SparkListener
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies
