Hadoop MapReduce Example

// Driver class (MyMapReduceJob.java)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

public class MyMapReduceJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        Configuration jobConf = job.getConfiguration();

        job.setJarByClass(MyMapReduceJob.class);

        // Input: plain text, one preference record per line
        job.setInputFormatClass(TextInputFormat.class);
        jobConf.set("mapred.input.dir", args[0]);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(VarIntWritable.class);
        job.setMapOutputValueClass(VarLongWritable.class);
        //jobConf.setBoolean("mapred.compress.map.output", true);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(VarIntWritable.class);
        job.setOutputValueClass(VarLongWritable.class);

        // Output: SequenceFile of (index, itemID) pairs
        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        jobConf.set("mapred.output.dir", args[1]);

        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // Propagate the job's exit status to the shell
        System.exit(ToolRunner.run(new Configuration(), new MyMapReduceJob(), args));
    }
}

// Mapper class (MyMapper.java)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

public final class MyMapper extends
        Mapper<LongWritable, Text, VarIntWritable, VarLongWritable> {

    private boolean transpose;

    @Override
    protected void setup(Context context) {
        Configuration jobConf = context.getConfiguration();
        transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
    }

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Context context) throws IOException, InterruptedException {
        // Each input line is a delimited preference record (userID,itemID,...)
        String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
        long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
        // Hash the (possibly large) item ID down to an int index
        int index = TasteHadoopUtils.idToIndex(itemID);
        context.write(new VarIntWritable(index), new VarLongWritable(itemID));
    }
}

// Reducer class (MyReducer.java)
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

public final class MyReducer extends
        Reducer<VarIntWritable, VarLongWritable, VarIntWritable, VarLongWritable> {

    @Override
    protected void reduce(VarIntWritable index,
                          Iterable<VarLongWritable> possibleItemIDs,
                          Context context) throws IOException, InterruptedException {
        // Keep the smallest item ID that hashed to this index
        long minimumItemID = Long.MAX_VALUE;
        for (VarLongWritable varLongWritable : possibleItemIDs) {
            long itemID = varLongWritable.get();
            if (itemID < minimumItemID) {
                minimumItemID = itemID;
            }
        }
        if (minimumItemID != Long.MAX_VALUE) {
            context.write(index, new VarLongWritable(minimumItemID));
        }
    }
}
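
The driver above sets the input and output directories through the raw mapred.input.dir and mapred.output.dir configuration keys. The same thing is more commonly done with the FileInputFormat/FileOutputFormat helpers; a small equivalent sketch, meant to drop into run() with the same job and args variables:

    // Equivalent to jobConf.set("mapred.input.dir", args[0]) and jobConf.set("mapred.output.dir", args[1])
    // Requires org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    // and org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));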

 


Read from HBase table and Write to HBase using MapReduce

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

public class StockAggregatorAvg {

    static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {

        private int numRecords = 0;
        //private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // Extract the userKey from the composite row key (userId + counter)
            ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);
            double open = Bytes.toDouble(values.getValue(Bytes.toBytes("info"), Bytes.toBytes("open")));
            DoubleWritable openWritable = new DoubleWritable(open);
            try {
                context.write(userKey, openWritable);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            numRecords++;
            if ((numRecords % 10000) == 0) {
                context.setStatus("mapper processed " + numRecords + " records so far");
            }
        }
    }

    public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            double sumSq = 0;
            double value = 0;
            for (DoubleWritable val : values) {
                value = val.get();
                sum += value;
                sumSq += value * value;
                count++;
            }

            // Sample standard deviation from the running sum and sum of squares:
            // s = sqrt((sumSq - n*avg^2) / (n - 1)), and n*avg^2 == avg*sum
            double avg = sum / count;
            double stddev = Math.sqrt((sumSq - avg * sum) / (count - 1));

            Put put = new Put(key.get());
            put.add(Bytes.toBytes("details"), Bytes.toBytes("ap"), Bytes.toBytes(sum / count));
            put.add(Bytes.toBytes("details"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            put.add(Bytes.toBytes("details"), Bytes.toBytes("dev"), Bytes.toBytes(stddev));
            System.out.println(String.format("stats :   key : %d,  sum : %f, avg : %f, stddev : %f",
                    Bytes.toLong(key.get()), sum, avg, stddev));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        Job job = new Job(conf, "Stock");
        job.setJarByClass(StockAggregatorAvg.class);

        Scan scan = new Scan();
        String columns = "info"; // comma-separated
        scan.addColumns(columns);
        // FirstKeyOnlyFilter restricts each row to its first KeyValue
        scan.setFilter(new FirstKeyOnlyFilter());

        TableMapReduceUtil.initTableMapperJob("stocklongId", scan, AvgMapper.class, ImmutableBytesWritable.class,
                DoubleWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("stockStats", AvgReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
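
After the job completes, the aggregates can be read back with a plain Get against the same stockStats table and details family the reducer writes to. A minimal sketch, assuming the older HTable client API used throughout this example; the row key passed on the command line is hypothetical and must be the same long the reducer used as key:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadStockStats {

    public static void main(String[] args) throws Exception {
        long userId = Long.parseLong(args[0]); // row key written by the reducer
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "stockStats");
        try {
            Get get = new Get(Bytes.toBytes(userId));
            get.addFamily(Bytes.toBytes("details"));
            Result result = table.get(get);
            double avg = Bytes.toDouble(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("ap")));
            double sum = Bytes.toDouble(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("sum")));
            double dev = Bytes.toDouble(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("dev")));
            System.out.println(String.format("avg : %f, sum : %f, stddev : %f", avg, sum, dev));
        } finally {
            table.close();
        }
    }
}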

Apache Commons CLI Example

        HelpFormatter help = new HelpFormatter();
        Options options = new Options();
        options.addOption("h", "help", false, "print program usage");
        options.addOption("n", "name", true, "sets job name");
        CommandLineParser parser = new BasicParser();
        CommandLine cline;
        try {
            cline = parser.parse(options, args);
            args = cline.getArgs();
            if (args.length < 1) {
                help.printHelp(CMDLINE, options); // CMDLINE is a usage-string constant defined elsewhere in the class
                return -1;
            }
        } catch (ParseException e) {
            System.out.println(e);
            e.printStackTrace();
            help.printHelp(CMDLINE, options);
            return -1;
        }

        String name = null;
        try {
            if (cline.hasOption('n'))
                name = cline.getOptionValue('n');
            else
                name = "wildnove.com - Tutorial MultiTableOutputFormat ";
            // ... (snippet truncated here; the rest of the run() method continues in the original tutorial)

HadoopUtil class in Mahout

One can use Mahout's HadoopUtil class (org.apache.mahout.common.HadoopUtil) to prepare a Hadoop MapReduce job. Its two prepareJob overloads, one for map-only jobs and one for jobs with a reducer, are shown below; a usage sketch follows the listing.

public static Job prepareJob(Path inputPath,
                             Path outputPath,
                             Class<? extends InputFormat> inputFormat,
                             Class<? extends Mapper> mapper,
                             Class<? extends Writable> mapperKey,
                             Class<? extends Writable> mapperValue,
                             Class<? extends OutputFormat> outputFormat,
                             Configuration conf) throws IOException {

    Job job = new Job(new Configuration(conf));
    Configuration jobConf = job.getConfiguration();

    if (mapper.equals(Mapper.class)) {
        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
    }
    job.setJarByClass(mapper);

    job.setInputFormatClass(inputFormat);
    jobConf.set("mapred.input.dir", inputPath.toString());

    job.setMapperClass(mapper);
    job.setMapOutputKeyClass(mapperKey);
    job.setMapOutputValueClass(mapperValue);
    job.setOutputKeyClass(mapperKey);
    job.setOutputValueClass(mapperValue);
    jobConf.setBoolean("mapred.compress.map.output", true);

    // Map-only job: no reducer is set
    job.setNumReduceTasks(0);

    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath.toString());

    return job;
}

public static Job prepareJob(Path inputPath,
                             Path outputPath,
                             Class<? extends InputFormat> inputFormat,
                             Class<? extends Mapper> mapper,
                             Class<? extends Writable> mapperKey,
                             Class<? extends Writable> mapperValue,
                             Class<? extends Reducer> reducer,
                             Class<? extends Writable> reducerKey,
                             Class<? extends Writable> reducerValue,
                             Class<? extends OutputFormat> outputFormat,
                             Configuration conf) throws IOException {

    Job job = new Job(new Configuration(conf));
    Configuration jobConf = job.getConfiguration();

    if (reducer.equals(Reducer.class)) {
        if (mapper.equals(Mapper.class)) {
            throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
        }
        job.setJarByClass(mapper);
    } else {
        job.setJarByClass(reducer);
    }

    job.setInputFormatClass(inputFormat);
    jobConf.set("mapred.input.dir", inputPath.toString());

    job.setMapperClass(mapper);
    if (mapperKey != null) {
        job.setMapOutputKeyClass(mapperKey);
    }
    if (mapperValue != null) {
        job.setMapOutputValueClass(mapperValue);
    }
    jobConf.setBoolean("mapred.compress.map.output", true);

    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);
    job.setOutputValueClass(reducerValue);

    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath.toString());

    return job;
}
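
A minimal usage sketch, based only on the prepareJob signatures above and reusing the classes from the first example; the input and output paths come from the command-line arguments:

    // Inside a Tool's run(String[] args) method
    Job job = HadoopUtil.prepareJob(new Path(args[0]),
                                    new Path(args[1]),
                                    TextInputFormat.class,
                                    MyMapper.class,
                                    VarIntWritable.class,
                                    VarLongWritable.class,
                                    MyReducer.class,
                                    VarIntWritable.class,
                                    VarLongWritable.class,
                                    SequenceFileOutputFormat.class,
                                    getConf());
    boolean succeeded = job.waitForCompletion(true);
    return succeeded ? 0 : -1;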