Hadoop MapReduce Example

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.OutputFormat;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.mahout.math.VarIntWritable;

import org.apache.mahout.math.VarLongWritable;

public class MyMapReduceJob extends Configured implements Tool {

@Override

public int run(String[] arg0) throws Exception {

Job job = new Job(new Configuration());

Configuration jobConf = job.getConfiguration();

job.setJarByClass(MyMapReduceJob.class);

job.setInputFormatClass(TextInputFormat.class);

jobConf.set(“mapred.input.dir”, arg0[0]);

job.setMapperClass(MyMapper.class);

job.setMapOutputKeyClass(VarIntWritable.class);

job.setMapOutputValueClass(VarLongWritable.class);

//jobConf.setBoolean(“mapred.compress.map.output”, true);

job.setReducerClass(MyReducer.class);

job.setOutputKeyClass(VarIntWritable.class);

job.setOutputValueClass(VarLongWritable.class);

//job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);

jobConf.set(“mapred.output.dir”, arg0[1]);

boolean succeeded = job.waitForCompletion(true);

if(!succeeded) return -1;

return 0;

}

public static void main(String[] args)throws Exception {

ToolRunner.run(new Configuration(), new MyMapReduceJob(),

args);

}

}

//Mapper Class

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;

import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;

import org.apache.mahout.math.VarIntWritable;

import org.apache.mahout.math.VarLongWritable;

public final class MyMapper extends

Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {

private boolean transpose;

@Override

protected void setup(Context context) {

Configuration jobConf = context.getConfiguration();

transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);

}

@Override

protected void map(LongWritable key,

Text value,

Context context) throws IOException, InterruptedException {

String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());

long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);

int index = TasteHadoopUtils.idToIndex(itemID);

context.write(new VarIntWritable(index), new VarLongWritable(itemID));

}

}

//Reducer Class

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.mahout.math.VarIntWritable;

import org.apache.mahout.math.VarLongWritable;

public final class MyReducer extends

Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {

@Override

protected void reduce(VarIntWritable index,

Iterable<VarLongWritable> possibleItemIDs,

Context context) throws IOException, InterruptedException {

long minimumItemID = Long.MAX_VALUE;

for (VarLongWritable varLongWritable : possibleItemIDs) {

long itemID = varLongWritable.get();

if (itemID < minimumItemID) {

minimumItemID = itemID;

}

}

if (minimumItemID != Long.MAX_VALUE) {

context.write(index, new VarLongWritable(minimumItemID));

}

}

}

 

==================================

import java.io.IOException;

 

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Job;

 

 

public class StockAggregatorAvg {

 

static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {

 

private int numRecords = 0;

//private static final IntWritable one = new IntWritable(1);

 

@Override

public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {

// extract userKey from the compositeKey (userId + counter)

ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);

double open = Bytes.toDouble(values.getValue(Bytes.toBytes(“info”), Bytes.toBytes(“open”)));

DoubleWritable openWritable = new DoubleWritable(open);

try {

context.write(userKey, openWritable);

} catch (InterruptedException e) {

throw new IOException(e);

}

numRecords++;

if ((numRecords % 10000) == 0) {

context.setStatus(“mapper processed ” + numRecords + ” records so far”);

}

}

}

 

public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {

 

public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)

throws IOException, InterruptedException {

double sum = 0;

int count = 0;

double sumSq = 0;

double value = 0;

for (DoubleWritable val : values) {

value = val.get();

sum += value;

sumSq += value * value;

count++;

}

 

double avg = sum/count;

double stddev = Math.sqrt((sumSq – avg*sum)/(count-1));

Put put = new Put(key.get());

put.add(Bytes.toBytes(“details”), Bytes.toBytes(“ap”), Bytes.toBytes(sum/count));

put.add(Bytes.toBytes(“details”), Bytes.toBytes(“sum”), Bytes.toBytes(sum));

put.add(Bytes.toBytes(“details”), Bytes.toBytes(“dev”), Bytes.toBytes(stddev));

System.out.println(String.format(“stats :   key : %d,  sum : %f, avg : %f, stddev : %f”, Bytes.toLong(key.get()), sum, avg, stddev));

context.write(key, put);

}

}

public static void main(String[] args) throws Exception {

HBaseConfiguration conf = new HBaseConfiguration();

Job job = new Job(conf, “Stock”);

job.setJarByClass(StockAggregatorAvg.class);

Scan scan = new Scan();

String columns = “info”; // comma seperated

scan.addColumns(columns);

scan.setFilter(new FirstKeyOnlyFilter());

TableMapReduceUtil.initTableMapperJob(“stocklongId”, scan, AvgMapper.class, ImmutableBytesWritable.class,

DoubleWritable.class, job);

TableMapReduceUtil.initTableReducerJob(“stockStats”, AvgReducer.class, job);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

 

}

 

 

 

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s