Real‑time Data Processing with ElasticSearch, Kibana and Logstash: Installation, CRUD, Bulk Import, and Data Transformation
This tutorial walks through building a real‑time data processing pipeline using ElasticSearch, Kibana and Logstash, covering core concepts such as data volume, velocity, variety and veracity, detailed installation steps, CRUD operations, bulk data import, Java‑based data conversion, and Logstash pipeline configuration with filters and date parsing.
The article introduces the fundamentals of handling real‑time or near‑real‑time data streams, emphasizing the four V’s of big data—volume, velocity, variety and veracity—and explains how tools like Storm, Kafka and ElasticSearch can be combined to build a data reduction system (DRS).
It then provides step‑by‑step instructions for installing ElasticSearch and Kibana, showing the directory structures, key configuration files (e.g., config/elasticsearch.yml), and how to verify the cluster health with a simple curl request.
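For orientation, a single‑node development configuration in config/elasticsearch.yml typically looks like the following sketch; the cluster and node names shown here are illustrative placeholders, not values from the tutorial:

```yaml
# config/elasticsearch.yml -- illustrative single-node dev settings
cluster.name: my-cluster     # name shared by every node that joins this cluster
node.name: node-1            # human-readable name for this node
network.host: 127.0.0.1      # bind to localhost for local development
http.port: 9200              # default port for the REST API
```

Once the node is running, cluster health can be checked with `curl http://localhost:9200/_cluster/health?pretty`, which returns a JSON document whose status field is green, yellow, or red.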
Next, the guide details ElasticSearch CRUD operations, mapping each CRUD command to its HTTP/REST counterpart and illustrating document creation, retrieval, update (including scripted updates), deletion, and bulk API usage with examples such as:
PUT /flight/_doc/1
{
"Icao":"A0835D",
"Alt":2400,
"Lat":39.984322,
"Long":-82.925616
}
It also explains how to prepare bulk data files, convert JSON flight logs into the required NDJSON format using Java code (e.g., JsonFlightFileConverter), and import them with:
curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"
The article then shifts to Logstash, describing its architecture (inputs, filters, outputs), installation options, and a sample pipeline that reads flight JSON files, parses them with the json filter, removes unwanted fields, renames the Id field to [@metadata][Id], and indexes the data into ElasticSearch:
input {
file { path => "/path/to/test.json" codec => "json" start_position => "beginning" }
}
filter {
json { source => "message" }
mutate { remove_field => ["path","@version","@timestamp","host","message"]
rename => { "[Id]" => "[@metadata][Id]" } }
mutate { gsub => ["FSeen", "\/Date\(", "", "FSeen", "\)\/", ""] }
date { match => ["FSeen","UNIX_MS"] timezone => "UTC" }
}
output {
elasticsearch { hosts => ["http://localhost:9200"] index => "flight-logstash" document_id => "%{[@metadata][Id]}" }
stdout { codec => rubydebug }
}
Finally, it discusses when Logstash is appropriate for batch versus real‑time processing and provides additional resources for scaling Logstash with Redis.
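Returning to the bulk‑import step, the JSON‑to‑NDJSON conversion that the tutorial delegates to JsonFlightFileConverter can be sketched in plain Java as follows. This is a minimal illustration, not the tutorial's actual converter: the class name BulkNdjsonWriter is an assumption, and it assumes each flight document is already a compact single‑line JSON object.

```java
import java.util.List;

// Minimal sketch of a JSON-to-NDJSON converter for the Elasticsearch bulk API.
// Class and method names here are illustrative, not the tutorial's code.
public class BulkNdjsonWriter {

    // Wraps each single-line JSON document with the action-and-metadata line
    // that the _bulk endpoint requires, assigning sequential _id values.
    static String toNdjson(String index, List<String> docs) {
        StringBuilder out = new StringBuilder();
        int id = 1;
        for (String doc : docs) {
            // Action line first, then the document source on its own line.
            out.append("{\"index\":{\"_index\":\"").append(index)
               .append("\",\"_id\":\"").append(id++).append("\"}}\n");
            out.append(doc).append('\n'); // a bulk body must end with a newline
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> docs = List.of(
            "{\"Icao\":\"A0835D\",\"Alt\":2400,\"Lat\":39.984322,\"Long\":-82.925616}"
        );
        System.out.print(toNdjson("flight", docs));
    }
}
```

The resulting file is exactly what the curl bulk command above posts with --data-binary: alternating action and source lines, one JSON object per line, terminated by a trailing newline.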
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.