Implementing a Spark DataSource for REST JSON Interfaces
This article explains how to create a custom Spark DataSource that reads JSON data from a standard REST API, covering the design of DefaultSource, schema inference, data fetching, and integration with Spark SQL for seamless downstream processing.
The Spark DataSource API enables standardized adapters for various data sources, allowing efficient use of Spark's compute engine. This guide demonstrates building a DataSource that reads JSON from a REST endpoint, flattening nested structures for direct joins in streaming pipelines.
Implementation Goal: Use the DataSource API to fetch data via HTTP, extract the required JSON array using a JSONPath expression (the xPath option), and provide a configurable module for schema inference and data scanning.
DataSource API Usage:
val df = SQLContext.getOrCreate(sc)
  .read
  .format("driver class")  // driver class, similar to JDBC
  .options(Map(...))       // additional driver parameters
  .load("url")             // resource path
The DefaultSource definition extends RelationProvider and DataSourceRegister:
org.apache.spark.sql.execution.datasources.rest.json.DefaultSource extends RelationProvider with DataSourceRegister
The shortName method returns a concise identifier:
override def shortName(): String = "restJSON"
Spark discovers DataSourceRegister implementations through Java's ServiceLoader, so this alias only resolves once DefaultSource is listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath.
RelationProvider requires implementing createRelation to produce a BaseRelation based on user-provided parameters such as url, xPath, and samplingRatio:
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
  val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
  val url = parameters.getOrElse("url", "")
  val xPath = parameters.getOrElse("xPath", "$")
  new RestJSONRelation(None, url, xPath, samplingRatio, None)(sqlContext)
}
The RestJSONRelation class signature:
private[sql] class RestJSONRelation(
    val inputRDD: Option[RDD[String]],
    val url: String,
    val xPath: String,
    val samplingRatio: Double,
    val maybeDataSchema: Option[StructType]
  )(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan
Schema inference is performed lazily. If the user does not supply a schema, the relation samples the JSON data (using samplingRatio) and calls InferSchema to produce a StructType:
lazy val dataSchema = {
  val jsonSchema = maybeDataSchema.getOrElse {
    InferSchema(
      inputRDD.getOrElse(createBaseRdd(Array(url))),
      samplingRatio,
      sqlContext.conf.columnNameOfCorruptRecord)
  }
  checkConstraints(jsonSchema)
  jsonSchema
}
Data fetching creates an RDD of JSON strings by issuing an HTTP GET request and extracting the array defined by xPath:
private def createBaseRdd(inputPaths: Array[String]): RDD[String] = {
  val url = inputPaths.head
  val response = Request.Get(new URL(url).toURI).execute().returnResponse()
  if (response != null && response.getStatusLine.getStatusCode == 200) {
    // Read the response body only after confirming a successful response
    val content = EntityUtils.toString(response.getEntity)
    val extractContent = JSONArray.fromObject(JSONPath.read(content, xPath))
      .map(f => JSONObject.fromObject(f).toString).toSeq
    sqlContext.sparkContext.makeRDD(extractContent)
  } else {
    sqlContext.sparkContext.makeRDD(Seq.empty[String])
  }
}
The buildScan method implements TableScan, converting each JSON string to a Row using the inferred schema:
def buildScan(): RDD[Row] = {
  JacksonParser(
    inputRDD.getOrElse(createBaseRdd(Array(url))),
    dataSchema, sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
}
The conversion logic walks through each JSON object, matches fields to the schema, and transforms values (e.g., strings to UTF8String) before updating the internal row representation.
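To make that conversion step concrete, here is a minimal, self-contained sketch in plain Scala. It is not the JacksonParser code path: the schema is modeled as simple (name, type) pairs and values stay ordinary JVM types rather than UTF8String, but it shows the same field-matching and type-coercion idea.

```scala
object RowConversion {
  // Illustrative stand-in for Spark's StructType: ordered (fieldName, typeName) pairs.
  type Schema = Seq[(String, String)]

  // Convert one parsed JSON record into an ordered row of typed values,
  // matching each schema field against the record the way the real parser does.
  def convert(record: Map[String, Any], schema: Schema): Seq[Any] =
    schema.map { case (name, tpe) =>
      record.get(name) match {
        case Some(v) =>
          tpe match {
            case "LongType"   => v.toString.toLong
            case "DoubleType" => v.toString.toDouble
            case _            => v.toString // Spark would wrap strings as UTF8String here
          }
        case None => null // a field missing from the record becomes null in the row
      }
    }
}
```

For example, `RowConversion.convert(Map("id" -> "7", "name" -> "alice"), Seq("id" -> "LongType", "name" -> "StringType"))` yields `Seq(7L, "alice")`.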
Usage Example:
val df = SQLContext.getOrCreate(sc)
  .read
  .format("org.apache.spark.sql.execution.datasources.rest.json")
  .options(Map("url" -> "http://[your dns]/path", "xPath" -> "$.data"))
  .load()  // the url option above supplies the resource path
// df can now be used like any regular DataFrame
Conclusion: Implementing a custom Spark DataSource for REST JSON enables seamless integration of external HTTP-exposed data into Spark pipelines, leveraging schema inference and Spark SQL optimizations for better performance and ecosystem compatibility.
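A closing note on the schema-inference step: InferSchema is Spark-internal, but its essential move, inferring a type per sampled value and widening conflicting observations of the same field to a least common type, can be sketched in a few lines of plain Scala. The type names and helpers here are illustrative, not Spark's API.

```scala
object TypeWidening {
  // Illustrative only: name the inferred type of one sampled JSON value.
  def inferType(v: Any): String = v match {
    case _: Int | _: Long     => "LongType"
    case _: Float | _: Double => "DoubleType"
    case _: Boolean           => "BooleanType"
    case _                    => "StringType"
  }

  // Reconcile the types seen for one field across sampled records,
  // widening numerics and falling back to StringType on a real conflict.
  def merge(a: String, b: String): String = (a, b) match {
    case (x, y) if x == y                                        => x
    case ("LongType", "DoubleType") | ("DoubleType", "LongType") => "DoubleType"
    case _                                                       => "StringType"
  }
}
```

For instance, a field sampled as 1 in one record and 2.5 in another would widen to "DoubleType", while a Boolean/Long conflict degrades to "StringType".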