Big Data 12 min read

Introducing the Fire Framework: Annotation‑Driven Development for Spark and Flink

The Fire framework, open‑source by ZTO Express, provides a unified annotation‑based programming model for real‑time Spark and Flink jobs, dramatically reducing boilerplate, simplifying configuration, and enabling rapid development of large‑scale data processing tasks with concise Scala code examples.

Sohu Tech Products
Sohu Tech Products
Sohu Tech Products
Introducing the Fire Framework: Annotation‑Driven Development for Spark and Flink

Since JDK5, annotations have become a core feature of Java and are heavily used in frameworks such as Spring. However, traditional big‑data engines like Hadoop, Spark, and Flink initially ignored annotations, making it difficult to integrate Spring‑style configuration into real‑time data processing.

The Fire framework, developed by ZTO Express's big‑data team, fills this gap by offering a lightweight, annotation‑driven layer that works with both Spark and Flink, reducing code volume and simplifying configuration for real‑time analytics.

Key benefits: open‑source, supports Spark and Flink, provides a concise API, and can be learned within minutes.

1. Quick‑Start Examples

1.1 Flink Development Example

Example code:

@Streaming(interval = 100, unaligned = true, parallelism = 4) // 100s checkpoint, enable unaligned checkpoint
@Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire")
object FlinkDemo extends FlinkStreaming {

  @Process
  def kafkaSource: Unit = {
    val dstream = this.fire.createKafkaDirectStream() // consume Kafka via API
    sql("""create table statement ...""")
    sql("""insert into statement ...""")
  }
}

1.2 Spark Development Example

@Config(
  """
    |# Spark/Flink engine tuning parameters, Fire framework parameters, custom parameters
    |spark.shuffle.compress=true
    |spark.ui.enabled=true
    |"""
)
@Hive("thrift://localhost:9083") // connect to Hive
@Streaming(interval = 100, maxRatePerPartition = 100) // 100s batch, limit consumption rate
@Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire")
object SparkDemo extends SparkStreaming {

  @Process
  def kafkaSource: Unit = {
    val dstream = this.fire.createKafkaDirectStream() // consume Kafka via API
    sql("""select * from xxx""").show()
  }
}

These snippets demonstrate that Fire provides a unified programming style for both engines; developers only need to apply the provided annotations and extend the corresponding base class.

The @Config annotation supports multi‑line configuration, covering Spark/Flink tuning parameters, Fire‑specific settings, and user‑defined options.

During execution, Fire initializes the SparkSession or Flink ExecutionEnvironment based on these configurations, eliminating repetitive initialization code.

The @Process annotation marks the entry point of user logic, automatically invoked by the framework, while the main method is supplied by the parent class.

The @Streaming annotation works for both Spark Streaming and Flink, controlling batch interval or checkpoint frequency, and also supports parallelism, rate limiting, checkpoint timeout, and non‑aligned checkpoints.

The @Hive annotation configures the Hive metastore URL, allowing automatic creation of Hive catalogs for both engines without manual XML configuration.

The @Kafka and @RocketMQ annotations configure message‑queue connections, enabling one‑line stream creation:

val dstream = this.fire.createKafkaDirectStream() // consume Kafka
val dStream = this.fire.createRocketMqPullStream() // consume RocketMQ

1.3 SQL Development Example

Fire also supports pure SQL development. By placing SQL statements (separated by semicolons) inside a sql() call and annotating the method with @Step , the statements are executed sequentially, and the Chinese description in @Step is logged for traceability.

@Streaming(interval = 60, parallelism = 2)
object JdbcDimDemo extends FlinkStreaming {

  @Step(1, "Data source definition")
  def ddl: Unit = {
    sql("""
      |CREATE TABLE t_mysql_dim (
      |  `id` BIGINT ...
      |) WITH ( ... );
      |
      |CREATE TABLE t_kafka_fire (
      |  `id` BIGINT ...
      |) WITH ( ... )
      |""").stripMargin
  }

  @Step(2, "Join Kafka data with MySQL dimension table")
  def showJoin: Unit = {
    sql("""
      |select
      |  xxx
      |from t_kafka_fire t1 left join t_mysql_dim t2 on t1.id=t2.id
      |""").stripMargin.print()
  }
}

The framework executes the annotated steps in order and logs progress.

2. Annotation Overview (Common to Spark and Flink)

@Config : Configures engine‑specific parameters, Fire framework settings, and custom options; automatically applied when building SparkSession or Flink ExecutionEnvironment.

@Streaming : For Flink, controls checkpoint settings; for Spark Streaming, sets batch interval, back‑pressure, and rate limiting.

@Kafka : Specifies Kafka cluster details; multiple clusters can be configured with @Kafka2 , @Kafka3 , etc.

@Hive : Sets Hive metastore URL, supporting HDFS HA and cross‑cluster reads/writes.

@Process : Marks the user code entry point, automatically invoked by Fire.

@HBase : Configures HBase connection; a single line handles read/write.

@JDBC : Configures JDBC connection; Fire manages a connection pool internally.

@Scheduled : Similar to Spring’s scheduling, enables periodic tasks in Spark Streaming or Flink.

@Before : Lifecycle hook executed before engine context initialization.

@After : Lifecycle hook executed before JVM shutdown, useful for resource cleanup.

Examples of using @HBase , @Jdbc , and @Scheduled illustrate the convenience of annotation‑driven access to external systems and scheduled jobs.

// HBase example
@HBase("localhost:2181")
@HBase2(cluster = "192.168.0.1:2181", storageLevel = "DISK_ONLY")

this.fire.hbasePutDF(hTableName, studentDF, classOf[Student]) // uses @HBase (keyNum=1)
this.fire.hbasePutDF(hTableName2, studentDF, classOf[Student], keyNum=2) // uses @HBase2

// JDBC example
@Jdbc(url = "jdbc:derby:memory:fire;create=true", username = "fire", password = "fire")
val insertSql = "INSERT INTO $tableName (name, age, createTime, length, sex) VALUES (?,?,?,?,?)"
this.fire.jdbcUpdate(insertSql, Seq("admin", 12, timestamp, 10.0, 1))

// Scheduled example
/**
  * Method annotated with @Scheduled runs periodically.
  * @scope defaults to both driver and executor; "driver" limits to driver.
  * @initialDelay defines delay before first execution.
  */
@Scheduled(cron = "0/5 * * * * ?", scope = "driver", initialDelay = 60000)
def loadTable: Unit = {
  this.logger.info("Updating dimension table")
}

3. Open‑Source Project Collaboration

Interested developers are welcome to contribute to the Fire framework. Project repositories:

GitHub: https://github.com/ZTO-Express/fire

Gitee: https://gitee.com/RS131419/fire

big dataReal-time ProcessingFlinkAnnotationsSparkScalaFire Framework
Sohu Tech Products
Written by

Sohu Tech Products

A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.