This is a great paper about implementing an efficient, large-scale parallel version of the L-BFGS algorithm in MapReduce.
http://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf
Random Forest is a popular ensemble learning technique in which a collection of decision trees is trained to make predictions. Apache Mahout, Spark MLlib, and H2O have all implemented the Random Forest algorithm. Today, we will look at Mahout’s MapReduce version of Random Forest.
To run Random Forest using Mahout, you can use the following command:
hadoop jar mahout-examples-0.9-job.jar org.apache.mahout.classifier.df.mapreduce.BuildForest -Dmapred.max.split.size=1874231 -d data/YOUR_TRAINING_DATA.txt -ds DATASET_METADATA.info -sl 5 -t 100 -o YOUR_OUTPUT_FOLDER
If you look at the org.apache.mahout.classifier.df.mapreduce.BuildForest class below, you will see that it sets up the MapReduce job with the following options:
--data, --dataset, --selection, --no-complete, --minsplit, --minprop, --seed, --partial, --nbtrees, --output
org.apache.mahout.classifier.df.mapreduce.BuildForest
@Override
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

  DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
  ArgumentBuilder abuilder = new ArgumentBuilder();
  GroupBuilder gbuilder = new GroupBuilder();

  Option dataOpt = obuilder.withLongName("data").withShortName("d").withRequired(true)
      .withArgument(abuilder.withName("path").withMinimum(1).withMaximum(1).create())
      .withDescription("Data path").create();

  Option datasetOpt = obuilder.withLongName("dataset").withShortName("ds").withRequired(true)
      .withArgument(abuilder.withName("dataset").withMinimum(1).withMaximum(1).create())
      .withDescription("Dataset path").create();

  Option selectionOpt = obuilder.withLongName("selection").withShortName("sl").withRequired(false)
      .withArgument(abuilder.withName("m").withMinimum(1).withMaximum(1).create())
      .withDescription("Optional, Number of variables to select randomly at each tree-node.\n"
          + "For classification problem, the default is square root of the number of explanatory variables.\n"
          + "For regression problem, the default is 1/3 of the number of explanatory variables.").create();

  Option noCompleteOpt = obuilder.withLongName("no-complete").withShortName("nc").withRequired(false)
      .withDescription("Optional, The tree is not complemented").create();

  Option minSplitOpt = obuilder.withLongName("minsplit").withShortName("ms").withRequired(false)
      .withArgument(abuilder.withName("minsplit").withMinimum(1).withMaximum(1).create())
      .withDescription("Optional, The tree-node is not divided, if the branching data size is "
          + "smaller than this value.\nThe default is 2.").create();

  Option minPropOpt = obuilder.withLongName("minprop").withShortName("mp").withRequired(false)
      .withArgument(abuilder.withName("minprop").withMinimum(1).withMaximum(1).create())
      .withDescription("Optional, The tree-node is not divided, if the proportion of the "
          + "variance of branching data is smaller than this value.\n"
          + "In the case of a regression problem, this value is used. "
          + "The default is 1/1000(0.001).").create();

  Option seedOpt = obuilder.withLongName("seed").withShortName("sd").withRequired(false)
      .withArgument(abuilder.withName("seed").withMinimum(1).withMaximum(1).create())
      .withDescription("Optional, seed value used to initialise the Random number generator").create();

  Option partialOpt = obuilder.withLongName("partial").withShortName("p").withRequired(false)
      .withDescription("Optional, use the Partial Data implementation").create();

  Option nbtreesOpt = obuilder.withLongName("nbtrees").withShortName("t").withRequired(true)
      .withArgument(abuilder.withName("nbtrees").withMinimum(1).withMaximum(1).create())
      .withDescription("Number of trees to grow").create();

  Option outputOpt = obuilder.withLongName("output").withShortName("o").withRequired(true)
      .withArgument(abuilder.withName("path").withMinimum(1).withMaximum(1).create())
      .withDescription("Output path, will contain the Decision Forest").create();

  Option helpOpt = obuilder.withLongName("help").withShortName("h")
      .withDescription("Print out help").create();

  Group group = gbuilder.withName("Options").withOption(dataOpt).withOption(datasetOpt)
      .withOption(selectionOpt).withOption(noCompleteOpt).withOption(minSplitOpt)
      .withOption(minPropOpt).withOption(seedOpt).withOption(partialOpt).withOption(nbtreesOpt)
      .withOption(outputOpt).withOption(helpOpt).create();

  try {
    Parser parser = new Parser();
    parser.setGroup(group);
    CommandLine cmdLine = parser.parse(args);

    if (cmdLine.hasOption("help")) {
      CommandLineUtil.printHelp(group);
      return -1;
    }

    isPartial = cmdLine.hasOption(partialOpt);
    String dataName = cmdLine.getValue(dataOpt).toString();
    String datasetName = cmdLine.getValue(datasetOpt).toString();
    String outputName = cmdLine.getValue(outputOpt).toString();
    nbTrees = Integer.parseInt(cmdLine.getValue(nbtreesOpt).toString());

    if (cmdLine.hasOption(selectionOpt)) {
      m = Integer.parseInt(cmdLine.getValue(selectionOpt).toString());
    }
    complemented = !cmdLine.hasOption(noCompleteOpt);
    if (cmdLine.hasOption(minSplitOpt)) {
      minSplitNum = Integer.parseInt(cmdLine.getValue(minSplitOpt).toString());
    }
    if (cmdLine.hasOption(minPropOpt)) {
      minVarianceProportion = Double.parseDouble(cmdLine.getValue(minPropOpt).toString());
    }
    if (cmdLine.hasOption(seedOpt)) {
      seed = Long.valueOf(cmdLine.getValue(seedOpt).toString());
    }

    if (log.isDebugEnabled()) {
      log.debug("data : {}", dataName);
      log.debug("dataset : {}", datasetName);
      log.debug("output : {}", outputName);
      log.debug("m : {}", m);
      log.debug("complemented : {}", complemented);
      log.debug("minSplitNum : {}", minSplitNum);
      log.debug("minVarianceProportion : {}", minVarianceProportion);
      log.debug("seed : {}", seed);
      log.debug("nbtrees : {}", nbTrees);
      log.debug("isPartial : {}", isPartial);
    }

    dataPath = new Path(dataName);
    datasetPath = new Path(datasetName);
    outputPath = new Path(outputName);

  } catch (OptionException e) {
    log.error("Exception", e);
    CommandLineUtil.printHelp(group);
    return -1;
  }

  buildForest();

  return 0;
}
After parsing the arguments, it calls the buildForest method shown below, which instantiates either an InMemBuilder or a PartialBuilder MapReduce job and runs it to build the random forest (DecisionForest).
DecisionForest forest = forestBuilder.build(nbTrees);
private void buildForest() throws IOException, ClassNotFoundException, InterruptedException {
  // make sure the output path does not exist
  FileSystem ofs = outputPath.getFileSystem(getConf());
  if (ofs.exists(outputPath)) {
    log.error("Output path already exists");
    return;
  }

  DecisionTreeBuilder treeBuilder = new DecisionTreeBuilder();
  if (m != null) {
    treeBuilder.setM(m);
  }
  treeBuilder.setComplemented(complemented);
  if (minSplitNum != null) {
    treeBuilder.setMinSplitNum(minSplitNum);
  }
  if (minVarianceProportion != null) {
    treeBuilder.setMinVarianceProportion(minVarianceProportion);
  }

  Builder forestBuilder;
  if (isPartial) {
    log.info("Partial Mapred implementation");
    forestBuilder = new PartialBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
  } else {
    log.info("InMem Mapred implementation");
    forestBuilder = new InMemBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
  }
  forestBuilder.setOutputDirName(outputPath.getName());

  log.info("Building the forest...");
  long time = System.currentTimeMillis();

  DecisionForest forest = forestBuilder.build(nbTrees);
  if (forest == null) {
    return;
  }

  time = System.currentTimeMillis() - time;
  log.info("Build Time: {}", DFUtils.elapsedTime(time));
  log.info("Forest num Nodes: {}", forest.nbNodes());
  log.info("Forest mean num Nodes: {}", forest.meanNbNodes());
  log.info("Forest mean max Depth: {}", forest.meanMaxDepth());

  // store the decision forest in the output path
  Path forestPath = new Path(outputPath, "forest.seq");
  log.info("Storing the forest in: {}", forestPath);
  DFUtils.storeWritable(getConf(), forestPath, forest);
}
Both InMemBuilder and PartialBuilder extend the abstract Builder class, which declares the following abstract methods that every subclass must implement.
protected abstract void configureJob(Job job) throws IOException;

protected abstract DecisionForest parseOutput(Job job) throws IOException;
Next, we will look inside PartialBuilder, the partial-data MapReduce implementation of the Random Forest job. It uses Step1Mapper as its mapper implementation and does not use a reducer.
During the map phase, each mapper task reads each input line, converts it into an Instance (a dense vector), and stores these instances in an array list.
At the end of the map phase (in cleanup), the mapper task builds its subset of trees from the accumulated instances and writes these trees to disk as custom MapredOutput writables.
The number of trees to build per mapper task is calculated as follows, with any remainder distributed to the first partitions:
int treesPerMapper = numTrees / numMaps;
/**
 * First step of the Partial Data Builder. Builds the trees using the data available in the InputSplit.
 * Predict the oob classes for each tree in its growing partition (input split).
 */
public class Step1Mapper extends MapredMapper<LongWritable,Text,TreeID,MapredOutput> {

  private static final Logger log = LoggerFactory.getLogger(Step1Mapper.class);

  /** used to convert input values to data instances */
  private DataConverter converter;

  private Random rng;

  /** number of trees to be built by this mapper */
  private int nbTrees;

  /** id of the first tree */
  private int firstTreeId;

  /** mapper's partition */
  private int partition;

  /** will contain all instances of this mapper's split */
  private final List<Instance> instances = Lists.newArrayList();

  public int getFirstTreeId() {
    return firstTreeId;
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();

    configure(Builder.getRandomSeed(conf), conf.getInt("mapred.task.partition", -1),
        Builder.getNumMaps(conf), Builder.getNbTrees(conf));
  }

  /**
   * Useful when testing
   *
   * @param partition
   *          current mapper inputSplit partition
   * @param numMapTasks
   *          number of running map tasks
   * @param numTrees
   *          total number of trees in the forest
   */
  protected void configure(Long seed, int partition, int numMapTasks, int numTrees) {
    converter = new DataConverter(getDataset());

    // prepare random-numbers generator
    log.debug("seed : {}", seed);
    if (seed == null) {
      rng = RandomUtils.getRandom();
    } else {
      rng = RandomUtils.getRandom(seed);
    }

    // mapper's partition
    Preconditions.checkArgument(partition >= 0, "Wrong partition ID: " + partition + ". Partition must be >= 0!");
    this.partition = partition;

    // compute number of trees to build
    nbTrees = nbTrees(numMapTasks, numTrees, partition);

    // compute first tree id
    firstTreeId = 0;
    for (int p = 0; p < partition; p++) {
      firstTreeId += nbTrees(numMapTasks, numTrees, p);
    }

    log.debug("partition : {}", partition);
    log.debug("nbTrees : {}", nbTrees);
    log.debug("firstTreeId : {}", firstTreeId);
  }

  /**
   * Compute the number of trees for a given partition. The first partitions may be longer
   * than the rest because of the remainder.
   *
   * @param numMaps
   *          total number of maps (partitions)
   * @param numTrees
   *          total number of trees to build
   * @param partition
   *          partition to compute the number of trees for
   */
  public static int nbTrees(int numMaps, int numTrees, int partition) {
    int treesPerMapper = numTrees / numMaps;
    int remainder = numTrees - numMaps * treesPerMapper;
    return treesPerMapper + (partition < remainder ? 1 : 0);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    instances.add(converter.convert(value.toString()));
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // prepare the data
    log.debug("partition: {} numInstances: {}", partition, instances.size());

    Data data = new Data(getDataset(), instances);
    Bagging bagging = new Bagging(getTreeBuilder(), data);

    TreeID key = new TreeID();

    log.debug("Building {} trees", nbTrees);
    for (int treeId = 0; treeId < nbTrees; treeId++) {
      log.debug("Building tree number : {}", treeId);

      Node tree = bagging.build(rng);

      key.set(partition, firstTreeId + treeId);

      if (isOutput()) {
        MapredOutput emOut = new MapredOutput(tree);
        context.write(key, emOut);
      }

      context.progress();
    }
  }
}
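To make the remainder handling in nbTrees() concrete, here is a tiny illustrative driver (the numbers, and the assumption that Step1Mapper lives in the org.apache.mahout.classifier.df.mapreduce.partial package, are mine): with 100 trees and 7 mappers, treesPerMapper is 14 and the remainder is 2, so partitions 0 and 1 build 15 trees each while partitions 2 to 6 build 14 each (2 * 15 + 5 * 14 = 100).

import org.apache.mahout.classifier.df.mapreduce.partial.Step1Mapper;

public class TreesPerPartition {
  public static void main(String[] args) {
    int numMaps = 7;
    int numTrees = 100;
    for (int partition = 0; partition < numMaps; partition++) {
      // prints 15 for partitions 0 and 1, then 14 for the remaining partitions
      System.out.println("partition " + partition + " builds "
          + Step1Mapper.nbTrees(numMaps, numTrees, partition) + " trees");
    }
  }
}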
Bagging is used to create a random sample from the input data to build a decision tree. Take a look at the Bagging class below.
It delegates the bagging (creating a random sample with replacement) to the Data class, and then uses the tree builder to build a decision tree from the sampled bag.
treeBuilder.build(rng, bag);
/**
 * Builds a tree using bagging
 */
public class Bagging {

  private static final Logger log = LoggerFactory.getLogger(Bagging.class);

  private final TreeBuilder treeBuilder;

  private final Data data;

  private final boolean[] sampled;

  public Bagging(TreeBuilder treeBuilder, Data data) {
    this.treeBuilder = treeBuilder;
    this.data = data;
    sampled = new boolean[data.size()];
  }

  /**
   * Builds one tree
   */
  public Node build(Random rng) {
    log.debug("Bagging...");
    Arrays.fill(sampled, false);
    Data bag = data.bagging(rng, sampled);

    log.debug("Building...");
    return treeBuilder.build(rng, bag);
  }
}
See the org.apache.mahout.classifier.df.data.Data class below. It has the following two methods that sample with replacement and return a bag of random samples.
/**
 * if data has N cases, sample N cases at random -but with replacement.
 */
public Data bagging(Random rng) {
  int datasize = size();
  List<Instance> bag = Lists.newArrayListWithCapacity(datasize);

  for (int i = 0; i < datasize; i++) {
    bag.add(instances.get(rng.nextInt(datasize)));
  }

  return new Data(dataset, bag);
}

/**
 * if data has N cases, sample N cases at random -but with replacement.
 *
 * @param sampled
 *          indicating which instance has been sampled
 *
 * @return sampled data
 */
public Data bagging(Random rng, boolean[] sampled) {
  int datasize = size();
  List<Instance> bag = Lists.newArrayListWithCapacity(datasize);

  for (int i = 0; i < datasize; i++) {
    int index = rng.nextInt(datasize);
    bag.add(instances.get(index));
    sampled[index] = true;
  }

  return new Data(dataset, bag);
}
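Note that sampling N instances out of N with replacement keeps, in expectation, roughly 1 - 1/e (about 63%) of the distinct instances in the bag; the rest are the out-of-bag instances that the sampled[] array keeps track of. Here is a small standalone sketch (plain Java, no Mahout dependencies) that demonstrates this:

import java.util.Random;

public class BootstrapCoverage {
  public static void main(String[] args) {
    int n = 100000;                       // pretend dataset size
    boolean[] sampled = new boolean[n];
    Random rng = new Random(42);

    // sample n indices with replacement, like Data.bagging(rng, sampled)
    for (int i = 0; i < n; i++) {
      sampled[rng.nextInt(n)] = true;
    }

    int inBag = 0;
    for (boolean b : sampled) {
      if (b) {
        inBag++;
      }
    }
    // prints a fraction close to 1 - 1/e, roughly 0.632
    System.out.println("fraction of distinct instances in the bag: " + (double) inBag / n);
  }
}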
Once the decision forest has been built by the PartialBuilder MapReduce job, it can be used to classify new instances.
org.apache.mahout.classifier.df.DecisionForest is the Writable implementation of the decision forest. It provides the following classify method. For a numerical (regression) label, it returns the average of all the trees’ outputs, sum / cnt.
For a categorical label, it returns the category that receives the most votes across all the trees, with ties broken randomly.
/**
 * predicts the label for the instance
 *
 * @param rng
 *          Random number generator, used to break ties randomly
 * @return NaN if the label cannot be predicted
 */
public double classify(Dataset dataset, Random rng, Instance instance) {
  if (dataset.isNumerical(dataset.getLabelId())) {
    double sum = 0;
    int cnt = 0;
    for (Node tree : trees) {
      double prediction = tree.classify(instance);
      if (!Double.isNaN(prediction)) {
        sum += prediction;
        cnt++;
      }
    }

    if (cnt > 0) {
      return sum / cnt;
    } else {
      return Double.NaN;
    }
  } else {
    int[] predictions = new int[dataset.nblabels()];
    for (Node tree : trees) {
      double prediction = tree.classify(instance);
      if (!Double.isNaN(prediction)) {
        predictions[(int) prediction]++;
      }
    }

    if (DataUtils.sum(predictions) == 0) {
      return Double.NaN; // no prediction available
    }

    return DataUtils.maxindex(rng, predictions);
  }
}
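Below is a rough usage sketch of how a stored forest could classify a new instance. It assumes the DecisionForest.load and Dataset.load helpers that Mahout's own classification tools rely on, and the sample CSV line is made up; check the API of your Mahout version before copying this.

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.df.DecisionForest;
import org.apache.mahout.classifier.df.data.DataConverter;
import org.apache.mahout.classifier.df.data.Dataset;
import org.apache.mahout.classifier.df.data.Instance;
import org.apache.mahout.common.RandomUtils;

public class ForestClassifyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // assumed helpers: load the dataset metadata and the forest built above
    Dataset dataset = Dataset.load(conf, new Path("DATASET_METADATA.info"));
    DecisionForest forest = DecisionForest.load(conf, new Path("YOUR_OUTPUT_FOLDER/forest.seq"));

    // convert one line (same format as the training data) into an Instance
    DataConverter converter = new DataConverter(dataset);
    Instance instance = converter.convert("5.1,3.5,1.4,0.2,Iris-setosa");

    Random rng = RandomUtils.getRandom();
    double prediction = forest.classify(dataset, rng, instance);
    System.out.println("predicted label index / value: " + prediction);
  }
}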
The node split criterion used in Mahout is based on entropy and information gain. Stay tuned for more details.
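As a teaser, here is a small standalone sketch (not Mahout's actual code) of what entropy and information gain measure for a candidate split; the best split is the one with the highest gain.

public class InfoGain {

  // entropy (in bits) of a label distribution given raw counts
  static double entropy(int[] counts) {
    int total = 0;
    for (int c : counts) {
      total += c;
    }
    double h = 0.0;
    for (int c : counts) {
      if (c > 0) {
        double p = (double) c / total;
        h -= p * (Math.log(p) / Math.log(2));
      }
    }
    return h;
  }

  public static void main(String[] args) {
    // parent node: 10 positives, 10 negatives -> entropy = 1 bit
    int[] parent = {10, 10};
    // candidate split produces two branches
    int[] left = {8, 2};
    int[] right = {2, 8};

    double weightedChildren =
        (10.0 / 20) * entropy(left) + (10.0 / 20) * entropy(right);
    double gain = entropy(parent) - weightedChildren;

    // prints roughly 0.278 bits of information gain for this split
    System.out.println("information gain = " + gain);
  }
}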
What exactly are zero-copy techniques, and how can they be employed to achieve better performance in a distributed system?
If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques such as memory-mapped files (mmap) and sendfile() to improve performance.
Zero-copy techniques eliminate unnecessary data copies and context switches between application (user) space and kernel space. Please refer to the following excellent article for an in-depth explanation of these techniques.
http://www.ibm.com/developerworks/library/j-zerocopy/
Traditionally, if a server wants to send data over the network to a client, it reads the data from disk into a kernel buffer and then copies it into user-space memory. It then copies the data again from user space into the kernel buffer associated with the network stack before handing it off to the network interface card. See Figure 1 in the article above.
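In code, the traditional path looks roughly like the sketch below: read() copies data from the kernel into a user-space byte array, and write() copies it back down into a kernel socket buffer, so every byte crosses the user/kernel boundary twice.

import java.io.FileInputStream;
import java.io.OutputStream;
import java.net.Socket;

public class TraditionalCopy {
  public static void send(String path, String host, int port) throws Exception {
    try (FileInputStream in = new FileInputStream(path);
         Socket socket = new Socket(host, port);
         OutputStream out = socket.getOutputStream()) {
      byte[] buffer = new byte[8192];           // user-space staging buffer
      int n;
      while ((n = in.read(buffer)) != -1) {     // kernel -> user copy
        out.write(buffer, 0, n);                // user -> kernel copy
      }
    }
  }
}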
A popular zero-copy technique is the sendfile() system call, exposed in Java NIO as FileChannel.transferTo(). See the corresponding figure in the same article.
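With transferTo, the same transfer becomes, in a minimal sketch, a single call that lets the kernel move the bytes from the file directly to the socket (sendfile under the hood on Linux):

import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
  public static void send(String path, String host, int port) throws Exception {
    try (FileChannel file = FileChannel.open(Paths.get(path), StandardOpenOption.READ);
         SocketChannel socket = SocketChannel.open(new InetSocketAddress(host, port))) {
      long position = 0;
      long remaining = file.size();
      // transferTo may send fewer bytes than requested, so loop until done
      while (remaining > 0) {
        long sent = file.transferTo(position, remaining, socket);
        position += sent;
        remaining -= sent;
      }
    }
  }
}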
As you can see, Hadoop switched to the zero-copy transferTo approach way back in version 0.18.
https://issues.apache.org/jira/browse/HADOOP-3164
Also, it uses the same technique when serving shuffle files.
In Spark, there is also a plan to use the same technique for sending shuffle files, targeted at the upcoming 1.2.0 release.
SPARK-2468: introduce the same transferTo technique for sending shuffle files.
In the last few posts about the Hadoop MapTask spill mechanism, we learned that Hadoop uses an in-memory buffer while the map task writes its intermediate output. Once the memory soft limit is exceeded, it starts spilling the data to disk. This results in multiple spill files that are eventually merged into one big file at the end of the map task. During the spilling process, the data in the memory buffer is first sorted by partition (each partition corresponds to one reduce task) and then by key.
As of Spark 1.0.1, Spark map tasks write their output directly to disk on completion; there is no in-memory buffer. Each map task writes as many shuffle files as there are reduce tasks, one shuffle file per reduce task. For example, with 1000 map tasks (M) and 5000 reduce tasks (R), this results in 5 million shuffle files. Unlike Hadoop MapReduce, Spark does not merge them into a single partitioned shuffle file, and this amount of file I/O hurts performance.
You can learn more about Spark shuffling from the following report.
http://www.cs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
The report also points out a potential memory issue when compression is used on the map output files.
For example, on a machine with C cores allocated to Spark, there are C concurrent map tasks, each writing R shuffle files, one per reduce task. With the roughly 400 KB compression buffer per open file cited in the report, the total memory usage would be C * R * 400 KB.
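To put that in numbers (purely illustrative): with C = 16 cores and R = 5000 reduce tasks, that is 16 * 5000 * 400 KB, or roughly 32 GB, just for compression buffers.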
Here comes the good news: a new shuffle mechanism, called sort-based shuffle, is implemented for the upcoming Spark 1.1.0 release. You can read the design document below and learn more about this issue in the SPARK-2045 JIRA ticket.
https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf
This sort-based shuffle is similar to, but not the same as, the Hadoop MapReduce shuffle.
The following is the description of SPARK-2045:
“…a sort-based shuffle implementation that takes advantage of an Ordering for keys (or just sorts by hashcode for keys that don’t have it) would likely improve performance and memory usage in very large shuffles. Our current hash-based shuffle needs an open file for each reduce task, which can fill up a lot of memory for compression buffers and cause inefficient IO. This would avoid both of those issues.”
Have you ever wondered what the Hadoop map task’s sort and spill code looks like?
Here you go. You can browse the following class.
MapTask.java (from hadoop mapreduce project on github)
In the map task, there is an in-memory buffer that stores the map output. When the buffer fills past a threshold, the data is spilled to disk. Two parameters control this behavior: io.sort.mb (default 100 MB) sets the size of the buffer, and io.sort.spill.percent (default 80%) sets the fill threshold that triggers a spill to disk.
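As a quick sketch of tuning these two knobs per job (the io.sort.* names are the classic ones used in this post; newer Hadoop 2.x releases know them as mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpillTuningExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // give the map-side sort buffer 200 MB instead of the default 100 MB
    conf.setInt("io.sort.mb", 200);
    // start spilling when the buffer is 90% full instead of 80%
    conf.setFloat("io.sort.spill.percent", 0.90f);

    Job job = Job.getInstance(conf, "spill tuning example");
    // ... set mapper/reducer/input/output as usual, then job.waitForCompletion(true)
  }
}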
In fact, the MapOutputBuffer is now pluggable (MAPREDUCE-4807) via the new MapOutputCollector interface.
public interface MapOutputCollector<K, V> {
  public void init(Context context) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException, ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}
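To illustrate the plug-in point, here is a deliberately trivial, hypothetical collector that just counts and drops records (so reducers would receive nothing); it only shows the shape a MAPREDUCE-4807 plug-in takes, and assumes the interface lives in org.apache.hadoop.mapred as in Hadoop 2.x.

import java.io.IOException;

import org.apache.hadoop.mapred.MapOutputCollector;

public class CountingCollector<K, V> implements MapOutputCollector<K, V> {

  private long records;

  @Override
  public void init(MapOutputCollector.Context context) throws IOException {
    records = 0;
  }

  @Override
  public void collect(K key, V value, int partition) throws IOException {
    records++; // a real collector would buffer, partition, sort and spill here
  }

  @Override
  public void flush() throws IOException {
    // nothing buffered, so nothing to flush
  }

  @Override
  public void close() throws IOException {
    System.out.println("collected " + records + " records");
  }
}

A job could then select it with something like job.setClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, CountingCollector.class, MapOutputCollector.class), mirroring the job.getClass lookup in createSortingCollector shown next.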
In MapTask.java, createSortingCollector instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection.
private <KEY, VALUE> MapOutputCollector<KEY, VALUE> createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  MapOutputCollector<KEY, VALUE> collector =
      (MapOutputCollector<KEY, VALUE>) ReflectionUtils.newInstance(
          job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
              MapOutputBuffer.class, MapOutputCollector.class), job);
  LOG.info("Map output collector class = " + collector.getClass().getName());
  MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);
  collector.init(context);
  return collector;
}
In MapOutputBuffer, you can see that io.sort.mb determines the size of the in-memory buffer (maxMemUsage), while io.sort.spill.percent is validated here and used to derive the soft limit at which spilling starts.
//sanity checks
final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                   INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
  throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
  throw new IOException(
      "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
      QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage];
To be continued…..
In the last post, we looked into the YARN event structure. Today, we will look at the different events introduced in MapReduce V2. These events are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.
You will find the different event types defined in the JobEventType, TaskEventType, and TaskAttemptEventType enum classes. JobEventType is shown below.
/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES
}
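As a small, hypothetical illustration (not code taken from Hadoop) of how these types are used: a component in the MRAppMaster fires a JobEvent carrying one of the JobEventType values through the YARN dispatcher, and whichever handler registered for JobEventType (JobImpl in the real application master) processes it asynchronously. This assumes the JobEvent(JobId, JobEventType) constructor and the YARN Dispatcher API.

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.yarn.event.Dispatcher;

public class JobKillExample {

  // ask the dispatcher to deliver a JOB_KILL event to whoever registered for JobEventType
  public static void fireKill(Dispatcher dispatcher, JobId jobId) {
    dispatcher.getEventHandler().handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  }
}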