Randomness in Random Forest

In the last post, we looked at the bagging process, where a random sample of the input is used to build each individual tree. This is one of the two kinds of randomness used in the algorithm. The second kind is used in the node-splitting process: a random subset of m attributes out of all attributes is used to compute the best split based on information gain.

If m is not specified in the algorithm, then the following logic is used to determine m.

For regression tree,

m = (int) Math.ceil(total_number_attributes / 3.0);

For classification tree,

m = (int) Math.ceil(Math.sqrt(total_number_attributes));
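
For example, with 20 input attributes, a regression tree considers m = ceil(20 / 3) = 7 attributes at each node, while a classification tree considers m = ceil(sqrt(20)) = 5.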

If you take a look at the Apache Mahout DecisionTreeBuilder, you will find the randomAttributes method below, which selects m attributes to consider for a split.

org.apache.mahout.classifier.df.builder.DecisionTreeBuilder

/**
   * Randomly selects m attributes to consider for split, excludes IGNORED and LABEL attributes
   *
   * @param rng      random-numbers generator
   * @param selected attributes' state (selected or not)
   * @param m        number of attributes to choose
   * @return list of selected attributes' indices, or null if all attributes have already been selected
   */
  private static int[] randomAttributes(Random rng, boolean[] selected, int m) {
    int nbNonSelected = 0; // number of non selected attributes
    for (boolean sel : selected) {
      if (!sel) {
        nbNonSelected++;
      }
    }

    if (nbNonSelected == 0) {
      log.warn("All attributes are selected !");
      return NO_ATTRIBUTES;
    }

    int[] result;
    if (nbNonSelected <= m) {
      // return all non selected attributes
      result = new int[nbNonSelected];
      int index = 0;
      for (int attr = 0; attr < selected.length; attr++) {
        if (!selected[attr]) {
          result[index++] = attr;
        }
      }
    } else {
      result = new int[m];
      for (int index = 0; index < m; index++) {
        // randomly choose a "non selected" attribute
        int rind;
        do {
          rind = rng.nextInt(selected.length);
        } while (selected[rind]);

        result[index] = rind;
        selected[rind] = true; // temporarily set the chosen attribute to be selected
      }

      // the chosen attributes are not yet selected
      for (int attr : result) {
        selected[attr] = false;
      }
    }

    return result;
  }

Given the list of chosen attributes, the builder loops through each one, computes the information gain of the split produced by that attribute, and keeps the best split.


 // find the best split
    Split best = null;
    for (int attr : attributes) {
      Split split = igSplit.computeSplit(data, attr);
      if (best == null || best.getIg() < split.getIg()) {
        best = split;
      }
    }

After finding the best split, if its information gain is less than the specified threshold, we stop and create a leaf node with a label.
For regression, the label is the regression value, computed as the average of all the label values in the node's sample data. For classification, we use the majority label, the value that appears most often.

 // information gain is near to zero.
    if (best.getIg() < EPSILON) {
      double label;
      if (data.getDataset().isNumerical(data.getDataset().getLabelId())) {
        label = sum / data.size();
      } else {
        label = data.majorityLabel(rng);
      }
      log.debug("ig is near to zero Leaf({})", label);
      return new Leaf(label);
    }

Apache Mahout Random Forest

Random forest is a popular ensemble learning technique in which a collection of decision trees is trained to make predictions. Apache Mahout, Spark MLlib, and H2O have all implemented the Random Forest algorithm. Today, we will look at Mahout's MapReduce version of Random Forest.

To run Random Forest using Mahout, you can use the following command:

hadoop jar mahout-examples-0.9-job.jar org.apache.mahout.classifier.df.mapreduce.BuildForest -Dmapred.max.split.size=1874231 -d data/YOUR_TRAINING_DATA.txt -ds DATASET_METADATA.info -sl 5 -t 100 -o YOUR_OUTPUT_FOLDER

If you look at the org.apache.mahout.classifier.df.mapreduce.BuildForest class below, it sets up the MapReduce job with the following available options.

--data, --dataset, --selection, --no-complete, --minsplit, --minprop, --seed, --partial, --nbtrees, --output

org.apache.mahout.classifier.df.mapreduce.BuildForest


@Override
  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
    ArgumentBuilder abuilder = new ArgumentBuilder();
    GroupBuilder gbuilder = new GroupBuilder();
    
    Option dataOpt = obuilder.withLongName("data").withShortName("d").withRequired(true)
        .withArgument(abuilder.withName("path").withMinimum(1).withMaximum(1).create())
        .withDescription("Data path").create();
    
    Option datasetOpt = obuilder.withLongName("dataset").withShortName("ds").withRequired(true)
        .withArgument(abuilder.withName("dataset").withMinimum(1).withMaximum(1).create())
        .withDescription("Dataset path").create();
    
    Option selectionOpt = obuilder.withLongName("selection").withShortName("sl").withRequired(false)
        .withArgument(abuilder.withName("m").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, Number of variables to select randomly at each tree-node.\n" 
        + "For classification problem, the default is square root of the number of explanatory variables.\n"
        + "For regression problem, the default is 1/3 of the number of explanatory variables.").create();

    Option noCompleteOpt = obuilder.withLongName("no-complete").withShortName("nc").withRequired(false)
        .withDescription("Optional, The tree is not complemented").create();

    Option minSplitOpt = obuilder.withLongName("minsplit").withShortName("ms").withRequired(false)
        .withArgument(abuilder.withName("minsplit").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, The tree-node is not divided, if the branching data size is " 
        + "smaller than this value.\nThe default is 2.").create();

    Option minPropOpt = obuilder.withLongName("minprop").withShortName("mp").withRequired(false)
        .withArgument(abuilder.withName("minprop").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, The tree-node is not divided, if the proportion of the " 
        + "variance of branching data is smaller than this value.\n"
        + "In the case of a regression problem, this value is used. "
        + "The default is 1/1000(0.001).").create();

    Option seedOpt = obuilder.withLongName("seed").withShortName("sd").withRequired(false)
        .withArgument(abuilder.withName("seed").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, seed value used to initialise the Random number generator").create();
    
    Option partialOpt = obuilder.withLongName("partial").withShortName("p").withRequired(false)
        .withDescription("Optional, use the Partial Data implementation").create();
    
    Option nbtreesOpt = obuilder.withLongName("nbtrees").withShortName("t").withRequired(true)
        .withArgument(abuilder.withName("nbtrees").withMinimum(1).withMaximum(1).create())
        .withDescription("Number of trees to grow").create();
    
    Option outputOpt = obuilder.withLongName("output").withShortName("o").withRequired(true)
        .withArgument(abuilder.withName("path").withMinimum(1).withMaximum(1).create())
        .withDescription("Output path, will contain the Decision Forest").create();

    Option helpOpt = obuilder.withLongName("help").withShortName("h")
        .withDescription("Print out help").create();
    
    Group group = gbuilder.withName("Options").withOption(dataOpt).withOption(datasetOpt)
        .withOption(selectionOpt).withOption(noCompleteOpt).withOption(minSplitOpt)
        .withOption(minPropOpt).withOption(seedOpt).withOption(partialOpt).withOption(nbtreesOpt)
        .withOption(outputOpt).withOption(helpOpt).create();
    
    try {
      Parser parser = new Parser();
      parser.setGroup(group);
      CommandLine cmdLine = parser.parse(args);
      
      if (cmdLine.hasOption("help")) {
        CommandLineUtil.printHelp(group);
        return -1;
      }
      
      isPartial = cmdLine.hasOption(partialOpt);
      String dataName = cmdLine.getValue(dataOpt).toString();
      String datasetName = cmdLine.getValue(datasetOpt).toString();
      String outputName = cmdLine.getValue(outputOpt).toString();
      nbTrees = Integer.parseInt(cmdLine.getValue(nbtreesOpt).toString());
      
      if (cmdLine.hasOption(selectionOpt)) {
        m = Integer.parseInt(cmdLine.getValue(selectionOpt).toString());
      }
      complemented = !cmdLine.hasOption(noCompleteOpt);
      if (cmdLine.hasOption(minSplitOpt)) {
        minSplitNum = Integer.parseInt(cmdLine.getValue(minSplitOpt).toString());
      }
      if (cmdLine.hasOption(minPropOpt)) {
        minVarianceProportion = Double.parseDouble(cmdLine.getValue(minPropOpt).toString());
      }
      if (cmdLine.hasOption(seedOpt)) {
        seed = Long.valueOf(cmdLine.getValue(seedOpt).toString());
      }

      if (log.isDebugEnabled()) {
        log.debug("data : {}", dataName);
        log.debug("dataset : {}", datasetName);
        log.debug("output : {}", outputName);
        log.debug("m : {}", m);
        log.debug("complemented : {}", complemented);
        log.debug("minSplitNum : {}", minSplitNum);
        log.debug("minVarianceProportion : {}", minVarianceProportion);
        log.debug("seed : {}", seed);
        log.debug("nbtrees : {}", nbTrees);
        log.debug("isPartial : {}", isPartial);
      }

      dataPath = new Path(dataName);
      datasetPath = new Path(datasetName);
      outputPath = new Path(outputName);
      
    } catch (OptionException e) {
      log.error("Exception", e);
      CommandLineUtil.printHelp(group);
      return -1;
    }
    
    buildForest();
    
    return 0;
  }

After parsing the arguments, it calls the buildForest method shown below, which instantiates either an InMemBuilder or a PartialBuilder MapReduce job and runs it to build the random forest (DecisionForest).

DecisionForest forest = forestBuilder.build(nbTrees);


private void buildForest() throws IOException, ClassNotFoundException, InterruptedException {
    // make sure the output path does not exist
    FileSystem ofs = outputPath.getFileSystem(getConf());
    if (ofs.exists(outputPath)) {
      log.error("Output path already exists");
      return;
    }

    DecisionTreeBuilder treeBuilder = new DecisionTreeBuilder();
    if (m != null) {
      treeBuilder.setM(m);
    }
    treeBuilder.setComplemented(complemented);
    if (minSplitNum != null) {
      treeBuilder.setMinSplitNum(minSplitNum);
    }
    if (minVarianceProportion != null) {
      treeBuilder.setMinVarianceProportion(minVarianceProportion);
    }
    
    Builder forestBuilder;
    
    if (isPartial) {
      log.info("Partial Mapred implementation");
      forestBuilder = new PartialBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
    } else {
      log.info("InMem Mapred implementation");
      forestBuilder = new InMemBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
    }

    forestBuilder.setOutputDirName(outputPath.getName());
    
    log.info("Building the forest...");
    long time = System.currentTimeMillis();
    
    DecisionForest forest = forestBuilder.build(nbTrees);
    if (forest == null) {
      return;
    }
    
    time = System.currentTimeMillis() - time;
    log.info("Build Time: {}", DFUtils.elapsedTime(time));
    log.info("Forest num Nodes: {}", forest.nbNodes());
    log.info("Forest mean num Nodes: {}", forest.meanNbNodes());
    log.info("Forest mean max Depth: {}", forest.meanMaxDepth());

    // store the decision forest in the output path
    Path forestPath = new Path(outputPath, "forest.seq");
    log.info("Storing the forest in: {}", forestPath);
    DFUtils.storeWritable(getConf(), forestPath, forest);
  }
  
 

Both InMemBuilder and PartialBuilder extend the abstract Builder class, which requires its subclasses to implement the following abstract methods.

protected abstract void configureJob(Job job) throws IOException;
 
protected abstract DecisionForest parseOutput(Job job) throws IOException;

Next, we will look inside PartialBuilder, the partial-data MapReduce implementation of the Random Forest job. It uses Step1Mapper as the Mapper implementation and has no reducer.

During the map phase, each mapper task reads its input split and converts each line of input into a dense vector, storing these vectors in an array list.

At the end of the map phase, the mapper task builds its subset of the trees from this list of vectors and writes the trees to disk as custom MapredOutput writables.

The number of trees to be built per mapper task is calculated as follows

int treesPerMapper = numTrees / numMaps;
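
For example, with 100 trees and 7 map tasks, treesPerMapper is 14 with a remainder of 2, so the first two partitions build 15 trees each and the remaining five build 14 each; the nbTrees method in the mapper code below handles this remainder.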

/**
 * First step of the Partial Data Builder. Builds the trees using the data available in the InputSplit.
 * Predict the oob classes for each tree in its growing partition (input split).
 */
public class Step1Mapper extends MapredMapper<LongWritable,Text,TreeID,MapredOutput> {
  
  private static final Logger log = LoggerFactory.getLogger(Step1Mapper.class);
  
  /** used to convert input values to data instances */
  private DataConverter converter;
  
  private Random rng;
  
  /** number of trees to be built by this mapper */
  private int nbTrees;
  
  /** id of the first tree */
  private int firstTreeId;
  
  /** mapper's partition */
  private int partition;
  
  /** will contain all instances of this mapper's split */
  private final List<Instance> instances = Lists.newArrayList();
  
  public int getFirstTreeId() {
    return firstTreeId;
  }
  
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    
    configure(Builder.getRandomSeed(conf), conf.getInt("mapred.task.partition", -1),
      Builder.getNumMaps(conf), Builder.getNbTrees(conf));
  }
  
  /**
   * Useful when testing
   * 
   * @param partition
   *          current mapper inputSplit partition
   * @param numMapTasks
   *          number of running map tasks
   * @param numTrees
   *          total number of trees in the forest
   */
  protected void configure(Long seed, int partition, int numMapTasks, int numTrees) {
    converter = new DataConverter(getDataset());
    
    // prepare random-numbers generator
    log.debug("seed : {}", seed);
    if (seed == null) {
      rng = RandomUtils.getRandom();
    } else {
      rng = RandomUtils.getRandom(seed);
    }
    
    // mapper's partition
    Preconditions.checkArgument(partition >= 0, "Wrong partition ID: " + partition + ". Partition must be >= 0!");
    this.partition = partition;
    
    // compute number of trees to build
    nbTrees = nbTrees(numMapTasks, numTrees, partition);
    
    // compute first tree id
    firstTreeId = 0;
    for (int p = 0; p < partition; p++) {
      firstTreeId += nbTrees(numMapTasks, numTrees, p);
    }
    
    log.debug("partition : {}", partition);
    log.debug("nbTrees : {}", nbTrees);
    log.debug("firstTreeId : {}", firstTreeId);
  }
  
  /**
   * Compute the number of trees for a given partition. The first partitions may be longer
   * than the rest because of the remainder.
   * 
   * @param numMaps
   *          total number of maps (partitions)
   * @param numTrees
   *          total number of trees to build
   * @param partition
   *          partition to compute the number of trees for
   */
  public static int nbTrees(int numMaps, int numTrees, int partition) {
    int treesPerMapper = numTrees / numMaps;
    int remainder = numTrees - numMaps * treesPerMapper;
    return treesPerMapper + (partition < remainder ? 1 : 0);
  }
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    instances.add(converter.convert(value.toString()));
  }
  
  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // prepare the data
    log.debug("partition: {} numInstances: {}", partition, instances.size());
    
    Data data = new Data(getDataset(), instances);
    Bagging bagging = new Bagging(getTreeBuilder(), data);
    
    TreeID key = new TreeID();
    
    log.debug("Building {} trees", nbTrees);
    for (int treeId = 0; treeId < nbTrees; treeId++) {
      log.debug("Building tree number : {}", treeId);
      
      Node tree = bagging.build(rng);
      
      key.set(partition, firstTreeId + treeId);
      
      if (isOutput()) {
        MapredOutput emOut = new MapredOutput(tree);
        context.write(key, emOut);
      }

      context.progress();
    }
  }
  
}

Bagging is used to create a random sample of the input data for building a decision tree. Take a look at the Bagging class below.

It delegates the actual bagging (creating a random sample with replacement) to the Data class, then uses the tree builder to build a decision tree from the resulting bag:
treeBuilder.build(rng, bag);


/**
 * Builds a tree using bagging
 */
public class Bagging {
  
  private static final Logger log = LoggerFactory.getLogger(Bagging.class);
  
  private final TreeBuilder treeBuilder;
  
  private final Data data;
  
  private final boolean[] sampled;
  
  public Bagging(TreeBuilder treeBuilder, Data data) {
    this.treeBuilder = treeBuilder;
    this.data = data;
    sampled = new boolean[data.size()];
  }
  
  /**
   * Builds one tree
   */
  public Node build(Random rng) {
    log.debug("Bagging...");
    Arrays.fill(sampled, false);
    Data bag = data.bagging(rng, sampled);
    
    log.debug("Building...");
    return treeBuilder.build(rng, bag);
  }
  
}

See the org.apache.mahout.classifier.df.data.Data class below. It has the following two methods to sample the data and return a bag of random samples.


   /**
   * if data has N cases, sample N cases at random -but with replacement.
   */
  public Data bagging(Random rng) {
    int datasize = size();
    List<Instance> bag = Lists.newArrayListWithCapacity(datasize);
    
    for (int i = 0; i < datasize; i++) {
      bag.add(instances.get(rng.nextInt(datasize)));
    }
    
    return new Data(dataset, bag);
  }
  
  /**
   * if data has N cases, sample N cases at random -but with replacement.
   * 
   * @param sampled
   *          indicating which instance has been sampled
   * 
   * @return sampled data
   */
  public Data bagging(Random rng, boolean[] sampled) {
    int datasize = size();
    List<Instance> bag = Lists.newArrayListWithCapacity(datasize);
    
    for (int i = 0; i < datasize; i++) {
      int index = rng.nextInt(datasize);
      bag.add(instances.get(index));
      sampled[index] = true;
    }
    
    return new Data(dataset, bag);
  }
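
Notice the boolean sampled array in the second method: it records which instances made it into the bag, so the instances that are never drawn form the out-of-bag (OOB) set that, as the Step1Mapper Javadoc above mentions, can be used to estimate each tree's error on data it was not trained on.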
  

Once the decision forest has been built by the PartialBuilder MapReduce job, it can be used to classify new instances.

org.apache.mahout.classifier.df.DecisionForest is the Writable implementation of the decision forest. It provides the classify method below. For a numerical label, it returns the average of all the trees' outputs, sum / cnt.
For a categorical label, it returns the category that receives the most votes across all the trees.


 /**
   * predicts the label for the instance
   * 
   * @param rng
   *          Random number generator, used to break ties randomly
   * @return NaN if the label cannot be predicted
   */
  public double classify(Dataset dataset, Random rng, Instance instance) {
    if (dataset.isNumerical(dataset.getLabelId())) {
      double sum = 0;
      int cnt = 0;
      for (Node tree : trees) {
        double prediction = tree.classify(instance);
        if (!Double.isNaN(prediction)) {
          sum += prediction;
          cnt++;
        }
      }

      if (cnt > 0) {
        return sum / cnt;
      } else {
        return Double.NaN;
      }
    } else {
      int[] predictions = new int[dataset.nblabels()];
      for (Node tree : trees) {
        double prediction = tree.classify(instance);
        if (!Double.isNaN(prediction)) {
          predictions[(int) prediction]++;
        }
      }

      if (DataUtils.sum(predictions) == 0) {
        return Double.NaN; // no prediction available
      }

      return DataUtils.maxindex(rng, predictions);
    }
  }
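
As a rough usage sketch, classifying a single record with a trained forest might look like the snippet below. This is not taken from Mahout; the Dataset.load and DecisionForest.load helpers, the file paths, and the sample record are assumptions based on Mahout 0.9, so treat them as illustrative only.

// Illustrative sketch only: the loading helpers and paths are assumptions, not verified code.
Configuration conf = new Configuration();
Dataset dataset = Dataset.load(conf, new Path("DATASET_METADATA.info"));                        // assumed helper
DecisionForest forest = DecisionForest.load(conf, new Path("YOUR_OUTPUT_FOLDER/forest.seq"));   // assumed helper

DataConverter converter = new DataConverter(dataset);
Instance instance = converter.convert("5.1,3.5,1.4,0.2,Iris-setosa");   // one made-up CSV record

double prediction = forest.classify(dataset, new Random(), instance);   // same signature as shown above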

The node split criterion used in Mahout is based on entropy and information gain. Stay tuned for more info.
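
To make the idea concrete, here is a minimal, self-contained sketch in plain Java (not Mahout's code; the class and method names are made up) that computes the entropy of a label distribution and the information gain of a candidate split as the parent entropy minus the size-weighted entropies of the children.

import java.util.List;

public class InfoGainSketch {

  /** Entropy of a discrete label distribution, given per-label counts and the total count. */
  static double entropy(int[] labelCounts, int total) {
    double h = 0.0;
    for (int count : labelCounts) {
      if (count == 0) {
        continue;
      }
      double p = (double) count / total;
      h -= p * (Math.log(p) / Math.log(2)); // log base 2
    }
    return h;
  }

  /** Information gain = entropy(parent) - weighted sum of entropy(children). */
  static double informationGain(int[] parentCounts, List<int[]> childrenCounts) {
    int parentSize = sum(parentCounts);
    double gain = entropy(parentCounts, parentSize);
    for (int[] child : childrenCounts) {
      int childSize = sum(child);
      gain -= ((double) childSize / parentSize) * entropy(child, childSize);
    }
    return gain;
  }

  private static int sum(int[] counts) {
    int s = 0;
    for (int c : counts) {
      s += c;
    }
    return s;
  }
}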

Distributed K-Means Clustering

K-Means clustering is a popular and easy-to-understand clustering algorithm. Both Spark MLlib and Apache Mahout have implemented it. However, Mahout's MapReduce version of K-Means is slow compared to Spark's, due to the heavy IO involved in every iteration: the intermediate results (the clusters) have to be written to disk and then read back in, together with the input dataset, on each pass. The good news is that Apache Mahout has started work on integrating with Spark.
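
To see why every iteration is a full pass over the data, here is a minimal single-machine sketch of one K-Means iteration in plain Java (illustrative only, not Mahout's code): each point is assigned to its nearest center and the centers are then recomputed, so the whole dataset has to be re-read on every pass; in the MapReduce version that pass goes through HDFS.

public class KMeansIterationSketch {

  /** One K-Means iteration: assign each point to its nearest center, then recompute the centers. */
  static double[][] kmeansIteration(double[][] points, double[][] centers) {
    int k = centers.length;
    int dim = centers[0].length;
    double[][] newCenters = new double[k][dim];
    int[] counts = new int[k];

    for (double[] p : points) {            // full pass over the dataset
      int best = 0;
      double bestDist = Double.MAX_VALUE;
      for (int c = 0; c < k; c++) {        // squared Euclidean distance to each center
        double d = 0.0;
        for (int i = 0; i < dim; i++) {
          double diff = p[i] - centers[c][i];
          d += diff * diff;
        }
        if (d < bestDist) {
          bestDist = d;
          best = c;
        }
      }
      counts[best]++;
      for (int i = 0; i < dim; i++) {
        newCenters[best][i] += p[i];
      }
    }

    for (int c = 0; c < k; c++) {          // average the assigned points; keep the old center if none assigned
      if (counts[c] == 0) {
        newCenters[c] = centers[c];
      } else {
        for (int i = 0; i < dim; i++) {
          newCenters[c][i] /= counts[c];
        }
      }
    }
    return newCenters;
  }
}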

Let’s look at the code. KMeansDriver is the driver class. Users can choose to run either a MapReduce or a sequential version of the algorithm. The program arguments include the path to the initial cluster centers, the dataset input path, the convergence rate, the distance measure implementation class, the maximum number of iterations, etc.

@Override
  public int run(String[] args) throws Exception {
    
    addInputOption();
    addOutputOption();
    addOption(DefaultOptionCreator.distanceMeasureOption().create());
    addOption(DefaultOptionCreator
        .clustersInOption()
        .withDescription(
            "The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  "
                + "If k is also specified, then a random set of vectors will be selected"
                + " and written out to this path first").create());
    addOption(DefaultOptionCreator
        .numClustersOption()
        .withDescription(
            "The k in k-Means.  If specified, then a random selection of k Vectors will be chosen"
                + " as the Centroid and written to the clusters input path.").create());
    addOption(DefaultOptionCreator.useSetRandomSeedOption().create());
    addOption(DefaultOptionCreator.convergenceOption().create());
    addOption(DefaultOptionCreator.maxIterationsOption().create());
    addOption(DefaultOptionCreator.overwriteOption().create());
    addOption(DefaultOptionCreator.clusteringOption().create());
    addOption(DefaultOptionCreator.methodOption().create());
    addOption(DefaultOptionCreator.outlierThresholdOption().create());
   
    if (parseArguments(args) == null) {
      return -1;
    }
    
    Path input = getInputPath();
    Path clusters = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
    Path output = getOutputPath();
    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
    if (measureClass == null) {
      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
    }
    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
      HadoopUtil.delete(getConf(), output);
    }
    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
    
    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
      int numClusters = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));

      Long seed = null;
      if (hasOption(DefaultOptionCreator.RANDOM_SEED)) {
        seed = Long.parseLong(getOption(DefaultOptionCreator.RANDOM_SEED));
      }

      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, numClusters, measure, seed);
    }
    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
        DefaultOptionCreator.SEQUENTIAL_METHOD);
    double clusterClassificationThreshold = 0.0;
    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
    }
    run(getConf(), input, clusters, output, convergenceDelta, maxIterations, runClustering,
        clusterClassificationThreshold, runSequential);
    return 0;
  }

The main MapReduce clustering logic is shown below. As you can see, it is an iterative process: in each iteration, we create a MapReduce job that reads the input vectors and the previous clusters and assigns each input vector to the closest cluster, until we either reach convergence or hit the specified number of iterations. There is a lot of IO overhead here: in every iteration, the MapReduce job has to read the previous results (the clusters) and the input vectors all over again. (Take a look at Kluster.java, which represents a cluster with a cluster id, a center, and the associated distance measure.)


/**
   * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
   * implementation
   * 
   * @param conf
   *          the Configuration
   * @param inPath
   *          a Path to input VectorWritables
   * @param priorPath
   *          a Path to the prior classifier
   * @param outPath
   *          a Path of output directory
   * @param numIterations
   *          the int number of iterations to perform
   */
  public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
    Path clustersOut = null;
    int iteration = 1;
    while (iteration <= numIterations) {
      conf.set(PRIOR_PATH_KEY, priorPath.toString());
      
      String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
      Job job = new Job(conf, jobName);
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(ClusterWritable.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(ClusterWritable.class);
      
      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      job.setMapperClass(CIMapper.class);
      job.setReducerClass(CIReducer.class);
      
      FileInputFormat.addInputPath(job, inPath);
      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
      priorPath = clustersOut;
      FileOutputFormat.setOutputPath(job, clustersOut);
      
      job.setJarByClass(ClusterIterator.class);
      if (!job.waitForCompletion(true)) {
        throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
      }
      ClusterClassifier.writePolicy(policy, clustersOut);
      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
      iteration++;
      if (isConverged(clustersOut, conf, fs)) {
        break;
      }
    }
    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
  }

Stay tuned for more info about the Mapper and Reducer implementations of the algorithm.

Cache Management in HDFS

The popularity of Spark has spurred great interest in providing an in-memory cache capability in HDFS. With the release of Hadoop 2.3.0, centralized cache management has been introduced.

See the following JIRA.

https://issues.apache.org/jira/browse/HDFS-4949

You can view the design doc here. It is an excellent read. Kudos to both Andrew Wang and Colin Patrick McCabe, the key contributors behind this feature.

This caching feature lets memory-local HDFS clients access the memory-cached data through zero-copy reads, further speeding up read operations. On the DataNode side, the mlock and mlockall native calls tell the operating system to lock a specified memory range in RAM, which prevents page faults on the cached data.

You can learn about mlock from the following page.

https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_MRG/1.3/html/Realtime_Reference_Guide/sect-Realtime_Reference_Guide-Memory_allocation-Using_mlock_to_avoid_memory_faults.html

You can also find the Java native IO mlock and munlock calls for memory-mapped data in org.apache.hadoop.io.nativeio.NativeIO.java

https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

and the corresponding C native code
https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c

/**
 * Used to manipulate the operating system cache.
 */
@VisibleForTesting
public static class CacheManipulator {
  public void mlock(String identifier, ByteBuffer buffer,
      long len) throws IOException {
    POSIX.mlock(buffer, len);
  }

  public long getMemlockLimit() {
    return NativeIO.getMemlockLimit();
  }

  public long getOperatingSystemPageSize() {
    return NativeIO.getOperatingSystemPageSize();
  }

  public void posixFadviseIfPossible(String identifier,
      FileDescriptor fd, long offset, long len, int flags)
      throws NativeIOException {
    NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
        len, flags);
  }

  public boolean verifyCanMlock() {
    return NativeIO.isAvailable();
  }
}

With this feature, the NameNode has to keep track of the cached replicas on the different DataNodes. The cached-block information is exchanged between them via the heartbeat protocol. As a result, additional metadata, such as the cachedHosts array, has been introduced in BlockLocation, as seen below.


public class BlockLocation {
  private String[] hosts; // Datanode hostnames
  private String[] cachedHosts; // Datanode hostnames with a cached replica
  private String[] names; // Datanode IP:xferPort for accessing the block
  private String[] topologyPaths; // Full path name in network topology
  private long offset;  // Offset of the block in the file
  private long length;
  private boolean corrupt;
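
A client can discover which DataNodes hold a cached replica of each block through the regular FileSystem API; below is a minimal sketch assuming Hadoop 2.3.0+, where BlockLocation exposes getCachedHosts() (the file path is made up).

// Sketch: list the DataNodes with cached replicas for each block of a file (Hadoop 2.3.0+).
// Uses org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.* and java.util.Arrays.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("/data/hot/part-00000");   // hypothetical path

FileStatus status = fs.getFileStatus(file);
BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
for (BlockLocation block : blocks) {
  System.out.println("offset " + block.getOffset()
      + " hosts " + Arrays.toString(block.getHosts())
      + " cachedHosts " + Arrays.toString(block.getCachedHosts()));
}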

Stay tuned for more info…

Quick and Easy Macbook Pro Development Setup

I just bought a new Macbook Pro to replace my old one. I documented the following steps to set it up for Java development.

1) Download iTerm2

2) Download color schemes for iTerm2

3) Set up the JAVA_HOME environment variable. In a terminal, run the following commands:
a) echo export "JAVA_HOME=\$(/usr/libexec/java_home)" >> ~/.bash_profile
b) source ~/.bash_profile

4) Install Homebrew (run the install command from the Homebrew website in a terminal)

5) Install Maven
brew install maven
Run the following command to check the Maven version:
mvn -version

6) Download and install Eclipse Kepler
Download the Google Java coding style XML for Eclipse by running:
svn checkout http://google-styleguide.googlecode.com/svn/trunk/ google-styleguide-read-only
You will find the Eclipse Java style XML at
google-styleguide-read-only/eclipse-java-google-style.xml
In Eclipse, click on Window -> Preferences -> Java -> Code Style -> Formatter
and import the above eclipse-java-google-style.xml.
  • To install Eclipse Color Themes, get the following plugin: http://eclipsecolorthemes.org/?view=plugin

7) Install Git
brew install git

8) Install Tmux
brew install tmux
You could use the handy tmux.conf provided by Christian Pelczarski.

You might also want to install the following useful applications on your Mac:
Dropbox, Evernote, Wunderlist

Zero copy technique in Hadoop and Spark

What exactly are zero-copy techniques, and how can they be employed to achieve better performance in distributed systems?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques, such as memory-mapped files (mmap) and sendfile()/transferTo(), to improve the system.

Zero-copy techniques eliminate unnecessary data copies and context switches between application and kernel space. Please refer to the following excellent post for an in-depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data from disk over the network to a client, the data is first read from disk into kernel memory and then copied into user memory. It is then copied again from user space into the kernel buffer associated with the network stack before being handed to the network interface card. See Figure 1 (taken from the article above).

[Figure 1: the traditional data transfer path]

A popular zero-copy technique is the sendfile() system call, exposed in Java as transferTo(). Please see the following figure (taken from the same article).

[Figure 2: the zero-copy data transfer path using transferTo()]
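
In Java NIO this shows up as FileChannel.transferTo(), which asks the kernel to move the bytes from the file straight to the socket without staging them in user space. A minimal sketch (the host, port, and file name below are made up):

// Send a file over a socket using zero-copy transferTo (names are illustrative).
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySend {
  public static void main(String[] args) throws Exception {
    try (SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000));
         FileChannel file = new FileInputStream("shuffle.data").getChannel()) {
      long position = 0;
      long remaining = file.size();
      while (remaining > 0) {
        // transferTo may send fewer bytes than requested, so loop until the file is fully sent
        long sent = file.transferTo(position, remaining, socket);
        position += sent;
        remaining -= sent;
      }
    }
  }
}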

As you can see, Hadoop already switched to the zero-copy transferTo approach way back in version 0.18.

https://issues.apache.org/jira/browse/HADOOP-3164

Also, it uses the same technique in sending shuffle files. Please see

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming 1.2.0 release.

SPARK-2468: introduce the same transferTo technique for sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468

Shuffling in Spark vs Hadoop MapReduce

In the last few posts about the Hadoop MapTask spill mechanism, we learned that Hadoop uses an in-memory buffer while the map task writes its intermediate output. Once the memory soft limit is exceeded, it starts spilling the data to disk. This results in multiple spill files that are eventually merged into one large file at the end of the map task. During the spilling process, the data in the memory buffer is first sorted by partition (each partition corresponds to one reduce task) and then by key.

As of Spark 1.0.1, Spark map tasks write their output directly to disk on completion, with no in-memory buffer. Each map task writes as many shuffle files as there are reduce tasks, one shuffle file per reduce task. E.g., with 1000 map tasks (M) and 5000 reduce tasks (R), this results in 5 million shuffle files. Spark does not, however, merge them into a single partitioned shuffle file as Hadoop MapReduce does. This amount of file IO hurts performance.

You can learn more about Spark shuffling from the following report.

http://www.cs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf

The report also points out a potential memory issue when compression is used on the map output files.

E.g., on a machine with C cores allocated to Spark, we have C concurrent map tasks, each writing R shuffle files, one per reduce task. The total memory usage for the compression buffers would be C * R * 400 KB.
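
For example, with C = 16 cores and R = 5000 reduce tasks, that is 16 * 5000 * 400 KB, or roughly 32 GB of compression buffers.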

Here comes the good news. A new shuffle mechanism, called sort-based shuffle, has been implemented for the upcoming Spark 1.1.0 release. You can read the design document below and learn more about this issue in the SPARK-2045 JIRA ticket.

https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf

This sort-based shuffle is quite similar to, but not the same as, Hadoop MapReduce's shuffle.

The following is the description of SPARK-2045:

“…a sort-based shuffle implementation that takes advantage of an Ordering for keys (or just sorts by hashcode for keys that don’t have it) would likely improve performance and memory usage in very large shuffles. Our current hash-based shuffle needs an open file for each reduce task, which can fill up a lot of memory for compression buffers and cause inefficient IO. This would avoid both of those issues.”

Hadoop MapTask Spill Mechanism Part 2

In the last post, we looked at the MapOutputBuffer class. This is the class that maintains the in-memory buffer, the byte array byte[] kvbuffer, while the map task writes its intermediate output. When the buffer exceeds the threshold, it starts spilling data to disk.

Inside MapOutputBuffer, there is a thread called SpillThread.

    final ReentrantLock spillLock = new ReentrantLock();
    final Condition spillDone = spillLock.newCondition();
    final Condition spillReady = spillLock.newCondition();
    final BlockingBuffer bb = new BlockingBuffer();
    volatile boolean spillThreadRunning = false;
    final SpillThread spillThread = new SpillThread();

It is a daemon thread and is started in the init method of MapOutputBuffer.


public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {

      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }

}

The SpillThread is signalled to write the data out to disk (startSpill()) whenever the memory buffer's soft limit is exceeded (bufferRemaining <= 0) while the map task is writing intermediate output into the buffer.


 public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {


if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }



}

Hope you all enjoyed this post!

Hadoop MapTask Spill Mechanism

Have you ever wondered what the Hadoop map task's sort-and-spill code looks like?

Here you go. You can browse the following class:
MapTask.java (from the Hadoop MapReduce project on GitHub)

In the map task, there is an in-memory buffer that stores the map output. When the buffer exceeds a threshold, the data is spilled to disk. Two parameters control this behavior: io.sort.mb (defaults to 100 MB) is the size of the buffer, and io.sort.spill.percent (defaults to 80%) is the threshold before spilling to disk.
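
For example, a job can override these defaults programmatically; a small sketch (note that the property names shown here are the older ones used in this post and have since been superseded by mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent in newer Hadoop releases):

// Sketch: raise the sort buffer to 200 MB and the spill threshold to 90% of the buffer.
JobConf job = new JobConf();
job.setInt("io.sort.mb", 200);
job.setFloat("io.sort.spill.percent", 0.90f);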

In fact, MapOutputBuffer is now pluggable (MAPREDUCE-4807) through the new MapOutputCollector interface.

public interface MapOutputCollector<K, V> {
  public void init(Context context
                  ) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition
                     ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;

  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}

In MapTask.java, createSortingCollector instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection.

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector<KEY, VALUE> collector
      = (MapOutputCollector<KEY, VALUE>)
       ReflectionUtils.newInstance(
                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                        MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =
                           new MapOutputCollector.Context(this, job, reporter);
    collector.init(context);
    return collector;
  }

In MapOutputBuffer, you can see that io.sort.mb determines the size of the in-memory buffer (maxMemUsage), while io.sort.spill.percent determines the spill threshold.

//sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
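
With the defaults, maxMemUsage works out to 100 << 20 = 104,857,600 bytes (already a multiple of METASIZE, so nothing is trimmed), and with the 0.8 spill percent the soft limit comes to roughly 83.9 MB of buffered output before spilling starts.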

To be continued…..

HBase Pluggable Store, Memstore, StoreEngine

I was browsing the HBase codebase and noticed that many components in the regionserver module (org.apache.hadoop.hbase.regionserver) have been refactored to become pluggable. This is a cleaner design because different implementations of these components can now be plugged into HBase easily. Among the new interfaces are

Store, MemStore

The corresponding implementation classes are

HStore implements Store

DefaultMemStore implements MemStore

Furthermore, StoreEngine is now an abstract class that acts as a factory for the StoreFileManager, CompactionPolicy, and Compactor.
DefaultStoreEngine extends StoreEngine to create the default compactor, compaction policy, and store file manager. This makes store file management and compaction pluggable.
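
For example, a different store engine can be selected through configuration; a sketch, assuming the hbase.hstore.engine.class key and the StripeStoreEngine implementation are available in your HBase version:

// Sketch: plug in a non-default store engine (key and class assumed to exist in your HBase version).
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.hstore.engine.class",
    "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");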

You can refer to the corresponding Jira and design doc

https://issues.apache.org/jira/browse/HBASE-7678

https://issues.apache.org/jira/secure/attachment/12568488/Pluggable%20compactions%20doc.pdf