import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
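/**
 * MapReduce job that scans the 'stocklongId' HBase table, groups the info:open
 * value of each row by the user id embedded in the row key, and writes the
 * per-user sum, average, and sample standard deviation to the 'stockStats' table.
 *
 * Example invocation (a sketch, assuming the class is packaged into a jar named
 * stock-aggregator.jar and the HBase client jars are on the Hadoop classpath):
 *
 *   hadoop jar stock-aggregator.jar StockAggregatorAvg
 */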
public class StockAggregatorAvg {
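/**
 * Mapper: emits one (userKey, open price) pair per row. The row key is a
 * composite of a long user id followed by a counter, so only the first
 * SIZEOF_LONG bytes are used as the grouping key.
 */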
static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {
private int numRecords = 0;
@Override
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
// extract userKey from the compositeKey (userId + counter)
ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);
byte[] openBytes = values.getValue(Bytes.toBytes("info"), Bytes.toBytes("open"));
if (openBytes == null) {
return; // skip rows without an info:open cell
}
double open = Bytes.toDouble(openBytes);
DoubleWritable openWritable = new DoubleWritable(open);
try {
context.write(userKey, openWritable);
} catch (InterruptedException e) {
throw new IOException(e);
}
numRecords++;
if ((numRecords % 10000) == 0) {
context.setStatus("mapper processed " + numRecords + " records so far");
}
}
}
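/**
 * Reducer: accumulates the sum and the sum of squares of a user's open prices
 * in a single pass, then derives
 *
 *   avg    = sum / count
 *   stddev = sqrt((sumSq - avg * sum) / (count - 1))
 *
 * where sumSq - avg * sum equals sumSq - count * avg^2, i.e. the numerator of
 * the sample variance. The average ("ap"), sum ("sum"), and standard deviation
 * ("dev") are written to the 'details' column family of the output row.
 */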
public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {
@Override
public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
double sum = 0;
int count = 0;
double sumSq = 0;
double value = 0;
for (DoubleWritable val : values) {
value = val.get();
sum += value;
sumSq += value * value;
count++;
}
double avg = sum/count;
// Guard against a single value, where (count - 1) would be zero and yield NaN.
double stddev = count > 1 ? Math.sqrt((sumSq - avg * sum) / (count - 1)) : 0;
Put put = new Put(key.get());
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("ap"), Bytes.toBytes(avg));
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("dev"), Bytes.toBytes(stddev));
System.out.println(String.format("stats : key : %d, sum : %f, avg : %f, stddev : %f",
Bytes.toLong(key.get()), sum, avg, stddev));
context.write(key, put);
}
}
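/**
 * Job driver: TableMapReduceUtil wires the mapper to the 'stocklongId' table
 * through TableInputFormat and the reducer to the 'stockStats' table through
 * TableOutputFormat, then the job is submitted and awaited.
 */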
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "Stock");
job.setJarByClass(StockAggregatorAvg.class);
Scan scan = new Scan();
// The mapper only reads info:open, so restrict the scan to that column.
// (A FirstKeyOnlyFilter would return just the first cell of each row, which
// need not be info:open, so no filter is set.)
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("open"));
TableMapReduceUtil.initTableMapperJob("stocklongId", scan, AvgMapper.class, ImmutableBytesWritable.class,
DoubleWritable.class, job);
TableMapReduceUtil.initTableReducerJob("stockStats", AvgReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}