These posts explain more about the various Hadoop versions:
http://www.cloudera.com/blog/2012/04/apache-hadoop-versions-looking-ahead-3/
http://www.cloudera.com/blog/2012/01/an-update-on-apache-hadoop-1-0/
Here is an example Avro schema:
{
  "namespace": "test.avro",
  "name": "FacebookUser",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "num_likes", "type": "int"},
    {"name": "num_photos", "type": "int"},
    {"name": "num_groups", "type": "int"},
    {"name": "friends", "type": {"type": "array", "items": "FacebookUser"}}
  ]
}
First, create a ByteArrayInputStream with the Avro data as the input.
Then, get a DirectBinaryDecoder for that stream from the DecoderFactory.
Next, instantiate a GenericDatumReader with the Avro schema.
Now we can use the reader to deserialize the Avro data from the decoder:
Schema schema = Schema.parse(new File("PositionList.avsc"));
ByteArrayInputStream bai = new ByteArrayInputStream(bytes);
Decoder decoder = DecoderFactory.get().directBinaryDecoder(bai, null);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
GenericRecord rec = (GenericRecord)reader.read(null, decoder);
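For context, here is a minimal sketch of how such a byte array could be produced in the first place, using a GenericDatumWriter and a direct binary encoder. The FacebookUser schema from above, the file name FacebookUser.avsc, and the sample field values are assumptions for illustration only.
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.ArrayList;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroSerializeSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical: the FacebookUser schema above saved as FacebookUser.avsc
        Schema schema = Schema.parse(new File("FacebookUser.avsc"));
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alice");
        user.put("num_likes", 12);
        user.put("num_photos", 34);
        user.put("num_groups", 5);
        user.put("friends", new ArrayList<GenericRecord>()); // empty friends list

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // directBinaryEncoder is the counterpart of the directBinaryDecoder used above
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(baos, null);
        new GenericDatumWriter<GenericRecord>(schema).write(user, encoder);
        encoder.flush();
        byte[] bytes = baos.toByteArray(); // these are the bytes the decoder code consumes
    }
}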
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public class MyMapReduceJob extends Configured implements Tool {
@Override
public int run(String[] arg0) throws Exception {
Job job = new Job(new Configuration());
Configuration jobConf = job.getConfiguration();
job.setJarByClass(MyMapReduceJob.class);
job.setInputFormatClass(TextInputFormat.class);
jobConf.set("mapred.input.dir", arg0[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(VarIntWritable.class);
job.setMapOutputValueClass(VarLongWritable.class);
//jobConf.setBoolean("mapred.compress.map.output", true);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(VarIntWritable.class);
job.setOutputValueClass(VarLongWritable.class);
//job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
jobConf.set("mapred.output.dir", arg0[1]);
boolean succeeded = job.waitForCompletion(true);
if(!succeeded) return -1;
return 0;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new Configuration(), new MyMapReduceJob(), args));
}
}
//Mapper Class
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public final class MyMapper extends
Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
private boolean transpose;
@Override
protected void setup(Context context) {
Configuration jobConf = context.getConfiguration();
transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
}
@Override
protected void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
int index = TasteHadoopUtils.idToIndex(itemID);
context.write(new VarIntWritable(index), new VarLongWritable(itemID));
}
}
//Reducer Class
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public final class MyReducer extends
Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
@Override
protected void reduce(VarIntWritable index,
Iterable<VarLongWritable> possibleItemIDs,
Context context) throws IOException, InterruptedException {
long minimumItemID = Long.MAX_VALUE;
for (VarLongWritable varLongWritable : possibleItemIDs) {
long itemID = varLongWritable.get();
if (itemID < minimumItemID) {
minimumItemID = itemID;
}
}
if (minimumItemID != Long.MAX_VALUE) {
context.write(index, new VarLongWritable(minimumItemID));
}
}
}
==================================
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
public class StockAggregatorAvg {
static class AvgMapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {
private int numRecords = 0;
//private static final IntWritable one = new IntWritable(1);
@Override
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
// extract userKey from the compositeKey (userId + counter)
ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_LONG);
double open = Bytes.toDouble(values.getValue(Bytes.toBytes("info"), Bytes.toBytes("open")));
DoubleWritable openWritable = new DoubleWritable(open);
try {
context.write(userKey, openWritable);
} catch (InterruptedException e) {
throw new IOException(e);
}
numRecords++;
if ((numRecords % 10000) == 0) {
context.setStatus("mapper processed " + numRecords + " records so far");
}
}
}
public static class AvgReducer extends TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {
public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
double sum = 0;
int count = 0;
double sumSq = 0;
double value = 0;
for (DoubleWritable val : values) {
value = val.get();
sum += value;
sumSq += value * value;
count++;
}
double avg = sum/count;
// sample standard deviation via the identity sum((x - avg)^2) = sumSq - avg*sum
double stddev = Math.sqrt((sumSq - avg*sum)/(count - 1));
Put put = new Put(key.get());
put.add(Bytes.toBytes("details"), Bytes.toBytes("ap"), Bytes.toBytes(sum/count));
put.add(Bytes.toBytes("details"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
put.add(Bytes.toBytes("details"), Bytes.toBytes("dev"), Bytes.toBytes(stddev));
System.out.println(String.format("stats : key : %d, sum : %f, avg : %f, stddev : %f", Bytes.toLong(key.get()), sum, avg, stddev));
context.write(key, put);
}
}
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
Job job = new Job(conf, "Stock");
job.setJarByClass(StockAggregatorAvg.class);
Scan scan = new Scan();
String columns = "info"; // comma separated
scan.addColumns(columns);
scan.setFilter(new FirstKeyOnlyFilter());
TableMapReduceUtil.initTableMapperJob(“stocklongId”, scan, AvgMapper.class, ImmutableBytesWritable.class,
DoubleWritable.class, job);
TableMapReduceUtil.initTableReducerJob(“stockStats”, AvgReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
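One operational note: the output table ("stockStats" with a "details" column family) must already exist, since the reducer only writes Puts into it. Below is a minimal sketch of creating it with the old-style HBaseAdmin API (the same API vintage as the code above); treat it as an assumption about the cluster setup rather than part of the original job.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateStockStatsTable {
    public static void main(String[] args) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
        if (!admin.tableExists("stockStats")) {
            HTableDescriptor desc = new HTableDescriptor("stockStats");
            desc.addFamily(new HColumnDescriptor("details")); // family the reducer writes into
            admin.createTable(desc);
        }
    }
}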
0x7FFFFFFF is 0111 1111 1111 1111 1111 1111 1111 1111 in binary: all bits set to 1 except the sign bit.
(hash & 0x7FFFFFFF) therefore always yields a non-negative integer, even when hash is negative.
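A minimal, self-contained illustration of the masking idiom (String.hashCode() stands in for whatever hash is used; it is not necessarily the hash TasteHadoopUtils applies):
public class MaskDemo {
    public static void main(String[] args) {
        int hash = "item-12345".hashCode(); // hashCode() may be negative
        int index = hash & 0x7FFFFFFF;      // clearing the sign bit guarantees a non-negative result
        System.out.println(hash + " -> " + index);
    }
}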
These are the popular InputFormats (a small KeyValueTextInputFormat sketch follows the list):
TextInputFormat (key:LongWritable, value:Text)
KeyValueTextInputFormat (key:Text, value:Text)
SequenceFileInputFormat<K,V>
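As a sketch of how the key/value types line up, here is a hypothetical mapper written against KeyValueTextInputFormat, which splits each input line at the first tab into a Text key and a Text value; the driver would select it with job.setInputFormatClass(KeyValueTextInputFormat.class).
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KeyValueEchoMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value); // pass the tab-separated key/value pair straight through
    }
}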
Check out these slides about HBase schema design:
http://www.slideshare.net/cloudera/hadoop-world-2011-advanced-hbase-schema-design
http://www.slideshare.net/hmisty/20090713-hbase-schema-design-case-studies
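One pattern from those slides, a composite row key, is what the AvgMapper above relies on when it takes the first Bytes.SIZEOF_LONG bytes of the row key. Here is a minimal sketch of building such a key; the exact id + counter layout is an assumption for illustration:
import org.apache.hadoop.hbase.util.Bytes;

public class CompositeKeyDemo {
    public static void main(String[] args) {
        long id = 42L;      // hypothetical 8-byte id that forms the key prefix
        long counter = 7L;  // hypothetical counter appended to make the key unique
        byte[] rowKey = Bytes.add(Bytes.toBytes(id), Bytes.toBytes(counter));
        System.out.println("row key length = " + rowKey.length); // 16 bytes total
    }
}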
HelpFormatter help = new HelpFormatter();
Options options = new Options();
options.addOption("h", "help", false, "print program usage");
options.addOption("n", "name", true, "sets job name");
CommandLineParser parser = new BasicParser();
CommandLine cline;
try {
    cline = parser.parse(options, args);
    args = cline.getArgs();
    if (args.length < 1) {
        help.printHelp(CMDLINE, options);
        return -1;
    }
} catch (ParseException e) {
    System.out.println(e);
    e.printStackTrace();
    help.printHelp(CMDLINE, options);
    return -1;
}
String name = null;
try {
    if (cline.hasOption('n'))
        name = cline.getOptionValue('n');
    else
        name = "wildnove.com - Tutorial MultiTableOutputFormat ";
Check out this blog about Apache Commons CLI:
http://www.baptiste-wicht.com/2010/10/compute-command-line-arguments-with-apache-commons-cli/