
How Spark Streaming Submits Tasks: Internal Mechanics and Code Walkthrough

This article explains the internal workflow of Spark Streaming task submission, detailing how StreamingContext, DStream, receivers, and output operators are transformed into RDD jobs, and includes annotated Scala code examples that illustrate each step of the process.


Spark Streaming is a real‑time processing solution built on top of Spark Core. This article walks through how a streaming job is turned into Spark RDD tasks, assuming the reader is already familiar with RDDs and how jobs are submitted through SparkContext.

Example Program

The following Scala snippet shows a minimal Spark Streaming word‑count application (original code trimmed for clarity):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // One micro-batch per second
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Receiver-based source: reads text lines from a TCP socket
    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print() // output operator: triggers job generation per batch
    ssc.start()
    ssc.awaitTermination()
  }
}

StreamingContext and Core Components

The entry point of Spark Streaming is StreamingContext. Internally it holds a DStreamGraph that tracks input streams (DStream, InputDStream, Receiver) and output streams. A JobScheduler coordinates a ReceiverTracker and a JobGenerator to turn streaming logic into batch RDD jobs.

DStream

DStream is Spark’s temporal wrapper around RDDs, representing a continuous series of RDDs. Its dependencies describe how streams are linked, similar to RDD DAG dependencies.
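The core of this "RDDs keyed by time" idea can be sketched in a few lines of plain Scala. This is a toy model, not Spark's actual classes: `ToyDStream`, `generated`, and the use of `Seq` in place of `RDD` are all illustrative, but the memoization mirrors how a real `DStream` caches one RDD per batch time in `generatedRDDs` via `getOrCompute`:

```scala
// Toy sketch (not Spark's real API): a DStream memoizes one dataset
// (here a Seq standing in for an RDD) per batch time.
object DStreamSketch {
  type Time = Long

  abstract class ToyDStream[T] {
    // batch time -> materialized data, analogous to DStream.generatedRDDs
    val generated = scala.collection.mutable.Map[Time, Seq[T]]()

    def compute(time: Time): Option[Seq[T]]

    // Return the cached batch if present, otherwise compute and cache it.
    def getOrCompute(time: Time): Option[Seq[T]] =
      generated.get(time).map(Some(_)).getOrElse {
        val res = compute(time)
        res.foreach(v => generated(time) = v)
        res
      }
  }

  class ConstStream(data: Seq[String]) extends ToyDStream[String] {
    var computeCalls = 0
    def compute(time: Time): Option[Seq[String]] = {
      computeCalls += 1
      Some(data)
    }
  }
}
```

The memoization matters because several downstream operators (and the output job) may ask for the same batch; each batch time is computed at most once.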

Output Operator

Output operations such as print or saveAs… are implemented via foreachRDD. The call registers a ForEachDStream into the graph’s outputStreams, which later generates a Job for each batch.
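The registration step can be illustrated with a minimal sketch. The names here (`ToyGraph`, `ToyForEach`) are invented for illustration; only the shape matches Spark, where `foreachRDD` wraps the user function in a `ForEachDStream` and appends it to the graph's `outputStreams`:

```scala
// Toy sketch of output-operator registration: calling foreachRDD appends
// a new output stream to the graph; the job generator later asks each
// registered stream to produce a Job for every batch.
object OutputRegistrationSketch {
  // Stands in for ForEachDStream: just holds the user's function.
  class ToyForEach(val func: (Seq[String], Long) => Unit)

  // Stands in for DStreamGraph.
  class ToyGraph {
    val outputStreams = scala.collection.mutable.ArrayBuffer[ToyForEach]()
    def addOutputStream(s: ToyForEach): Unit = outputStreams += s
  }

  // Analogue of DStream.foreachRDD: wrap the user function and register it.
  def foreachRDD(graph: ToyGraph)(f: (Seq[String], Long) => Unit): Unit =
    graph.addOutputStream(new ToyForEach(f))
}
```

Nothing executes at registration time; the stored function is only invoked when a batch job for that output stream runs.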

InputDStream Hierarchy

All data sources extend InputDStream. Sources that run on the driver inherit InputDStream directly, while distributed receiver-based sources inherit ReceiverInputDStream; the receivers themselves are wrapped in Spark jobs and scheduled onto executors.

// Trimmed from the Spark source: every InputDStream registers itself
// with the DStreamGraph at construction time and receives a unique id.
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {
  ssc.graph.addInputStream(this)
  val id = ssc.getNewInputStreamId()
}

ReceiverTracker.start()

The tracker registers an RPC endpoint, launches receivers, and allocates blocks to batches. It creates a dummy Spark job to ensure receivers are spread across workers, then sends a StartAllReceivers message.

private def launchReceivers(): Unit = {
  // Build one Receiver per ReceiverInputDStream, tagged with its stream id
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }
  // Run a throwaway job first so all executors register before placement
  runDummySparkJob()
  logInfo(s"Starting ${receivers.length} receivers")
  // Ask the tracker's RPC endpoint to schedule and start every receiver
  endpoint.send(StartAllReceivers(receivers))
}
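The goal of the placement step is that receivers end up spread across executors rather than piled onto one. The real logic lives in Spark's ReceiverSchedulingPolicy and also honors each receiver's preferred location; the round-robin sketch below is a deliberate simplification of that idea, with made-up names:

```scala
// Simplified sketch of spreading receivers across executors: round-robin
// receiver ids over the executor list. Spark's actual ReceiverSchedulingPolicy
// additionally considers preferredLocation and current load.
object ReceiverSpreadSketch {
  def assign(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, String] =
    receiverIds.zipWithIndex.map { case (id, i) =>
      id -> executors(i % executors.length)
    }.toMap
}
```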

JobGenerator.start()

The generator creates a recurring timer that posts GenerateJobs messages at each batch interval. When a GenerateJobs arrives, it allocates receiver blocks, asks the graph to generate jobs for each output stream, and submits the resulting JobSet to the JobScheduler .
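The timer's job is just arithmetic: batch times are multiples of the batch interval measured from a zero time, and each tick posts GenerateJobs(time) for the next point on that grid. A sketch of that calculation, under the assumption of millisecond timestamps (the helper name is illustrative):

```scala
// Sketch of the recurring-timer arithmetic behind GenerateJobs messages:
// batch times lie on a grid of batch-interval multiples from zeroTime.
object BatchTimeSketch {
  // First grid point strictly after `now`.
  def nextBatchTime(now: Long, zeroTime: Long, intervalMs: Long): Long = {
    val elapsed = now - zeroTime
    zeroTime + ((elapsed / intervalMs) + 1) * intervalMs
  }
}
```

With a 1-second interval, a timer woken at t = 1250 ms schedules the next GenerateJobs for t = 2000 ms, keeping batches aligned even if a tick fires late.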

Job Submission Flow

Each Job is executed by a thread pool whose size is controlled by spark.streaming.concurrentJobs (default 1, so batches run one at a time). The JobHandler posts JobStarted and JobCompleted events, disables output‑spec validation so checkpointed jobs can be re‑run, and finally runs the user‑provided function.
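The execution model can be reduced to a fixed thread pool whose size is the concurrency setting. This is a standalone sketch using plain `java.util.concurrent`, not Spark's JobScheduler; it only demonstrates why the default of one thread serializes batch execution:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch of the job-execution model: a fixed pool of `concurrentJobs`
// threads runs the batch jobs. With concurrentJobs = 1 (Spark's default),
// jobs execute strictly one after another.
object JobPoolSketch {
  def runJobs(concurrentJobs: Int, jobs: Seq[() => Unit]): Unit = {
    val pool = Executors.newFixedThreadPool(concurrentJobs)
    jobs.foreach(job => pool.execute(new Runnable { def run(): Unit = job() }))
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Raising concurrentJobs lets batches overlap, which can help when batches are independent but breaks the usual "one batch at a time" ordering guarantee.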

ForEachDStream.generateJob()

When an output stream is a ForEachDStream, its generateJob method obtains the parent RDD for the batch, wraps the user’s foreachFunc into a zero‑argument job function, and returns a Job instance.

override def generateJob(time: Time): Option[Job] = {
  // Materialize (or look up) the parent's RDD for this batch time
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      // Defer execution: the user's foreachFunc runs only when the Job is run
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None // no RDD for this batch, so no job to schedule
  }
}

print() Implementation

The print operator is a specific foreachFunc that fetches the first num elements of each RDD batch with the take action, then prints them under a timestamp header.

def print(num: Int): Unit = {
  foreachRDD { (rdd, time) =>
    val firstNum = rdd.take(num + 1)
    println("-------------------------------------------")
    println(s"Time: $time")
    println("-------------------------------------------")
    firstNum.take(num).foreach(println)
    if (firstNum.length > num) println("...")
    println()
  }
}
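Note the take(num + 1) trick: fetching one element more than will be printed reveals whether the batch holds additional records, so the trailing "..." can be emitted without a second pass (such as a count) over the RDD. The idiom in isolation, on a plain Seq rather than an RDD:

```scala
// The take(num + 1) idiom by itself: the extra element is fetched only to
// learn whether more than `num` records exist, so truncation can be marked
// without counting the whole collection.
object PrintSketch {
  def format(data: Seq[String], num: Int): Seq[String] = {
    val firstNum = data.take(num + 1)   // at most num + 1 elements
    val shown = firstNum.take(num)
    if (firstNum.length > num) shown :+ "..." else shown
  }
}
```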

Conclusion

The key takeaways are that receivers are scheduled as regular RDD jobs, and every DStream operation ultimately becomes an RDD job submitted to the Spark cluster. This explains why Spark Streaming can reuse Spark’s fault‑tolerance and resource management mechanisms.

Tags: Big Data, Real-time Processing, Spark Streaming, Scala, DStream
Written by

Qunar Tech Salon

Qunar Tech Salon is a learning and exchange platform for Qunar engineers and industry peers. We share cutting-edge technology trends and topics, providing a free platform for mid-to-senior technical professionals to exchange and learn.
