
Understanding Flink Time Notions: ProcessingTime, EventTime, IngestionTime and Watermarks with Code Examples

This article explains the three time notions supported by Apache Flink (ProcessingTime, EventTime, and IngestionTime), details how Watermarks enable event-time processing, and provides Scala code samples for configuring the time characteristic, assigning timestamps, and generating Watermarks in a streaming job.

Architect

In real‑world development, various streaming data sources (click streams, monitoring events, sensor data) are often ingested into systems like Kafka before being processed. These events carry a fixed event time, which becomes crucial when replaying historical data or performing time‑based analytics.

Apache Flink defines three time notions: ProcessingTime, EventTime, and IngestionTime. ProcessingTime relies on the system clock of the machine running each operator; EventTime uses the timestamp embedded in each event and requires Watermarks to track progress; IngestionTime records the time an event enters Flink and cannot handle out-of-order data.

ProcessingTime is suitable for simple real-time aggregations where the exact event timestamp is irrelevant, such as per-minute user activity counts that depend only on the processing machine's clock.
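The idea can be sketched in plain Scala, without any Flink API (the function names here are illustrative only): a processing-time window assignment amounts to truncating the processing machine's clock to the window size.

```scala
// Sketch only, not Flink API: a processing-time window is determined purely
// by truncating the wall-clock timestamp to the window size (one minute here).
def minuteBucket(processingTimeMillis: Long): Long =
  processingTimeMillis - (processingTimeMillis % 60000L)

// Per-minute activity counts over a batch of processing-time stamps.
def perMinuteCounts(timesMillis: Seq[Long]): Map[Long, Int] =
  timesMillis.groupBy(minuteBucket).map { case (bucket, hits) => (bucket, hits.size) }
```

Note that the result says nothing about when the events actually happened, only when they were processed, which is exactly why this notion breaks down when replaying historical data.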

EventTime preserves the original event timestamp, enabling accurate windowing even with out-of-order or delayed events. Watermarks (e.g., those generated by a BoundedOutOfOrdernessTimestampExtractor) signal that all events up to a certain timestamp are assumed to have arrived, allowing windows to be triggered safely.

IngestionTime assigns timestamps based on when events reach the source operator, which is simpler but cannot handle late or out‑of‑order data.

To use EventTime in Flink, set the time characteristic and assign timestamps with Watermarks:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Two common approaches are:

Calling assignTimestampsAndWatermarks() on the DataStream.

Embedding timestamp assignment and Watermark emission directly in a custom SourceFunction.

Example of a custom source that generates synthetic events with configurable lateness:

import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.slf4j.LoggerFactory

import scala.util.Random

class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {
  private val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
  @volatile private var running = true
  private val channelSet = Seq("a", "b", "c", "d")
  private val behaviorTypes = Seq("INSTALL", "OPEN", "BROWSE", "CLICK", "PURCHASE", "CLOSE", "UNINSTALL")
  private val rand = Random

  override def run(ctx: SourceContext[String]): Unit = {
    while (running) {
      val channel = channelSet(rand.nextInt(channelSet.size))
      val event = generateEvent() // Seq(timestamp, id, behaviorType)
      LOG.debug("Event: " + event)
      val ts = event(0)
      val id = event(1)
      val behaviorType = event(2)
      // Emit a tab-separated line: timestamp, channel, id, behaviorType
      ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }

  // Simulate lateness by shifting the event time behind the wall clock
  private def generateEvent(): Seq[String] = {
    val ts = Instant.ofEpochMilli(System.currentTimeMillis).minusMillis(latenessMillis).toEpochMilli
    val id = UUID.randomUUID().toString
    val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
    Seq(ts.toString, id, behaviorType)
  }

  override def cancel(): Unit = running = false
}

Assign timestamps and Watermarks in the main program:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// DEFAULT_MAX_LAGGED_TIME, EventTimeWindowReduceFunction and kafkaProducer
// are defined elsewhere in the project.
def main(args: Array[String]): Unit = {
  val params = ParameterTool.fromArgs(args)
  val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
  val maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
  val windowSizeMillis = params.getRequired("window-size-millis").toLong

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream = env.addSource(new StringLineEventSource(sourceLatenessMillis))
  stream
    // Extract the event timestamp (first tab-separated field) and emit
    // Watermarks allowing events up to maxLaggedTimeMillis out of order
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
      override def extractTimestamp(element: String): Long = element.split("\t")(0).toLong
    })
    // Key by (channel, behaviorType) and count events per tumbling window
    .map(line => {
      val a = line.split("\t")
      ((a(1), a(3)), 1L)
    })
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis)))
    .process(new EventTimeWindowReduceFunction())
    // Output fields: windowStart, windowEnd, channel, behaviorType, count
    .map(t => Seq(t._1, t._2, t._3, t._4, t._5).mkString("\t"))
    .addSink(kafkaProducer)
  env.execute(getClass.getSimpleName)
}

The BoundedOutOfOrdernessTimestampExtractor implementation maintains the maximum observed event timestamp and emits Watermarks as currentMaxTimestamp - maxOutOfOrderness, so the Watermark keeps advancing despite out-of-order arrivals.
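That bookkeeping can be sketched in plain Scala (illustrative class name, not Flink's actual implementation): track the maximum event timestamp seen so far, and lag the Watermark behind it by the configured out-of-orderness bound, so the Watermark never moves backwards.

```scala
// Flink-free sketch of the extractor's Watermark logic: the Watermark is
// always currentMaxTimestamp - maxOutOfOrderness, and since the max only
// grows, the Watermark is monotonically non-decreasing.
class BoundedLatenessTracker(maxOutOfOrdernessMillis: Long) {
  // Start low enough that the first Watermark does not underflow Long
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMillis

  def onEvent(eventTimestampMillis: Long): Unit =
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimestampMillis)

  def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrdernessMillis
}
```

An out-of-order event with a timestamp below the current maximum leaves the Watermark unchanged; only a new maximum advances it.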

Running the job on a Flink cluster produces windowed aggregates that are written to Kafka, with sample output like:

20180108154000    20180108154100    a    CLOSE    421
20180108154000    20180108154100    c    PURCHASE    434
...

Alternatively, timestamp and Watermark logic can be placed directly in the source operator, emitting timestamps via ctx.collectWithTimestamp(...) and Watermarks via ctx.emitWatermark(...).
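A minimal sketch of that approach, assuming the same tab-separated line format used above (the class name and constructor parameters are illustrative, not from the original project):

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: a source that assigns event-time timestamps and emits Watermarks
// itself, so no downstream assignTimestampsAndWatermarks() call is needed.
class TimestampedLineSource(lines: Seq[String], maxOutOfOrdernessMillis: Long)
    extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceContext[String]): Unit = {
    var maxTs = Long.MinValue + maxOutOfOrdernessMillis
    for (line <- lines if running) {
      val ts = line.split("\t")(0).toLong // first field is the event time
      ctx.collectWithTimestamp(line, ts)  // attach the timestamp to the record
      maxTs = math.max(maxTs, ts)
      // Lag the Watermark behind the max timestamp by the allowed lateness
      ctx.emitWatermark(new Watermark(maxTs - maxOutOfOrdernessMillis))
    }
  }

  override def cancel(): Unit = running = false
}
```

Note that timestamps and Watermarks assigned in the source are overwritten if the program later calls assignTimestampsAndWatermarks() on the stream, so the two approaches should not be mixed.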

References include the official Flink documentation on event time, Watermarks, and timestamp extractors, as well as O'Reilly articles on stream processing.
