Read from an HBase table and write to HBase using MapReduce

This example scans stock rows from an HBase table (stocklongId), computes the average and sample standard deviation of the opening price per user in a reducer, and writes the aggregates back to a second table (stockStats).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;

public class StockAggregatorAvg {

    static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {

        private int numRecords = 0;

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // Extract the user key from the composite row key (userId long + counter)
            ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);
            byte[] openBytes = values.getValue(Bytes.toBytes("info"), Bytes.toBytes("open"));
            if (openBytes == null) {
                return; // skip rows that have no info:open cell
            }
            double open = Bytes.toDouble(openBytes);
            try {
                context.write(userKey, new DoubleWritable(open));
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            numRecords++;
            if ((numRecords % 10000) == 0) {
                context.setStatus("mapper processed " + numRecords + " records so far");
            }
        }
    }
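
    // Not part of the original post: a hedged sketch of how the composite row
    // key (userId long + counter) might be built when the stocklongId table is
    // loaded, assuming both parts are longs -- this matches the
    // Bytes.SIZEOF_LONG prefix extracted in map() above.
    static byte[] compositeKey(long userId, long counter) {
        return Bytes.add(Bytes.toBytes(userId), Bytes.toBytes(counter));
    }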

 

    public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            double sumSq = 0;
            int count = 0;
            for (DoubleWritable val : values) {
                double value = val.get();
                sum += value;
                sumSq += value * value;
                count++;
            }

            double avg = sum / count;
            // sample standard deviation via the one-pass formula; guard against a single sample
            double stddev = count > 1 ? Math.sqrt((sumSq - avg * sum) / (count - 1)) : 0;

            Put put = new Put(key.get());
            put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("ap"), Bytes.toBytes(avg));
            put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("dev"), Bytes.toBytes(stddev));
            System.out.println(String.format("stats :   key : %d,  sum : %f, avg : %f, stddev : %f",
                    Bytes.toLong(key.get()), sum, avg, stddev));
            context.write(key, put);
        }
    }
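
    /*
     * Why the reducer's one-pass formula works (derivation, not in the
     * original post): with n = count and avg = sum / n,
     *
     *   sum((x_i - avg)^2) = sumSq - 2*avg*sum + n*avg^2
     *                      = sumSq - 2*avg*sum + avg*sum
     *                      = sumSq - avg*sum
     *
     * so the sample standard deviation is sqrt((sumSq - avg*sum) / (n - 1)).
     */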

    

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "Stock");
        job.setJarByClass(StockAggregatorAvg.class);
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("info"));
        // Note: no FirstKeyOnlyFilter here -- it would hand the mapper only the
        // first cell of each row, so the info:open lookup could come back null.
        TableMapReduceUtil.initTableMapperJob("stocklongId", scan, AvgMapper.class,
                ImmutableBytesWritable.class, DoubleWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("stockStats", AvgReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
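
The job writes into the stockStats table, which must already exist with a details column family before the reducer runs. Here is a minimal sketch of creating it with the HBase 1.x client API; the table and family names come from the code above, everything else (class name, existence check) is boilerplate added for illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateStockStatsTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("stockStats"));
            desc.addFamily(new HColumnDescriptor("details")); // family written by AvgReducer
            if (!admin.tableExists(desc.getTableName())) {
                admin.createTable(desc);
            }
        }
    }
}

Once the table exists, the job can be packaged into a jar and launched with the HBase jars on the classpath, for example via HADOOP_CLASSPATH=$(hbase classpath) hadoop jar your-jar-here StockAggregatorAvg.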
