Understanding Flink Time Notions: ProcessTime, EventTime, IngestionTime and Watermarks with Code Examples
This article explains the three time notions supported by Apache Flink (ProcessTime, EventTime, and IngestionTime): it details their semantics, shows how Watermarks enable event‑time processing, and provides Scala code samples for configuring the time characteristic, assigning timestamps, and generating Watermarks in a streaming job.
In real‑world development, various streaming data sources (click streams, monitoring events, sensor data) are often ingested into systems like Kafka before being processed. Each event carries a fixed timestamp assigned when it was created, which becomes crucial when replaying historical data or performing time‑based analytics.
Apache Flink defines three time notions: ProcessTime, EventTime, and IngestionTime. ProcessTime relies on the system clock of the operator host, EventTime uses the timestamp embedded in each event and requires Watermarks to track progress, while IngestionTime records the time an event enters Flink and cannot handle out‑of‑order data.
ProcessTime is suitable for simple real‑time aggregations where the exact event timestamp is irrelevant, such as per‑minute user activity counts that depend only on the processing machine’s clock.
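To make the contrast concrete, the bucket arithmetic behind a processing-time tumbling window can be sketched in a few lines of plain Scala (an illustration of the semantics only, not Flink's internals; the object name is made up):

```scala
object ProcessTimeBuckets {
  // Start of the tumbling processing-time window a record falls into,
  // based solely on the wall clock observed when the record is processed.
  // The event's own timestamp never enters the calculation.
  def windowStart(processingTimeMillis: Long, windowSizeMillis: Long): Long =
    processingTimeMillis - (processingTimeMillis % windowSizeMillis)

  def main(args: Array[String]): Unit = {
    val windowSize = 60 * 1000L // one-minute windows
    // Two records processed 9.5s apart land in the same window;
    // a record processed roughly a minute later lands in the next one.
    println(windowStart(120500L, windowSize)) // 120000
    println(windowStart(130000L, windowSize)) // 120000
    println(windowStart(185000L, windowSize)) // 180000
  }
}
```

If the machine stalls or the job is replayed, the same events land in different windows, which is exactly why ProcessTime is only appropriate when the precise event timestamp is irrelevant.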
EventTime preserves the original event timestamp, enabling accurate windowing even with out‑of‑order or delayed events. Watermarks (e.g., those generated by a BoundedOutOfOrdernessTimestampExtractor) indicate that all events up to a certain timestamp should have arrived, allowing windows to be triggered safely.
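The triggering rule can be stated as a tiny predicate (a plain-Scala sketch of the semantics, not Flink's actual trigger code; the object name is made up): a window covering [start, end) may fire once the watermark has reached the window's last timestamp, end - 1.

```scala
object WatermarkFiring {
  // A window [start, end) may fire once the watermark has passed
  // the window's last contained timestamp, end - 1.
  def canFire(watermark: Long, windowEnd: Long): Boolean =
    watermark >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    // With maxOutOfOrderness = 10s, the watermark trails the
    // maximum event timestamp seen so far by 10s.
    val maxSeen = 70000L
    val watermark = maxSeen - 10000L     // 60000
    println(canFire(watermark, 60000L))  // true: window [0, 60000) is complete
    println(canFire(watermark, 120000L)) // false: window [60000, 120000) stays open
  }
}
```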
IngestionTime assigns timestamps based on when events reach the source operator, which is simpler but cannot handle late or out‑of‑order data.
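In other words, ingestion time is processing time captured exactly once, at the source. A hypothetical plain-Scala sketch of that stamping (no Flink APIs; all names here are made up for illustration):

```scala
object IngestionTimeStamping {
  // Ingestion time: stamp each record with the clock reading at the moment
  // it enters the pipeline; downstream operators reuse that fixed timestamp.
  def stampAtIngress[A](records: Seq[A], clock: () => Long): Seq[(A, Long)] =
    records.map(r => (r, clock()))

  def main(args: Array[String]): Unit = {
    // A deterministic fake clock, advancing 10ms per call
    var now = 1000L
    val clock = () => { now += 10; now }
    println(stampAtIngress(Seq("x", "y"), clock)) // List((x,1010), (y,1020))
  }
}
```

Because the stamp reflects arrival order rather than creation order, events that were delayed before reaching the source can never be reassigned to their true window, which is the limitation noted above.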
To use EventTime in Flink, set the time characteristic and assign timestamps with Watermarks:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```

Two common approaches are:
Calling assignTimestampsAndWatermarks() on the DataStream.
Embedding timestamp assignment and Watermark emission directly in a custom SourceFunction.
Example of a custom source that generates synthetic events with configurable lateness:
```scala
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.slf4j.LoggerFactory

import scala.util.Random

class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {

  val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
  @volatile private var running = true
  val channelSet = Seq("a", "b", "c", "d")
  val behaviorTypes = Seq("INSTALL", "OPEN", "BROWSE", "CLICK", "PURCHASE", "CLOSE", "UNINSTALL")
  val rand = Random

  override def run(ctx: SourceContext[String]): Unit = {
    var count = 0L
    while (running && count < Long.MaxValue) {
      val channel = channelSet(rand.nextInt(channelSet.size))
      val event = generateEvent()
      LOG.debug("Event: " + event)
      val ts = event(0)
      val id = event(1)
      val behaviorType = event(2)
      // Emit a tab-separated record: timestamp, channel, id, behaviorType
      ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }

  private def generateEvent(): Seq[String] = {
    // Simulate lateness by shifting the event timestamp into the past
    val ts = Instant.ofEpochMilli(System.currentTimeMillis).minusMillis(latenessMillis).toEpochMilli
    val id = UUID.randomUUID().toString
    val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
    Seq(ts.toString, id, behaviorType)
  }

  override def cancel(): Unit = running = false
}
```

Assign timestamps and Watermarks in the main program:
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

def main(args: Array[String]): Unit = {
  val params = ParameterTool.fromArgs(args)
  val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
  val maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
  val windowSizeMillis = params.getRequired("window-size-millis").toLong

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream = env.addSource(new StringLineEventSource(sourceLatenessMillis))
  stream
    // Extract the event timestamp from the first tab-separated field
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
        override def extractTimestamp(element: String): Long = element.split("\t")(0).toLong
      })
    // Map each line to ((channel, behaviorType), 1L) and count per key and window
    .map(line => {
      val a = line.split("\t")
      ((a(1), a(3)), 1L)
    })
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis)))
    .process(new EventTimeWindowReduceFunction()) // window function defined elsewhere
    .map(t => Seq(t._1, t._2, t._3, t._4, t._5).mkString("\t"))
    .addSink(kafkaProducer) // Kafka sink configured elsewhere
  env.execute(getClass.getSimpleName)
}
```

The BoundedOutOfOrdernessTimestampExtractor implementation maintains the maximum observed event timestamp and emits Watermarks as currentMaxTimestamp - maxOutOfOrderness, ensuring progress despite out‑of‑order arrivals.
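That bookkeeping can be sketched in plain Scala (a stand-in for illustration only, not Flink's actual class; the class name is made up):

```scala
// A minimal stand-in for BoundedOutOfOrdernessTimestampExtractor's bookkeeping.
class BoundedLatenessWatermarker(maxOutOfOrdernessMillis: Long) {
  // Offset the initial value so the first subtraction cannot underflow.
  private var currentMaxTimestamp = Long.MinValue + maxOutOfOrdernessMillis

  // Track the maximum event timestamp seen so far.
  def onEvent(eventTimestamp: Long): Unit =
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimestamp)

  // The watermark trails the maximum observed timestamp by the allowed lateness.
  def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrdernessMillis
}

object BoundedLatenessWatermarker {
  def main(args: Array[String]): Unit = {
    val wm = new BoundedLatenessWatermarker(10000L)
    Seq(50000L, 65000L, 58000L).foreach(wm.onEvent) // 58000 arrives out of order
    println(wm.currentWatermark) // 55000: the late event does not move the watermark back
  }
}
```

Because the watermark is derived from the maximum, a late event never drags it backwards; it only delays window firing by at most maxOutOfOrderness.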
Running the job on a Flink cluster (example command shown) produces windowed aggregates that are written to Kafka, with sample output like:
```
20180108154000 20180108154100 a CLOSE 421
20180108154000 20180108154100 c PURCHASE 434
...
```

Alternatively, timestamp and Watermark logic can be placed directly in the source operator, emitting timestamps via ctx.collectWithTimestamp(...) and Watermarks via ctx.emitWatermark(...).
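A rough sketch of that pattern in plain Scala, with a hypothetical StampedContext standing in for Flink's SourceContext (all names here are made up; only the collectWithTimestamp/emitWatermark call shape mirrors the real API):

```scala
// Hypothetical stand-in for Flink's SourceContext, recording what a source emits.
class StampedContext {
  val emitted = scala.collection.mutable.Buffer[String]()
  def collectWithTimestamp(record: String, timestamp: Long): Unit =
    emitted += s"record($record @ $timestamp)"
  def emitWatermark(watermark: Long): Unit =
    emitted += s"watermark($watermark)"
}

object SourceEmbeddedWatermarks {
  // Emit each record with its own timestamp, then advance the watermark,
  // keeping it maxLatenessMillis behind the largest timestamp emitted so far.
  def run(ctx: StampedContext, events: Seq[(String, Long)], maxLatenessMillis: Long): Unit = {
    var lastWatermark = Long.MinValue
    events.foreach { case (record, ts) =>
      ctx.collectWithTimestamp(record, ts)
      // Watermarks must never go backwards, even if events arrive out of order.
      lastWatermark = math.max(lastWatermark, ts - maxLatenessMillis)
      ctx.emitWatermark(lastWatermark)
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new StampedContext
    run(ctx, Seq(("a", 1000L), ("b", 2500L), ("c", 1200L)), 500L)
    ctx.emitted.foreach(println)
    // record(a @ 1000), watermark(500), record(b @ 2500),
    // watermark(2000), record(c @ 1200), watermark(2000)
  }
}
```

With this approach the stream needs no separate assignTimestampsAndWatermarks() step, since timestamps and Watermarks are already attached when records leave the source.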
References include the official Flink documentation on event time, Watermarks, and timestamp extractors, as well as O'Reilly articles on stream processing.