
Implementing a Spark DataSource for REST JSON Interfaces

This article explains how to create a custom Spark DataSource that reads JSON data from a standard REST API, covering the design of DefaultSource, schema inference, data fetching, and integration with Spark SQL for seamless downstream processing.


The Spark DataSource API enables standardized adapters for various data sources, allowing efficient use of Spark's compute engine. This guide demonstrates building a DataSource that reads JSON from a REST endpoint, flattening nested structures for direct joins in streaming pipelines.

Implementation Goal: Use the DataSource API to fetch data via HTTP, extract the required JSON array with a JSONPath expression, and provide a configurable module for schema inference and data scanning.
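To make the extraction step concrete, here is a minimal, library-free sketch of what a JSONPath expression like $.data selects, with the JSON already parsed into nested Scala Maps and Lists. The extract helper is hypothetical and handles only dotted child access; the actual implementation later in this article delegates to a JSONPath library.

```scala
// Hypothetical helper illustrating JSONPath-style selection over parsed JSON,
// where objects are Maps and arrays are Lists. Only dotted child access is
// supported here; a real library implements the full JSONPath grammar.
def extract(root: Any, path: String): Any =
  path.stripPrefix("$").split('.').filter(_.nonEmpty).foldLeft(root) {
    case (node: Map[String @unchecked, Any @unchecked], key) => node(key)
    case (node, key) => sys.error(s"cannot select '$key' from $node")
  }

// A typical REST envelope: the payload rows live under "data"
val doc = Map("code" -> 0, "data" -> List(Map("id" -> 1), Map("id" -> 2)))
val rows = extract(doc, "$.data") // selects the array; each element becomes one record
```

Each element of the selected array is then serialized back to a JSON string and becomes one record in the resulting RDD.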

DataSource API Usage:

val df = SQLContext.getOrCreate(sc).
  read.
  format("driver class"). // driver class, similar to JDBC
  options(Map(...)).      // additional driver parameters
  load("url")             // resource path

DefaultSource definition, extending RelationProvider and DataSourceRegister:

package org.apache.spark.sql.execution.datasources.rest.json

class DefaultSource extends RelationProvider with DataSourceRegister

The shortName method returns a concise identifier:

override def shortName(): String = "restJSON"
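The short name is only discoverable if Spark can find the DataSourceRegister implementation. Spark resolves registered sources through Java's ServiceLoader, so the jar must ship a provider file naming the class:

```
# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.sql.execution.datasources.rest.json.DefaultSource
```

With this file on the classpath, format("restJSON") resolves to DefaultSource without spelling out the full package name.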

RelationProvider requires implementing createRelation to produce a BaseRelation based on user-provided parameters such as url, xPath, and samplingRatio:

override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
  val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
  val url = parameters.getOrElse("url", "")
  val xPath = parameters.getOrElse("xPath", "$")
  new RestJSONRelation(None, url, xPath, samplingRatio, None)(sqlContext)
}

RestJSONRelation class signature:

private[sql] class RestJSONRelation(
    val inputRDD: Option[RDD[String]],
    val url: String,
    val xPath: String,
    val samplingRatio: Double,
    val maybeDataSchema: Option[StructType]
)(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan

Schema inference is performed lazily. If the user does not supply a schema, the relation samples the JSON data (using samplingRatio) and calls InferSchema to produce a StructType:

lazy val dataSchema = {
  val jsonSchema = maybeDataSchema.getOrElse {
    InferSchema(
      inputRDD.getOrElse(createBaseRdd(Array(url))),
      samplingRatio,
      sqlContext.conf.columnNameOfCorruptRecord)
  }
  checkConstraints(jsonSchema)
  jsonSchema
}
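InferSchema is Spark-internal, but its sampling behavior can be sketched in plain Scala: infer a field-to-type mapping per sampled record, then merge the mappings. The inferType and inferSchema names and the fall-back-to-string rule are simplifications for illustration; Spark's real inference widens numeric types and builds a full StructType.

```scala
// Simplified stand-in for sampling-based schema inference.
// Real Spark inference produces a StructType and widens numeric types;
// here, conflicting field types simply fall back to "string".
def inferType(v: Any): String = v match {
  case _: Int | _: Long => "long"
  case _: Double        => "double"
  case _: Boolean       => "boolean"
  case _                => "string"
}

def inferSchema(records: Seq[Map[String, Any]], samplingRatio: Double): Map[String, String] = {
  // Sample at least one record, mirroring the samplingRatio option
  val sampled = records.take(math.max(1, (records.size * samplingRatio).toInt))
  sampled.flatMap(_.toSeq).groupBy(_._1).map { case (field, pairs) =>
    val types = pairs.map(p => inferType(p._2)).distinct
    field -> (if (types.size == 1) types.head else "string")
  }
}
```

Lazy evaluation matters here: the HTTP request and the sampling pass only happen the first time the schema is actually needed.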

Data fetching creates an RDD of JSON strings by issuing an HTTP GET request and extracting the array defined by xPath :

private def createBaseRdd(inputPaths: Array[String]): RDD[String] = {
  val url = inputPaths.head
  val response = Request.Get(new URL(url).toURI).execute().returnResponse()
  if (response != null && response.getStatusLine.getStatusCode == 200) {
    // Read the body only after the status check has succeeded
    val content = EntityUtils.toString(response.getEntity)
    // Select the target array with the JSONPath expression,
    // then emit one JSON string per array element
    val extractContent = JSONArray.fromObject(JSONPath.read(content, xPath))
      .map(f => JSONObject.fromObject(f).toString).toSeq
    sqlContext.sparkContext.makeRDD(extractContent)
  } else {
    sqlContext.sparkContext.makeRDD(Seq.empty[String])
  }
}

The buildScan method implements TableScan , converting each JSON string to a Row using the inferred schema:

def buildScan(): RDD[Row] = {
  JacksonParser(
    inputRDD.getOrElse(createBaseRdd(Array(url))),
    dataSchema, sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
}

Conversion logic walks through each JSON object, matches fields to the schema, and transforms values (e.g., strings to UTF8String ) before updating the internal row representation.
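The JacksonParser internals are out of scope here, but the per-record conversion it performs can be sketched as follows. Field and toRow are illustrative names, not Spark API; Spark additionally wraps strings in UTF8String and writes values into a mutable InternalRow.

```scala
// Illustrative field-by-field conversion from a parsed JSON object to a row.
// Spark's JacksonParser does the same walk against the inferred schema,
// coercing each value to its target type before updating the internal row.
case class Field(name: String, dataType: String)

def toRow(json: Map[String, Any], schema: Seq[Field]): Seq[Any] =
  schema.map { f =>
    (json.get(f.name), f.dataType) match {
      case (Some(v: Int), "long") => v.toLong   // widen to the schema's type
      case (Some(v), "string")    => v.toString // Spark would wrap in UTF8String
      case (Some(v), _)           => v          // already the right shape
      case (None, _)              => null       // missing field becomes a null column
    }
  }
```

Walking the schema (rather than the JSON object) guarantees that every row has the same column order and that fields absent from a record still occupy a slot.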

Usage Example:

val df = SQLContext.getOrCreate(sc).
  read.
  format("org.apache.spark.sql.execution.datasources.rest.json").
  options(Map("url" -> "http://[your dns]/path", "xPath" -> "$.data")).
  load() // the url is taken from the options above
// df can now be used like any regular DataFrame

Conclusion: Implementing a custom Spark DataSource for REST JSON enables seamless integration of external HTTP-exposed data into Spark pipelines, leveraging schema inference and Spark SQL optimizations for better performance and ecosystem compatibility.

Tags: JSON · REST · Big Data · Spark · Scala · DataSource
Written by Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies
