import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public class MyMapReduceJob extends Configured implements Tool {
@Override
public int run(String[] arg0) throws Exception {
Job job = new Job(getConf()); // use the Configuration supplied by ToolRunner so -D options are honored
Configuration jobConf = job.getConfiguration();
job.setJarByClass(MyMapReduceJob.class);
job.setInputFormatClass(TextInputFormat.class);
jobConf.set("mapred.input.dir", arg0[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(VarIntWritable.class);
job.setMapOutputValueClass(VarLongWritable.class);
//jobConf.setBoolean("mapred.compress.map.output", true);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(VarIntWritable.class);
job.setOutputValueClass(VarLongWritable.class);
//job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
jobConf.set("mapred.output.dir", arg0[1]);
boolean succeeded = job.waitForCompletion(true);
return succeeded ? 0 : -1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new MyMapReduceJob(), args);
System.exit(exitCode);
}
}
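// Example submission (the jar name and HDFS paths below are placeholders, not part of the original code):
//   hadoop jar myjob.jar MyMapReduceJob /input/prefs /output/itemIndex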
//Mapper Class
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public final class MyMapper extends
Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
private boolean transpose;
@Override
protected void setup(Context context) {
Configuration jobConf = context.getConfiguration();
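// ToEntityPrefsMapper.TRANSPOSE_USER_ITEM tells Mahout's preference mappers to swap the user and item columns; it defaults to false.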
transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
}
@Override
protected void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
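// TasteHadoopUtils.idToIndex hashes the 64-bit item ID down to a non-negative int index.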
int index = TasteHadoopUtils.idToIndex(itemID);
context.write(new VarIntWritable(index), new VarLongWritable(itemID));
}
}
//Reducer Class
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public final class MyReducer extends
Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
@Override
protected void reduce(VarIntWritable index,
Iterable<VarLongWritable> possibleItemIDs,
Context context) throws IOException, InterruptedException {
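// Several item IDs can hash to the same int index; keeping the smallest one makes the index-to-ID mapping deterministic.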
long minimumItemID = Long.MAX_VALUE;
for (VarLongWritable varLongWritable : possibleItemIDs) {
long itemID = varLongWritable.get();
if (itemID < minimumItemID) {
minimumItemID = itemID;
}
}
if (minimumItemID != Long.MAX_VALUE) {
context.write(index, new VarLongWritable(minimumItemID));
}
}
}
==================================
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
public class StockAggregatorAvg {
static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {
private int numRecords = 0;
//private static final IntWritable one = new IntWritable(1);
@Override
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
// extract userKey from the compositeKey (userId + counter)
ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);
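// Only the first Bytes.SIZEOF_LONG bytes (the id portion of the composite row key) are emitted as the key,
// so every row for the same id is grouped into a single reduce call.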
double open = Bytes.toDouble(values.getValue(Bytes.toBytes("info"), Bytes.toBytes("open")));
DoubleWritable openWritable = new DoubleWritable(open);
try {
context.write(userKey, openWritable);
} catch (InterruptedException e) {
throw new IOException(e);
}
numRecords++;
if ((numRecords % 10000) == 0) {
context.setStatus("mapper processed " + numRecords + " records so far");
}
}
}
public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {
public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
double sum = 0;
int count = 0;
double sumSq = 0;
double value = 0;
for (DoubleWritable val : values) {
value = val.get();
sum += value;
sumSq += value * value;
count++;
}
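// Sample standard deviation from running sums: variance = (sum(x^2) - n*avg^2) / (n - 1), and n*avg^2 == avg*sum.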
double avg = sum/count;
double stddev = Math.sqrt((sumSq - avg*sum)/(count-1));
Put put = new Put(key.get());
put.add(Bytes.toBytes("details"), Bytes.toBytes("ap"), Bytes.toBytes(avg));
put.add(Bytes.toBytes("details"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
put.add(Bytes.toBytes("details"), Bytes.toBytes("dev"), Bytes.toBytes(stddev));
System.out.println(String.format("stats : key : %d, sum : %f, avg : %f, stddev : %f", Bytes.toLong(key.get()), sum, avg, stddev));
context.write(key, put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "Stock");
job.setJarByClass(StockAggregatorAvg.class);
Scan scan = new Scan();
String columns = "info"; // comma separated
scan.addColumns(columns);
scan.setFilter(new FirstKeyOnlyFilter());
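// FirstKeyOnlyFilter returns only the first KeyValue of each row; this only works here if info:open is
// the first (or only) column stored per row, otherwise getValue("info", "open") in the mapper returns null.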
TableMapReduceUtil.initTableMapperJob("stocklongId", scan, AvgMapper.class, ImmutableBytesWritable.class,
DoubleWritable.class, job);
TableMapReduceUtil.initTableReducerJob("stockStats", AvgReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
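// The job assumes both tables already exist. A minimal setup in the HBase shell, with the column
// families inferred from the reads/writes above, could look like:
//   create 'stocklongId', 'info'
//   create 'stockStats', 'details'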