Avro Schema

Here is an example schema:

{
  "namespace": "test.avro",
  "name": "FacebookUser",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "num_likes", "type": "int"},
    {"name": "num_photos", "type": "int"},
    {"name": "num_groups", "type": "int"},
    {"name": "friends", "type": {"type": "array", "items": "FacebookUser"}}
  ]
}

 

To deserialize the data:

First, create a ByteArrayInputStream over the serialized Avro bytes.

Then, obtain the DecoderFactory and get a DirectBinaryDecoder from it.

Next, instantiate a GenericDatumReader with the Avro schema.

Now the reader can be used to deserialize the Avro data from the decoder.

 

Schema schema = Schema.parse(new File("PositionList.avsc"));
ByteArrayInputStream bai = new ByteArrayInputStream(bytes);
DecoderFactory decoderFactory = DecoderFactory.get();
Decoder decoder = decoderFactory.directBinaryDecoder(bai, null);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
GenericRecord rec = reader.read(null, decoder);
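
For context, the byte array consumed above has to come from the matching encoder path. Here is a minimal serialization sketch, assuming the FacebookUser schema shown earlier has been parsed into schema; the field values and the empty friends array are illustrative assumptions, not part of the original example:

// Classes used here come from org.apache.avro.generic and org.apache.avro.io.
GenericRecord user = new GenericData.Record(schema);
user.put("name", "Alice");
user.put("num_likes", 42);
user.put("num_photos", 7);
user.put("num_groups", 3);
user.put("friends", new GenericData.Array<GenericRecord>(0, schema.getField("friends").schema()));

// Serialize the record to a byte array with the direct binary encoder.
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
writer.write(user, encoder);
encoder.flush();
byte[] bytes = out.toByteArray();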

 

 

Hadoop MapReduce Example

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

public class MyMapReduceJob extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        Job job = new Job(new Configuration());
        Configuration jobConf = job.getConfiguration();
        job.setJarByClass(MyMapReduceJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        jobConf.set("mapred.input.dir", arg0[0]);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(VarIntWritable.class);
        job.setMapOutputValueClass(VarLongWritable.class);
        //jobConf.setBoolean("mapred.compress.map.output", true);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(VarIntWritable.class);
        job.setOutputValueClass(VarLongWritable.class);

        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        jobConf.set("mapred.output.dir", arg0[1]);

        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            return -1;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new MyMapReduceJob(), args);
    }
}

//Mapper Class
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

public final class MyMapper extends
        Mapper<LongWritable, Text, VarIntWritable, VarLongWritable> {

    private boolean transpose;

    @Override
    protected void setup(Context context) {
        Configuration jobConf = context.getConfiguration();
        transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
    }

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Context context) throws IOException, InterruptedException {
        String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
        long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
        int index = TasteHadoopUtils.idToIndex(itemID);
        context.write(new VarIntWritable(index), new VarLongWritable(itemID));
    }
}

//Reducer Class
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

public final class MyReducer extends
        Reducer<VarIntWritable, VarLongWritable, VarIntWritable, VarLongWritable> {

    @Override
    protected void reduce(VarIntWritable index,
                          Iterable<VarLongWritable> possibleItemIDs,
                          Context context) throws IOException, InterruptedException {
        long minimumItemID = Long.MAX_VALUE;
        for (VarLongWritable varLongWritable : possibleItemIDs) {
            long itemID = varLongWritable.get();
            if (itemID < minimumItemID) {
                minimumItemID = itemID;
            }
        }
        if (minimumItemID != Long.MAX_VALUE) {
            context.write(index, new VarLongWritable(minimumItemID));
        }
    }
}

 


Read from HBase table and Write to HBase using MapReduce

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

public class StockAggregatorAvg {

    static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {

        private int numRecords = 0;
        //private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // extract userKey from the compositeKey (userId + counter)
            ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);
            double open = Bytes.toDouble(values.getValue(Bytes.toBytes("info"), Bytes.toBytes("open")));
            DoubleWritable openWritable = new DoubleWritable(open);
            try {
                context.write(userKey, openWritable);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            numRecords++;
            if ((numRecords % 10000) == 0) {
                context.setStatus("mapper processed " + numRecords + " records so far");
            }
        }
    }

    public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {

        public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            double sumSq = 0;
            double value = 0;
            for (DoubleWritable val : values) {
                value = val.get();
                sum += value;
                sumSq += value * value;
                count++;
            }

            double avg = sum / count;
            // sample standard deviation: since avg*sum == count*avg^2, this is
            // sqrt((sumSq - count*avg^2) / (count - 1))
            double stddev = Math.sqrt((sumSq - avg * sum) / (count - 1));

            Put put = new Put(key.get());
            put.add(Bytes.toBytes("details"), Bytes.toBytes("ap"), Bytes.toBytes(sum / count));
            put.add(Bytes.toBytes("details"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            put.add(Bytes.toBytes("details"), Bytes.toBytes("dev"), Bytes.toBytes(stddev));
            System.out.println(String.format("stats :   key : %d,  sum : %f, avg : %f, stddev : %f",
                    Bytes.toLong(key.get()), sum, avg, stddev));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        Job job = new Job(conf, "Stock");
        job.setJarByClass(StockAggregatorAvg.class);
        Scan scan = new Scan();
        String columns = "info"; // comma separated
        scan.addColumns(columns);
        scan.setFilter(new FirstKeyOnlyFilter());
        TableMapReduceUtil.initTableMapperJob("stocklongId", scan, AvgMapper.class, ImmutableBytesWritable.class,
                DoubleWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("stockStats", AvgReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
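
The mapper above treats the first Bytes.SIZEOF_LONG bytes of the row key as the user key, per the compositeKey (userId + counter) comment. A hedged sketch of how such a row might be written when loading the stocklongId table; the userId, counter, and open price values are illustrative assumptions:

// Compose the assumed composite row key: userId (8 bytes) followed by a counter.
long userId = 12345L;
int counter = 7;
byte[] rowKey = Bytes.add(Bytes.toBytes(userId), Bytes.toBytes(counter));

// Write the "info:open" cell that AvgMapper reads back as a double.
Put put = new Put(rowKey);
put.add(Bytes.toBytes("info"), Bytes.toBytes("open"), Bytes.toBytes(26.25d));
table.put(put); // table is assumed to be an HTable opened on "stocklongId"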

Apache Commons CLI Example

        HelpFormatter help = new HelpFormatter();
        Options options = new Options();
        options.addOption("h", "help", false, "print program usage");
        options.addOption("n", "name", true, "sets job name");
        CommandLineParser parser = new BasicParser();
        CommandLine cline;
        try {
            cline = parser.parse(options, args);
            args = cline.getArgs();
            if (args.length < 1) {
                help.printHelp(CMDLINE, options); // CMDLINE: usage string constant defined elsewhere in the class
                return -1;
            }
        } catch (ParseException e) {
            System.out.println(e);
            e.printStackTrace();
            help.printHelp(CMDLINE, options);
            return -1;
        }

        String name = null;
        if (cline.hasOption('n')) {
            name = cline.getOptionValue('n');
        } else {
            name = "wildnove.com - Tutorial MultiTableOutputFormat ";
        }

HadoopUtil class in Mahout

One can use the HadoopUtil class (org.apache.mahout.common.HadoopUtil) to prepare a Hadoop MapReduce job. The two prepareJob overloads below configure a map-only job and a full map/reduce job, respectively; a usage sketch follows the listings.

public static Job prepareJob(Path inputPath,
                             Path outputPath,
                             Class<? extends InputFormat> inputFormat,
                             Class<? extends Mapper> mapper,
                             Class<? extends Writable> mapperKey,
                             Class<? extends Writable> mapperValue,
                             Class<? extends OutputFormat> outputFormat,
                             Configuration conf) throws IOException {

    Job job = new Job(new Configuration(conf));
    Configuration jobConf = job.getConfiguration();

    if (mapper.equals(Mapper.class)) {
        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
    }
    job.setJarByClass(mapper);

    job.setInputFormatClass(inputFormat);
    jobConf.set("mapred.input.dir", inputPath.toString());

    job.setMapperClass(mapper);
    job.setMapOutputKeyClass(mapperKey);
    job.setMapOutputValueClass(mapperValue);
    job.setOutputKeyClass(mapperKey);
    job.setOutputValueClass(mapperValue);
    jobConf.setBoolean("mapred.compress.map.output", true);
    job.setNumReduceTasks(0);

    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath.toString());

    return job;
}

public static Job prepareJob(Path inputPath,
                             Path outputPath,
                             Class<? extends InputFormat> inputFormat,
                             Class<? extends Mapper> mapper,
                             Class<? extends Writable> mapperKey,
                             Class<? extends Writable> mapperValue,
                             Class<? extends Reducer> reducer,
                             Class<? extends Writable> reducerKey,
                             Class<? extends Writable> reducerValue,
                             Class<? extends OutputFormat> outputFormat,
                             Configuration conf) throws IOException {

    Job job = new Job(new Configuration(conf));
    Configuration jobConf = job.getConfiguration();

    if (reducer.equals(Reducer.class)) {
        if (mapper.equals(Mapper.class)) {
            throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
        }
        job.setJarByClass(mapper);
    } else {
        job.setJarByClass(reducer);
    }

    job.setInputFormatClass(inputFormat);
    jobConf.set("mapred.input.dir", inputPath.toString());

    job.setMapperClass(mapper);
    if (mapperKey != null) {
        job.setMapOutputKeyClass(mapperKey);
    }
    if (mapperValue != null) {
        job.setMapOutputValueClass(mapperValue);
    }
    jobConf.setBoolean("mapred.compress.map.output", true);

    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);
    job.setOutputValueClass(reducerValue);

    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath.toString());

    return job;
}
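
As a usage sketch, a Tool's run() method can delegate all of the job wiring to the second overload. The paths, formats, and the MyMapper/MyReducer and VarIntWritable/VarLongWritable classes are taken from the earlier MapReduce example and are assumptions about the caller:

// Hedged sketch: inside run(String[] arg0) of a driver that extends Configured implements Tool.
Job job = HadoopUtil.prepareJob(new Path(arg0[0]),
        new Path(arg0[1]),
        TextInputFormat.class,
        MyMapper.class,
        VarIntWritable.class,
        VarLongWritable.class,
        MyReducer.class,
        VarIntWritable.class,
        VarLongWritable.class,
        SequenceFileOutputFormat.class,
        getConf());
boolean succeeded = job.waitForCompletion(true);
return succeeded ? 0 : -1;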