Migrating Data from HBase to Kafka Using MapReduce
This article explains how to reverse the typical data flow: extracting large volumes of Rowkeys from HBase with MapReduce, storing them on HDFS, and then using batch Get operations to retrieve the full records and write them into Kafka, while handling retries and monitoring progress.
In many real-time applications, data is stored in an HBase cluster, but sometimes that data needs to be migrated back to Kafka.
Data typically flows from source → Kafka → consumer (Flink, Spark, or the Kafka API) → HBase; the reverse direction requires a different approach.
The solution outlined here uses MapReduce to extract Rowkeys from HBase, stores them on HDFS, and then reads them back to fetch the full rows with HBase Get/List<Get> operations and write them to Kafka.
Key steps include:
1. Extract Rowkeys with a MapReduce job, using FirstKeyOnlyFilter to minimize data transfer.
2. Generate Rowkey files on HDFS, split according to data volume.
3. In a second MapReduce job, read the Rowkey files, perform batch List<Get> requests against HBase, and send the resulting records to Kafka.
4. Record successful and failed Rowkeys on HDFS so that failed writes can be retried.
A sample Java MapReduce program (MRROW2HDFS) for the first job is provided below, showing configuration of the HBase connection, scan setup, the mapper and reducer classes, and how the Rowkeys are written to HDFS.
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRROW2HDFS {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create(); // HBase connection info
        Job job = Job.getInstance(config, "MRROW2HDFS");
        job.setJarByClass(MRROW2HDFS.class);
        job.setReducerClass(ROWReducer.class);

        String hbaseTableName = "hbase_tbl_name";
        Scan scan = new Scan();
        scan.setCaching(1000);                    // rows fetched per RPC
        scan.setCacheBlocks(false);               // do not pollute the block cache
        scan.setFilter(new FirstKeyOnlyFilter()); // return only the first KV per row

        TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan,
                ROWMapper.class, Text.class, Text.class, job);
        // HDFS path where the extracted Rowkeys are stored
        FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class ROWMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (Cell cell : value.rawCells()) {
                // Optionally filter by date range here before emitting
                context.write(new Text(Bytes.toString(key.get())), new Text(""));
            }
        }
    }

    public static class ROWReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, result);
            }
        }
    }
}
```
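The second job, which reads the Rowkey files and pushes records to Kafka, can be sketched as follows. This is a minimal illustration, not code from the original article: the class name `ROW2KafkaMapper`, the topic `hbase_migration`, the broker address, and the helper `resultToJson` are all assumed for the example, and a real run needs a live HBase cluster and Kafka broker.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ROW2KafkaMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Connection connection;
    private Table table;
    private Producer<String, String> producer;
    private final List<Get> gets = new ArrayList<>();
    private static final int BATCH_SIZE = 1000; // size of each List<Get> batch

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("hbase_tbl_name"));
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // illustrative address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String rowkey = value.toString().trim();
        if (rowkey.isEmpty()) return;            // skip blank lines in the Rowkey file
        gets.add(new Get(Bytes.toBytes(rowkey)));
        if (gets.size() >= BATCH_SIZE) flush(context);
    }

    private void flush(Context context) throws IOException, InterruptedException {
        Result[] results = table.get(gets);      // batch List<Get> against HBase
        for (Result r : results) {
            if (r == null || r.isEmpty()) continue;
            String rowkey = Bytes.toString(r.getRow());
            // resultToJson is a hypothetical serializer for the fetched row
            producer.send(new ProducerRecord<>("hbase_migration", rowkey, resultToJson(r)));
            context.write(new Text(rowkey), new Text("SUCCESS")); // success log on HDFS
        }
        gets.clear();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!gets.isEmpty()) flush(context);     // send the final partial batch
        producer.flush();
        producer.close();
        table.close();
        connection.close();
    }

    private String resultToJson(Result r) {
        // Placeholder: serialize cells into whatever payload format Kafka consumers expect
        return Bytes.toString(r.getRow());
    }
}
```

Failed sends can analogously be written with a `FAILED` marker (for example via a Kafka send callback) so that the failure file on HDFS can drive a retry pass.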
The process is straightforward, but attention must be paid to Rowkey formatting (for example, trimming stray whitespace and skipping empty lines) and to proper logging of successes and failures so that the migration can be retried reliably.
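As a small illustration of the formatting concern above, the Rowkey lines read back from HDFS can be normalized before building Gets. The class name `RowkeyCleaner` and the trimming rules are assumptions for this sketch, not part of the original article:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowkeyCleaner {
    // Trim whitespace and drop blank lines so that stray spaces or empty
    // records in the HDFS Rowkey files do not produce bad Gets.
    public static List<String> clean(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String rk = line == null ? "" : line.trim();
            if (!rk.isEmpty()) out.add(rk);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> cleaned = clean(Arrays.asList("  row-001 ", "", "row-002", "   "));
        System.out.println(cleaned); // prints [row-001, row-002]
    }
}
```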
DataFunTalk
Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.