Apache Mahout Random Forest

Random forest is a popular ensemble learning technique where a collection of decision trees are trained to make prediction. Apache Mahout, Spark MLLib, and H2O have all implemented the Random Forest algorithm. Today, we will look at Mahout’s MapReduce version of Random Forest.

To run Random Forest using Mahout, you can use the following command

hadoop jar mahout-examples-0.9-job.jar org.apache.mahout.classifier.df.mapreduce.BuildForest -Dmapred.max.split.size=1874231 -d data/YOUR_TRAINING_DATA.txt -ds DATASET_METADATA.info -sl 5 -t 100 -o YOUR_OUTPUT_FOLDER

If you look at the org.apache.mahout.classifier.df.mapreduce.BuildForest class below, it sets up the MapReduce job with the following available options.

–data –dataset –selection –no-complete –minsplit
–minprop –seed –partial –nbtrees


  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
    ArgumentBuilder abuilder = new ArgumentBuilder();
    GroupBuilder gbuilder = new GroupBuilder();
    Option dataOpt = obuilder.withLongName("data").withShortName("d").withRequired(true)
        .withDescription("Data path").create();
    Option datasetOpt = obuilder.withLongName("dataset").withShortName("ds").withRequired(true)
        .withDescription("Dataset path").create();
    Option selectionOpt = obuilder.withLongName("selection").withShortName("sl").withRequired(false)
        .withDescription("Optional, Number of variables to select randomly at each tree-node.\n" 
        + "For classification problem, the default is square root of the number of explanatory variables.\n"
        + "For regression problem, the default is 1/3 of the number of explanatory variables.").create();

    Option noCompleteOpt = obuilder.withLongName("no-complete").withShortName("nc").withRequired(false)
        .withDescription("Optional, The tree is not complemented").create();

    Option minSplitOpt = obuilder.withLongName("minsplit").withShortName("ms").withRequired(false)
        .withDescription("Optional, The tree-node is not divided, if the branching data size is " 
        + "smaller than this value.\nThe default is 2.").create();

    Option minPropOpt = obuilder.withLongName("minprop").withShortName("mp").withRequired(false)
        .withDescription("Optional, The tree-node is not divided, if the proportion of the " 
        + "variance of branching data is smaller than this value.\n"
        + "In the case of a regression problem, this value is used. "
        + "The default is 1/1000(0.001).").create();

    Option seedOpt = obuilder.withLongName("seed").withShortName("sd").withRequired(false)
        .withDescription("Optional, seed value used to initialise the Random number generator").create();
    Option partialOpt = obuilder.withLongName("partial").withShortName("p").withRequired(false)
        .withDescription("Optional, use the Partial Data implementation").create();
    Option nbtreesOpt = obuilder.withLongName("nbtrees").withShortName("t").withRequired(true)
        .withDescription("Number of trees to grow").create();
    Option outputOpt = obuilder.withLongName("output").withShortName("o").withRequired(true)
        .withDescription("Output path, will contain the Decision Forest").create();

    Option helpOpt = obuilder.withLongName("help").withShortName("h")
        .withDescription("Print out help").create();
    Group group = gbuilder.withName("Options").withOption(dataOpt).withOption(datasetOpt)
    try {
      Parser parser = new Parser();
      CommandLine cmdLine = parser.parse(args);
      if (cmdLine.hasOption("help")) {
        return -1;
      isPartial = cmdLine.hasOption(partialOpt);
      String dataName = cmdLine.getValue(dataOpt).toString();
      String datasetName = cmdLine.getValue(datasetOpt).toString();
      String outputName = cmdLine.getValue(outputOpt).toString();
      nbTrees = Integer.parseInt(cmdLine.getValue(nbtreesOpt).toString());
      if (cmdLine.hasOption(selectionOpt)) {
        m = Integer.parseInt(cmdLine.getValue(selectionOpt).toString());
      complemented = !cmdLine.hasOption(noCompleteOpt);
      if (cmdLine.hasOption(minSplitOpt)) {
        minSplitNum = Integer.parseInt(cmdLine.getValue(minSplitOpt).toString());
      if (cmdLine.hasOption(minPropOpt)) {
        minVarianceProportion = Double.parseDouble(cmdLine.getValue(minPropOpt).toString());
      if (cmdLine.hasOption(seedOpt)) {
        seed = Long.valueOf(cmdLine.getValue(seedOpt).toString());

      if (log.isDebugEnabled()) {
        log.debug("data : {}", dataName);
        log.debug("dataset : {}", datasetName);
        log.debug("output : {}", outputName);
        log.debug("m : {}", m);
        log.debug("complemented : {}", complemented);
        log.debug("minSplitNum : {}", minSplitNum);
        log.debug("minVarianceProportion : {}", minVarianceProportion);
        log.debug("seed : {}", seed);
        log.debug("nbtrees : {}", nbTrees);
        log.debug("isPartial : {}", isPartial);

      dataPath = new Path(dataName);
      datasetPath = new Path(datasetName);
      outputPath = new Path(outputName);
    } catch (OptionException e) {
      log.error("Exception", e);
      return -1;
    return 0;

After arguments parsing, it continues with buildForest method call as shown below, it instantiates either InMemoryBuilder or PartialBuilder MapReduce job and then runs it to build the random forest (DecisionForest).

DecisionForest forest = forestBuilder.build(nbTrees);

private void buildForest() throws IOException, ClassNotFoundException, InterruptedException {
    // make sure the output path does not exist
    FileSystem ofs = outputPath.getFileSystem(getConf());
    if (ofs.exists(outputPath)) {
      log.error("Output path already exists");

    DecisionTreeBuilder treeBuilder = new DecisionTreeBuilder();
    if (m != null) {
    if (minSplitNum != null) {
    if (minVarianceProportion != null) {
    Builder forestBuilder;
    if (isPartial) {
      log.info("Partial Mapred implementation");
      forestBuilder = new PartialBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
    } else {
      log.info("InMem Mapred implementation");
      forestBuilder = new InMemBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());

    log.info("Building the forest...");
    long time = System.currentTimeMillis();
    DecisionForest forest = forestBuilder.build(nbTrees);
    if (forest == null) {
    time = System.currentTimeMillis() - time;
    log.info("Build Time: {}", DFUtils.elapsedTime(time));
    log.info("Forest num Nodes: {}", forest.nbNodes());
    log.info("Forest mean num Nodes: {}", forest.meanNbNodes());
    log.info("Forest mean max Depth: {}", forest.meanMaxDepth());

    // store the decision forest in the output path
    Path forestPath = new Path(outputPath, "forest.seq");
    log.info("Storing the forest in: {}", forestPath);
    DFUtils.storeWritable(getConf(), forestPath, forest);

Both InMemoryBuilder and PartialBuilder extend the abstract Builder class. The abstract class has the following required abstract methods to be implemented by any subclass.

protected abstract void configureJob(Job job) throws IOException;
protected abstract DecisionForest parseOutput(Job job) throws IOException;

Next, we will look inside PartialBuilder, the MapReduce implementation of Random Forest job. It uses Step1Mapper as the Mapper implementation and does not have any reducer.

In each mapper task, it reads and converts each line of inputs into a dense vector and stores these vectors in an array list during the map phase.

At the end of the map phase, the mapper task starts to build a subset of trees using the input array list of vectors and then writes the these trees to the disk as the custom MapredOutput writables.

The number of trees to be built per mapper task is calculated as follows

int treesPerMapper = numTrees / numMaps;

 * First step of the Partial Data Builder. Builds the trees using the data available in the InputSplit.
 * Predict the oob classes for each tree in its growing partition (input split).
public class Step1Mapper extends MapredMapper<LongWritable,Text,TreeID,MapredOutput> {
  private static final Logger log = LoggerFactory.getLogger(Step1Mapper.class);
  /** used to convert input values to data instances */
  private DataConverter converter;
  private Random rng;
  /** number of trees to be built by this mapper */
  private int nbTrees;
  /** id of the first tree */
  private int firstTreeId;
  /** mapper's partition */
  private int partition;
  /** will contain all instances if this mapper's split */
  private final List<Instance> instances = Lists.newArrayList();
  public int getFirstTreeId() {
    return firstTreeId;
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    configure(Builder.getRandomSeed(conf), conf.getInt("mapred.task.partition", -1),
      Builder.getNumMaps(conf), Builder.getNbTrees(conf));
   * Useful when testing
   * @param partition
   *          current mapper inputSplit partition
   * @param numMapTasks
   *          number of running map tasks
   * @param numTrees
   *          total number of trees in the forest
  protected void configure(Long seed, int partition, int numMapTasks, int numTrees) {
    converter = new DataConverter(getDataset());
    // prepare random-numders generator
    log.debug("seed : {}", seed);
    if (seed == null) {
      rng = RandomUtils.getRandom();
    } else {
      rng = RandomUtils.getRandom(seed);
    // mapper's partition
    Preconditions.checkArgument(partition >= 0, "Wrong partition ID: " + partition + ". Partition must be >= 0!");
    this.partition = partition;
    // compute number of trees to build
    nbTrees = nbTrees(numMapTasks, numTrees, partition);
    // compute first tree id
    firstTreeId = 0;
    for (int p = 0; p < partition; p++) {
      firstTreeId += nbTrees(numMapTasks, numTrees, p);
    log.debug("partition : {}", partition);
    log.debug("nbTrees : {}", nbTrees);
    log.debug("firstTreeId : {}", firstTreeId);
   * Compute the number of trees for a given partition. The first partitions may be longer
   * than the rest because of the remainder.
   * @param numMaps
   *          total number of maps (partitions)
   * @param numTrees
   *          total number of trees to build
   * @param partition
   *          partition to compute the number of trees for
  public static int nbTrees(int numMaps, int numTrees, int partition) {
    int treesPerMapper = numTrees / numMaps;
    int remainder = numTrees - numMaps * treesPerMapper;
    return treesPerMapper + (partition < remainder ? 1 : 0);
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // prepare the data
    log.debug("partition: {} numInstances: {}", partition, instances.size());
    Data data = new Data(getDataset(), instances);
    Bagging bagging = new Bagging(getTreeBuilder(), data);
    TreeID key = new TreeID();
    log.debug("Building {} trees", nbTrees);
    for (int treeId = 0; treeId < nbTrees; treeId++) {
      log.debug("Building tree number : {}", treeId);
      Node tree = bagging.build(rng);
      key.set(partition, firstTreeId + treeId);
      if (isOutput()) {
        MapredOutput emOut = new MapredOutput(tree);
        context.write(key, emOut);


Bagging is used to create a random sample from the input data to build a decision tree. Take a look at the Bagging class below.

It delegates the task of bagging (create a random sample) to Data class. After that, it uses tree builder to build a decision tree given the bag of random sampled data.
treeBuilder.build(rng, bag);

 * Builds a tree using bagging
public class Bagging {
  private static final Logger log = LoggerFactory.getLogger(Bagging.class);
  private final TreeBuilder treeBuilder;
  private final Data data;
  private final boolean[] sampled;
  public Bagging(TreeBuilder treeBuilder, Data data) {
    this.treeBuilder = treeBuilder;
    this.data = data;
    sampled = new boolean[data.size()];
   * Builds one tree
  public Node build(Random rng) {
    Arrays.fill(sampled, false);
    Data bag = data.bagging(rng, sampled);
    return treeBuilder.build(rng, bag);

See the below org.apache.mahout.classifier.df.data.Data class. It has the following two methods to sample and return a bag of random samples.

   * if data has N cases, sample N cases at random -but with replacement.
  public Data bagging(Random rng) {
    int datasize = size();
    List<Instance> bag = Lists.newArrayListWithCapacity(datasize);
    for (int i = 0; i < datasize; i++) {
    return new Data(dataset, bag);
   * if data has N cases, sample N cases at random -but with replacement.
   * @param sampled
   *          indicating which instance has been sampled
   * @return sampled data
  public Data bagging(Random rng, boolean[] sampled) {
    int datasize = size();
    List<Instance> bag = Lists.newArrayListWithCapacity(datasize);
    for (int i = 0; i < datasize; i++) {
      int index = rng.nextInt(datasize);
      sampled[index] = true;
    return new Data(dataset, bag);

Once the decision forest is built by the PartialBuider MapReduce job, it can be used for classifying new Instance.

org.apache.mahout.classifier.df.DecisionForest is the Writable implementation of decision forest. It provides the following Classify method. For numerical output, it takes the average of all the trees’ outputs, sum / cnt.
As for categorical output, it returns the category which has the highest votes from all the trees.

   * predicts the label for the instance
   * @param rng
   *          Random number generator, used to break ties randomly
   * @return NaN if the label cannot be predicted
  public double classify(Dataset dataset, Random rng, Instance instance) {
    if (dataset.isNumerical(dataset.getLabelId())) {
      double sum = 0;
      int cnt = 0;
      for (Node tree : trees) {
        double prediction = tree.classify(instance);
        if (!Double.isNaN(prediction)) {
          sum += prediction;

      if (cnt > 0) {
        return sum / cnt;
      } else {
        return Double.NaN;
    } else {
      int[] predictions = new int[dataset.nblabels()];
      for (Node tree : trees) {
        double prediction = tree.classify(instance);
        if (!Double.isNaN(prediction)) {
          predictions[(int) prediction]++;

      if (DataUtils.sum(predictions) == 0) {
        return Double.NaN; // no prediction available

      return DataUtils.maxindex(rng, predictions);

The node split criteria used in the Mahout’s is based on the idea of entropy and information gain. Stay tuned for more infos.

Distributed K-Means Clustering

K-Means clustering is the popular and easy to understand clustering algorithm. Both Spark MLlib and Apache Mahout have implemented the algorithm. However, Mahout MapReduce version of K-Means algorithm is slow compared to Spark. This is due to the huge IO involved in every iteration of the algorithm. Basically, at each iteration, the intermediate output results (clusters)  have to be written to the disk and then read in again together with input dataset from the disk. The good news is Apache Mahout has started the work to integrate with Spark.

Let’s look at the codes. KMeansDriver is the driver class. User can choose to run a MapReduce or sequential version of the algorithm. The program arguments are the path to the initial cluster centers, dataset input path,  convergence rate, distance measure implementation class, max number of iterations, and etc.

  public int run(String[] args) throws Exception {
            &quot;The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  &quot;
                + &quot;If k is also specified, then a random set of vectors will be selected&quot;
                + &quot; and written out to this path first&quot;).create());
            &quot;The k in k-Means.  If specified, then a random selection of k Vectors will be chosen&quot;
                + &quot; as the Centroid and written to the clusters input path.&quot;).create());
    if (parseArguments(args) == null) {
      return -1;
    Path input = getInputPath();
    Path clusters = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
    Path output = getOutputPath();
    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
    if (measureClass == null) {
      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
      HadoopUtil.delete(getConf(), output);
    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
      int numClusters = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));

      Long seed = null;
      if (hasOption(DefaultOptionCreator.RANDOM_SEED)) {
        seed = Long.parseLong(getOption(DefaultOptionCreator.RANDOM_SEED));

      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, numClusters, measure, seed);
    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
    double clusterClassificationThreshold = 0.0;
    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
    run(getConf(), input, clusters, output, convergenceDelta, maxIterations, runClustering,
        clusterClassificationThreshold, runSequential);
    return 0;

The main MapReduce clustering logic can be found below. As you seen, it is an iterative process where in each iteration, we create a MapReduce job that reads in the input vectors and previous clusters and start assign the input vectors to the closest cluster until either we reach convergence or hit the specified number of iterations. There is a lot of IO overhead here. Each iteration, the MapReduce job has to read the previous results which are the clusters and input vectors all over again. (Take a look at Kluster.java which represents a cluster with cluster id, the center and the associated distance measure)

   * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
   * implementation
   * @param conf
   *          the Configuration
   * @param inPath
   *          a Path to input VectorWritables
   * @param priorPath
   *          a Path to the prior classifier
   * @param outPath
   *          a Path of output directory
   * @param numIterations
   *          the int number of iterations to perform
  public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
    Path clustersOut = null;
    int iteration = 1;
    while (iteration &lt;= numIterations) {
      conf.set(PRIOR_PATH_KEY, priorPath.toString());
      String jobName = &quot;Cluster Iterator running iteration &quot; + iteration + &quot; over priorPath: &quot; + priorPath;
      Job job = new Job(conf, jobName);
      FileInputFormat.addInputPath(job, inPath);
      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
      priorPath = clustersOut;
      FileOutputFormat.setOutputPath(job, clustersOut);
      if (!job.waitForCompletion(true)) {
        throw new InterruptedException(&quot;Cluster Iteration &quot; + iteration + &quot; failed processing &quot; + priorPath);
      ClusterClassifier.writePolicy(policy, clustersOut);
      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
      if (isConverged(clustersOut, conf, fs)) {
    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);

Stay tuned for more infos about the Mapper and Reducer implementation of the algorithm.