Stock Clustering Part I

When come to building a well balanced stock portfolio, it is desired to have different mixture of stocks, potentially from different sectors to minimize the risk. Clustering is a great tool to discover these different types of clusters. You can use different features to categorize these stocks, eg. historical prices, returns, etc.

I know I could have used python or R to run the clustering. But as a big data explorer, I decided to use Mahout and Spark to do the job. I will post my findings and experiences of using these tools in the upcoming posts.

Here are the initial clusters found using the pre-clustering canopy algorithm in Mahout to determine the initial number of K clusters. I will perform the K-Means clustering algorithm next.

image(4)

image(3)

image(2)

image(1)

image

The tedious part of using Mahout clustering is you have to transform your data into sequence file format of mahout vector writables. I will share my codes soon.

Please check out my stock prediction model too ūüôā http://giantify.collective2.com/

To be continued…

Advertisements

Randomness in Random Forest

In the last post, we looked at the bagging process where a random sample of input is used to build each individual tree. This is one of the two types of randomness used in the algorithm. The second kind of randomness is used in the node splitting process. During the process, a random set of m attributes out of all attributes is used to compute the best split based on information gain.

If m is not specified in the algorithm, then the following logic is used to determine m.

For regression tree,

m = (int) Math.ceil(total_number_attributes / 3.0);

For classification tree,

m = (int) Math.ceil(Math.sqrt(total_number_attributes));

If you take a look at the Apache Mahout DecisionTreeBuilder, you will find the randomAttributes method below. This method selects m attributes to consider for split.

org.apache.mahout.classifier.df.builder.DecisionTreeBuilder

/**
   * Randomly selects m attributes to consider for split, excludes IGNORED and LABEL attributes
   *
   * @param rng      random-numbers generator
   * @param selected attributes' state (selected or not)
   * @param m        number of attributes to choose
   * @return list of selected attributes' indices, or null if all attributes have already been selected
   */
  private static int[] randomAttributes(Random rng, boolean[] selected, int m) {
    int nbNonSelected = 0; // number of non selected attributes
    for (boolean sel : selected) {
      if (!sel) {
        nbNonSelected++;
      }
    }

    if (nbNonSelected == 0) {
      log.warn("All attributes are selected !");
      return NO_ATTRIBUTES;
    }

    int[] result;
    if (nbNonSelected <= m) {
      // return all non selected attributes
      result = new int[nbNonSelected];
      int index = 0;
      for (int attr = 0; attr < selected.length; attr++) {
        if (!selected[attr]) {
          result[index++] = attr;
        }
      }
    } else {
      result = new int[m];
      for (int index = 0; index < m; index++) {
        // randomly choose a "non selected" attribute
        int rind;
        do {
          rind = rng.nextInt(selected.length);
        } while (selected[rind]);

        result[index] = rind;
        selected[rind] = true; // temporarily set the chosen attribute to be selected
      }

      // the chosen attributes are not yet selected
      for (int attr : result) {
        selected[attr] = false;
      }
    }

    return result;
  }

Given the list of chosen attributes, it loops through each one and computes the information gains for different splits generated by using different attributes and try to find the best split.


 // find the best split
    Split best = null;
    for (int attr : attributes) {
      Split split = igSplit.computeSplit(data, attr);
      if (best == null || best.getIg() < split.getIg()) {
        best = split;
      }
    }

After getting the best split, if the information gain is less than the specified threshold, then we stop and create the leaf node with the label.
For regression, the label is the regression value which is calculated by taking the average of all the values of the sample data set. For classification, we use the majority label, the value that appears the most.

 // information gain is near to zero.
    if (best.getIg() < EPSILON) {
      double label;
      if (data.getDataset().isNumerical(data.getDataset().getLabelId())) {
        label = sum / data.size();
      } else {
        label = data.majorityLabel(rng);
      }
      log.debug("ig is near to zero Leaf({})", label);
      return new Leaf(label);
    }

Apache Mahout Random Forest

Random forest is a popular ensemble learning technique where a collection of decision trees are trained to make prediction. Apache Mahout, Spark MLLib, and H2O have all implemented the Random Forest algorithm. Today, we will look at Mahout’s MapReduce version of Random Forest.

To run Random Forest using Mahout, you can use the following command

hadoop jar mahout-examples-0.9-job.jar org.apache.mahout.classifier.df.mapreduce.BuildForest -Dmapred.max.split.size=1874231 -d data/YOUR_TRAINING_DATA.txt -ds DATASET_METADATA.info -sl 5 -t 100 -o YOUR_OUTPUT_FOLDER

If you look at the org.apache.mahout.classifier.df.mapreduce.BuildForest class below, it sets up the MapReduce job with the following available options.

–data –dataset –selection –no-complete –minsplit
–minprop –seed –partial –nbtrees
–output

org.apache.mahout.classifier.df.mapreduce.BuildForest


@Override
  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
    ArgumentBuilder abuilder = new ArgumentBuilder();
    GroupBuilder gbuilder = new GroupBuilder();
    
    Option dataOpt = obuilder.withLongName("data").withShortName("d").withRequired(true)
        .withArgument(abuilder.withName("path").withMinimum(1).withMaximum(1).create())
        .withDescription("Data path").create();
    
    Option datasetOpt = obuilder.withLongName("dataset").withShortName("ds").withRequired(true)
        .withArgument(abuilder.withName("dataset").withMinimum(1).withMaximum(1).create())
        .withDescription("Dataset path").create();
    
    Option selectionOpt = obuilder.withLongName("selection").withShortName("sl").withRequired(false)
        .withArgument(abuilder.withName("m").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, Number of variables to select randomly at each tree-node.\n" 
        + "For classification problem, the default is square root of the number of explanatory variables.\n"
        + "For regression problem, the default is 1/3 of the number of explanatory variables.").create();

    Option noCompleteOpt = obuilder.withLongName("no-complete").withShortName("nc").withRequired(false)
        .withDescription("Optional, The tree is not complemented").create();

    Option minSplitOpt = obuilder.withLongName("minsplit").withShortName("ms").withRequired(false)
        .withArgument(abuilder.withName("minsplit").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, The tree-node is not divided, if the branching data size is " 
        + "smaller than this value.\nThe default is 2.").create();

    Option minPropOpt = obuilder.withLongName("minprop").withShortName("mp").withRequired(false)
        .withArgument(abuilder.withName("minprop").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, The tree-node is not divided, if the proportion of the " 
        + "variance of branching data is smaller than this value.\n"
        + "In the case of a regression problem, this value is used. "
        + "The default is 1/1000(0.001).").create();

    Option seedOpt = obuilder.withLongName("seed").withShortName("sd").withRequired(false)
        .withArgument(abuilder.withName("seed").withMinimum(1).withMaximum(1).create())
        .withDescription("Optional, seed value used to initialise the Random number generator").create();
    
    Option partialOpt = obuilder.withLongName("partial").withShortName("p").withRequired(false)
        .withDescription("Optional, use the Partial Data implementation").create();
    
    Option nbtreesOpt = obuilder.withLongName("nbtrees").withShortName("t").withRequired(true)
        .withArgument(abuilder.withName("nbtrees").withMinimum(1).withMaximum(1).create())
        .withDescription("Number of trees to grow").create();
    
    Option outputOpt = obuilder.withLongName("output").withShortName("o").withRequired(true)
        .withArgument(abuilder.withName("path").withMinimum(1).withMaximum(1).create())
        .withDescription("Output path, will contain the Decision Forest").create();

    Option helpOpt = obuilder.withLongName("help").withShortName("h")
        .withDescription("Print out help").create();
    
    Group group = gbuilder.withName("Options").withOption(dataOpt).withOption(datasetOpt)
        .withOption(selectionOpt).withOption(noCompleteOpt).withOption(minSplitOpt)
        .withOption(minPropOpt).withOption(seedOpt).withOption(partialOpt).withOption(nbtreesOpt)
        .withOption(outputOpt).withOption(helpOpt).create();
    
    try {
      Parser parser = new Parser();
      parser.setGroup(group);
      CommandLine cmdLine = parser.parse(args);
      
      if (cmdLine.hasOption("help")) {
        CommandLineUtil.printHelp(group);
        return -1;
      }
      
      isPartial = cmdLine.hasOption(partialOpt);
      String dataName = cmdLine.getValue(dataOpt).toString();
      String datasetName = cmdLine.getValue(datasetOpt).toString();
      String outputName = cmdLine.getValue(outputOpt).toString();
      nbTrees = Integer.parseInt(cmdLine.getValue(nbtreesOpt).toString());
      
      if (cmdLine.hasOption(selectionOpt)) {
        m = Integer.parseInt(cmdLine.getValue(selectionOpt).toString());
      }
      complemented = !cmdLine.hasOption(noCompleteOpt);
      if (cmdLine.hasOption(minSplitOpt)) {
        minSplitNum = Integer.parseInt(cmdLine.getValue(minSplitOpt).toString());
      }
      if (cmdLine.hasOption(minPropOpt)) {
        minVarianceProportion = Double.parseDouble(cmdLine.getValue(minPropOpt).toString());
      }
      if (cmdLine.hasOption(seedOpt)) {
        seed = Long.valueOf(cmdLine.getValue(seedOpt).toString());
      }

      if (log.isDebugEnabled()) {
        log.debug("data : {}", dataName);
        log.debug("dataset : {}", datasetName);
        log.debug("output : {}", outputName);
        log.debug("m : {}", m);
        log.debug("complemented : {}", complemented);
        log.debug("minSplitNum : {}", minSplitNum);
        log.debug("minVarianceProportion : {}", minVarianceProportion);
        log.debug("seed : {}", seed);
        log.debug("nbtrees : {}", nbTrees);
        log.debug("isPartial : {}", isPartial);
      }

      dataPath = new Path(dataName);
      datasetPath = new Path(datasetName);
      outputPath = new Path(outputName);
      
    } catch (OptionException e) {
      log.error("Exception", e);
      CommandLineUtil.printHelp(group);
      return -1;
    }
    
    buildForest();
    
    return 0;
  }

After arguments parsing, it continues with buildForest method call as shown below, it instantiates either InMemoryBuilder or PartialBuilder MapReduce job and then runs it to build the random forest (DecisionForest).

DecisionForest forest = forestBuilder.build(nbTrees);


private void buildForest() throws IOException, ClassNotFoundException, InterruptedException {
    // make sure the output path does not exist
    FileSystem ofs = outputPath.getFileSystem(getConf());
    if (ofs.exists(outputPath)) {
      log.error("Output path already exists");
      return;
    }

    DecisionTreeBuilder treeBuilder = new DecisionTreeBuilder();
    if (m != null) {
      treeBuilder.setM(m);
    }
    treeBuilder.setComplemented(complemented);
    if (minSplitNum != null) {
      treeBuilder.setMinSplitNum(minSplitNum);
    }
    if (minVarianceProportion != null) {
      treeBuilder.setMinVarianceProportion(minVarianceProportion);
    }
    
    Builder forestBuilder;
    
    if (isPartial) {
      log.info("Partial Mapred implementation");
      forestBuilder = new PartialBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
    } else {
      log.info("InMem Mapred implementation");
      forestBuilder = new InMemBuilder(treeBuilder, dataPath, datasetPath, seed, getConf());
    }

    forestBuilder.setOutputDirName(outputPath.getName());
    
    log.info("Building the forest...");
    long time = System.currentTimeMillis();
    
    DecisionForest forest = forestBuilder.build(nbTrees);
    if (forest == null) {
      return;
    }
    
    time = System.currentTimeMillis() - time;
    log.info("Build Time: {}", DFUtils.elapsedTime(time));
    log.info("Forest num Nodes: {}", forest.nbNodes());
    log.info("Forest mean num Nodes: {}", forest.meanNbNodes());
    log.info("Forest mean max Depth: {}", forest.meanMaxDepth());

    // store the decision forest in the output path
    Path forestPath = new Path(outputPath, "forest.seq");
    log.info("Storing the forest in: {}", forestPath);
    DFUtils.storeWritable(getConf(), forestPath, forest);
  }
  
 

Both InMemoryBuilder and PartialBuilder extend the abstract Builder class. The abstract class has the following required abstract methods to be implemented by any subclass.

protected abstract void configureJob(Job job) throws IOException;
 
protected abstract DecisionForest parseOutput(Job job) throws IOException;

Next, we will look inside PartialBuilder, the MapReduce implementation of Random Forest job. It uses Step1Mapper as the Mapper implementation and does not have any reducer.

In each mapper task, it reads and converts each line of inputs into a dense vector and stores these vectors in an array list during the map phase.

At the end of the map phase, the mapper task starts to build a subset of trees using the input array list of vectors and then writes the these trees to the disk as the custom MapredOutput writables.

The number of trees to be built per mapper task is calculated as follows

int treesPerMapper = numTrees / numMaps;

/**
 * First step of the Partial Data Builder. Builds the trees using the data available in the InputSplit.
 * Predict the oob classes for each tree in its growing partition (input split).
 */
public class Step1Mapper extends MapredMapper<LongWritable,Text,TreeID,MapredOutput> {
  
  private static final Logger log = LoggerFactory.getLogger(Step1Mapper.class);
  
  /** used to convert input values to data instances */
  private DataConverter converter;
  
  private Random rng;
  
  /** number of trees to be built by this mapper */
  private int nbTrees;
  
  /** id of the first tree */
  private int firstTreeId;
  
  /** mapper's partition */
  private int partition;
  
  /** will contain all instances if this mapper's split */
  private final List<Instance> instances = Lists.newArrayList();
  
  public int getFirstTreeId() {
    return firstTreeId;
  }
  
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    
    configure(Builder.getRandomSeed(conf), conf.getInt("mapred.task.partition", -1),
      Builder.getNumMaps(conf), Builder.getNbTrees(conf));
  }
  
  /**
   * Useful when testing
   * 
   * @param partition
   *          current mapper inputSplit partition
   * @param numMapTasks
   *          number of running map tasks
   * @param numTrees
   *          total number of trees in the forest
   */
  protected void configure(Long seed, int partition, int numMapTasks, int numTrees) {
    converter = new DataConverter(getDataset());
    
    // prepare random-numders generator
    log.debug("seed : {}", seed);
    if (seed == null) {
      rng = RandomUtils.getRandom();
    } else {
      rng = RandomUtils.getRandom(seed);
    }
    
    // mapper's partition
    Preconditions.checkArgument(partition >= 0, "Wrong partition ID: " + partition + ". Partition must be >= 0!");
    this.partition = partition;
    
    // compute number of trees to build
    nbTrees = nbTrees(numMapTasks, numTrees, partition);
    
    // compute first tree id
    firstTreeId = 0;
    for (int p = 0; p < partition; p++) {
      firstTreeId += nbTrees(numMapTasks, numTrees, p);
    }
    
    log.debug("partition : {}", partition);
    log.debug("nbTrees : {}", nbTrees);
    log.debug("firstTreeId : {}", firstTreeId);
  }
  
  /**
   * Compute the number of trees for a given partition. The first partitions may be longer
   * than the rest because of the remainder.
   * 
   * @param numMaps
   *          total number of maps (partitions)
   * @param numTrees
   *          total number of trees to build
   * @param partition
   *          partition to compute the number of trees for
   */
  public static int nbTrees(int numMaps, int numTrees, int partition) {
    int treesPerMapper = numTrees / numMaps;
    int remainder = numTrees - numMaps * treesPerMapper;
    return treesPerMapper + (partition < remainder ? 1 : 0);
  }
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    instances.add(converter.convert(value.toString()));
  }
  
  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // prepare the data
    log.debug("partition: {} numInstances: {}", partition, instances.size());
    
    Data data = new Data(getDataset(), instances);
    Bagging bagging = new Bagging(getTreeBuilder(), data);
    
    TreeID key = new TreeID();
    
    log.debug("Building {} trees", nbTrees);
    for (int treeId = 0; treeId < nbTrees; treeId++) {
      log.debug("Building tree number : {}", treeId);
      
      Node tree = bagging.build(rng);
      
      key.set(partition, firstTreeId + treeId);
      
      if (isOutput()) {
        MapredOutput emOut = new MapredOutput(tree);
        context.write(key, emOut);
      }

      context.progress();
    }
  }
  
}

Bagging is used to create a random sample from the input data to build a decision tree. Take a look at the Bagging class below.

It delegates the task of bagging (create a random sample) to Data class. After that, it uses tree builder to build a decision tree given the bag of random sampled data.
treeBuilder.build(rng, bag);


/**
 * Builds a tree using bagging
 */
public class Bagging {
  
  private static final Logger log = LoggerFactory.getLogger(Bagging.class);
  
  private final TreeBuilder treeBuilder;
  
  private final Data data;
  
  private final boolean[] sampled;
  
  public Bagging(TreeBuilder treeBuilder, Data data) {
    this.treeBuilder = treeBuilder;
    this.data = data;
    sampled = new boolean[data.size()];
  }
  
  /**
   * Builds one tree
   */
  public Node build(Random rng) {
    log.debug("Bagging...");
    Arrays.fill(sampled, false);
    Data bag = data.bagging(rng, sampled);
    
    log.debug("Building...");
    return treeBuilder.build(rng, bag);
  }
  
}

See the below org.apache.mahout.classifier.df.data.Data class. It has the following two methods to sample and return a bag of random samples.


   /**
   * if data has N cases, sample N cases at random -but with replacement.
   */
  public Data bagging(Random rng) {
    int datasize = size();
    List<Instance> bag = Lists.newArrayListWithCapacity(datasize);
    
    for (int i = 0; i < datasize; i++) {
      bag.add(instances.get(rng.nextInt(datasize)));
    }
    
    return new Data(dataset, bag);
  }
  
  /**
   * if data has N cases, sample N cases at random -but with replacement.
   * 
   * @param sampled
   *          indicating which instance has been sampled
   * 
   * @return sampled data
   */
  public Data bagging(Random rng, boolean[] sampled) {
    int datasize = size();
    List<Instance> bag = Lists.newArrayListWithCapacity(datasize);
    
    for (int i = 0; i < datasize; i++) {
      int index = rng.nextInt(datasize);
      bag.add(instances.get(index));
      sampled[index] = true;
    }
    
    return new Data(dataset, bag);
  }
  

Once the decision forest is built by the PartialBuider MapReduce job, it can be used for classifying new Instance.

org.apache.mahout.classifier.df.DecisionForest is the Writable implementation of decision forest. It provides the following Classify method. For numerical output, it takes the average of all the trees’ outputs, sum / cnt.
As for categorical output, it returns the category which has the highest votes from all the trees.


 /**
   * predicts the label for the instance
   * 
   * @param rng
   *          Random number generator, used to break ties randomly
   * @return NaN if the label cannot be predicted
   */
  public double classify(Dataset dataset, Random rng, Instance instance) {
    if (dataset.isNumerical(dataset.getLabelId())) {
      double sum = 0;
      int cnt = 0;
      for (Node tree : trees) {
        double prediction = tree.classify(instance);
        if (!Double.isNaN(prediction)) {
          sum += prediction;
          cnt++;
        }
      }

      if (cnt > 0) {
        return sum / cnt;
      } else {
        return Double.NaN;
      }
    } else {
      int[] predictions = new int[dataset.nblabels()];
      for (Node tree : trees) {
        double prediction = tree.classify(instance);
        if (!Double.isNaN(prediction)) {
          predictions[(int) prediction]++;
        }
      }

      if (DataUtils.sum(predictions) == 0) {
        return Double.NaN; // no prediction available
      }

      return DataUtils.maxindex(rng, predictions);
    }
  }

The node split criteria used in the Mahout’s is based on the idea of entropy and information gain. Stay tuned for more infos.

Distributed K-Means Clustering

K-Means clustering is the popular and easy to understand clustering algorithm. Both Spark MLlib and Apache Mahout have implemented the algorithm. However, Mahout MapReduce version of K-Means algorithm is slow compared to Spark. This is due to the huge IO involved in every iteration of the algorithm. Basically, at each iteration, the intermediate output results (clusters)  have to be written to the disk and then read in again together with input dataset from the disk. The good news is Apache Mahout has started the work to integrate with Spark.

Let’s look at the codes. KMeansDriver is the driver class. User can choose to run a MapReduce or sequential version of the algorithm. The program arguments are the path to¬†the initial cluster centers, dataset input path, ¬†convergence rate, distance measure implementation class, max number of iterations, and etc.

@Override
  public int run(String[] args) throws Exception {
    
    addInputOption();
    addOutputOption();
    addOption(DefaultOptionCreator.distanceMeasureOption().create());
    addOption(DefaultOptionCreator
        .clustersInOption()
        .withDescription(
            &quot;The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  &quot;
                + &quot;If k is also specified, then a random set of vectors will be selected&quot;
                + &quot; and written out to this path first&quot;).create());
    addOption(DefaultOptionCreator
        .numClustersOption()
        .withDescription(
            &quot;The k in k-Means.  If specified, then a random selection of k Vectors will be chosen&quot;
                + &quot; as the Centroid and written to the clusters input path.&quot;).create());
    addOption(DefaultOptionCreator.useSetRandomSeedOption().create());
    addOption(DefaultOptionCreator.convergenceOption().create());
    addOption(DefaultOptionCreator.maxIterationsOption().create());
    addOption(DefaultOptionCreator.overwriteOption().create());
    addOption(DefaultOptionCreator.clusteringOption().create());
    addOption(DefaultOptionCreator.methodOption().create());
    addOption(DefaultOptionCreator.outlierThresholdOption().create());
   
    if (parseArguments(args) == null) {
      return -1;
    }
    
    Path input = getInputPath();
    Path clusters = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
    Path output = getOutputPath();
    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
    if (measureClass == null) {
      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
    }
    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
      HadoopUtil.delete(getConf(), output);
    }
    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
    
    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
      int numClusters = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));

      Long seed = null;
      if (hasOption(DefaultOptionCreator.RANDOM_SEED)) {
        seed = Long.parseLong(getOption(DefaultOptionCreator.RANDOM_SEED));
      }

      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, numClusters, measure, seed);
    }
    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
        DefaultOptionCreator.SEQUENTIAL_METHOD);
    double clusterClassificationThreshold = 0.0;
    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
    }
    run(getConf(), input, clusters, output, convergenceDelta, maxIterations, runClustering,
        clusterClassificationThreshold, runSequential);
    return 0;
  }

The main MapReduce clustering logic can be found below. As you seen, it is an iterative process where in each iteration, we create a MapReduce job that reads in the input vectors and previous clusters and start assign the input vectors to the closest cluster until either we reach convergence or hit the specified number of iterations. There is a lot of IO overhead here. Each iteration, the MapReduce job has to read the previous results which are the clusters and input vectors all over again. (Take a look at Kluster.java which represents a cluster with cluster id, the center and the associated distance measure)


/**
   * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
   * implementation
   * 
   * @param conf
   *          the Configuration
   * @param inPath
   *          a Path to input VectorWritables
   * @param priorPath
   *          a Path to the prior classifier
   * @param outPath
   *          a Path of output directory
   * @param numIterations
   *          the int number of iterations to perform
   */
  public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
    Path clustersOut = null;
    int iteration = 1;
    while (iteration &lt;= numIterations) {
      conf.set(PRIOR_PATH_KEY, priorPath.toString());
      
      String jobName = &quot;Cluster Iterator running iteration &quot; + iteration + &quot; over priorPath: &quot; + priorPath;
      Job job = new Job(conf, jobName);
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(ClusterWritable.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(ClusterWritable.class);
      
      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      job.setMapperClass(CIMapper.class);
      job.setReducerClass(CIReducer.class);
      
      FileInputFormat.addInputPath(job, inPath);
      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
      priorPath = clustersOut;
      FileOutputFormat.setOutputPath(job, clustersOut);
      
      job.setJarByClass(ClusterIterator.class);
      if (!job.waitForCompletion(true)) {
        throw new InterruptedException(&quot;Cluster Iteration &quot; + iteration + &quot; failed processing &quot; + priorPath);
      }
      ClusterClassifier.writePolicy(policy, clustersOut);
      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
      iteration++;
      if (isConverged(clustersOut, conf, fs)) {
        break;
      }
    }
    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
  }

Stay tuned for more infos about the Mapper and Reducer implementation of the algorithm.

Mahout Distance Measure

In Mahout, there are different distance measure classes defined in the org.apache.mahout.common.distance package. Among them are SquaredEuclideanDistanceMeasure, EuclideanDistanceMeasure, CosineDistanceMeasure, MahalanobisDistanceMeasure, ManhattanDistanceMeasure. These distance classes implements the DistanceMeasure interface.

There is also support for weighted version of DistanceMeasure eg. WeightedEuclideanDistanceMeasure and WeightedManhattanDistanceMeasure. Both extends the abstract class WeightedDistanceMeasure.

DistanceMeasure

DistanceMeasure Mahout

WeightedDistanceMeasure

WeightedDistanceMeasure

Mahout Math Package

I was looking into Mahout Math package especially the various Vector implementations, such as DenseVector, NamedVector, WeightedVector, and etc. Here is the class diagram.

mahout-vector-class-diagram

mahout math Vector class diagram

 

Vector is the base interface for all different Vector implementations. There is a base abstract class, AbstractVector which provides the base implementations of some defined methods, such as norm, normalize, logNormalize, getDistanceSquared, maxValue, minValue, plus, minus, times and etc in the interface.

There are a few interesting methods  in the AbstractVector. Eg. createOptimizedCopy(Vector vector)

</pre>
private static Vector createOptimizedCopy(Vector vector) {

Vector result;

if (vector.isDense()) {

result = vector.like().assign(vector, Functions.SECOND_LEFT_ZERO);

} else {

result = vector.clone();

}

return result;

}

ConstantVector implementation is also quite interesting. Since it is a vector of size n with a constant value, it only has a int size member and double value member.

As the name implies, DenseVector is an implementation of  a dense vector. It contains an array of double internally. It also provides NonDefaultIterator and AllIterator which implement Iterator interface to allow for each type operation. NonDefaultIterator iterates through non zeros  elements whereas AllIterator iterates through all elements.

DelegatingVector implements the Vector interface without extending the AbstractVector. It has a Vector as a delegate internally. WeightedVector extends DelegatingVector and decorate the delegate with a weight and positional index.

</pre>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.mahout.math;
import org.apache.mahout.math.function.DoubleDoubleFunction;
import org.apache.mahout.math.function.DoubleFunction;

/**
* The basic interface including numerous convenience functions <p/> NOTE: All implementing classes must have a
* constructor that takes an int for cardinality and a no-arg constructor that can be used for marshalling the Writable
* instance <p/> NOTE: Implementations may choose to reuse the Vector.Element in the Iterable methods
*/
public interface Vector extends Cloneable {

/** @return a formatted String suitable for output */
String asFormatString();

/**
* Assign the value to all elements of the receiver
*
* @param value a double value
* @return the modified receiver
*/
Vector assign(double value);

/**
* Assign the values to the receiver
*
* @param values a double[] of values
* @return the modified receiver
* @throws CardinalityException if the cardinalities differ
*/
Vector assign(double[] values);

/**
* Assign the other vector values to the receiver
*
* @param other a Vector
* @return the modified receiver
* @throws CardinalityException if the cardinalities differ
*/
Vector assign(Vector other);

/**
* Apply the function to each element of the receiver
*
* @param function a DoubleFunction to apply
* @return the modified receiver
*/
Vector assign(DoubleFunction function);

/**
* Apply the function to each element of the receiver and the corresponding element of the other argument
*
* @param other a Vector containing the second arguments to the function
* @param function a DoubleDoubleFunction to apply
* @return the modified receiver
* @throws CardinalityException if the cardinalities differ
*/
Vector assign(Vector other, DoubleDoubleFunction function);

/**
* Apply the function to each element of the receiver, using the y value as the second argument of the
* DoubleDoubleFunction
*
* @param f a DoubleDoubleFunction to be applied
* @param y a double value to be argument to the function
* @return the modified receiver
*/
Vector assign(DoubleDoubleFunction f, double y);

/**
* Return the cardinality of the recipient (the maximum number of values)
*
* @return an int
*/
int size();

/**
* @return true iff this implementation should be considered dense -- that it explicitly
* represents every value
*/
boolean isDense();

/**
* @return true iff this implementation should be considered to be iterable in index order in an efficient way.
* In particular this implies that {@link #all()} and {@link #nonZeroes()} ()} return elements
* in ascending order by index.
*/
boolean isSequentialAccess();

/**
* Return a copy of the recipient
*
* @return a new Vector
*/
@SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
Vector clone();

Iterable<Element> all();

Iterable<Element> nonZeroes();

/**
* Return an object of Vector.Element representing an element of this Vector. Useful when designing new iterator
* types.
*
* @param index Index of the Vector.Element required
* @return The Vector.Element Object
*/
Element getElement(int index);

/**
* Merge a set of (index, value) pairs into the vector.
* @param updates an ordered mapping of indices to values to be merged in.
*/
void mergeUpdates(OrderedIntDoubleMapping updates);

/**
* A holder for information about a specific item in the Vector. <p/> When using with an Iterator, the implementation
* may choose to reuse this element, so you may need to make a copy if you want to keep it
*/
interface Element {

/** @return the value of this vector element. */
double get();

/** @return the index of this vector element. */
int index();

/** @param value Set the current element to value. */
void set(double value);
}

/**
* Return a new vector containing the values of the recipient divided by the argument
*
* @param x a double value
* @return a new Vector
*/
Vector divide(double x);

/**
* Return the dot product of the recipient and the argument
*
* @param x a Vector
* @return a new Vector
* @throws CardinalityException if the cardinalities differ
*/
double dot(Vector x);

/**
* Return the value at the given index
*
* @param index an int index
* @return the double at the index
* @throws IndexException if the index is out of bounds
*/
double get(int index);

/**
* Return the value at the given index, without checking bounds
*
* @param index an int index
* @return the double at the index
*/
double getQuick(int index);

/**
* Return an empty vector of the same underlying class as the receiver
*
* @return a Vector
*/
Vector like();

/**
* Return a new vector containing the element by element difference of the recipient and the argument
*
* @param x a Vector
* @return a new Vector
* @throws CardinalityException if the cardinalities differ
*/
Vector minus(Vector x);

/**
* Return a new vector containing the normalized (L_2 norm) values of the recipient
*
* @return a new Vector
*/
Vector normalize();

/**
* Return a new Vector containing the normalized (L_power norm) values of the recipient. <p/> See
* http://en.wikipedia.org/wiki/Lp_space <p/> Technically, when 0 < power < 1, we don't have a norm, just a metric,
* but we'll overload this here. <p/> Also supports power == 0 (number of non-zero elements) and power = {@link
* Double#POSITIVE_INFINITY} (max element). Again, see the Wikipedia page for more info
*
* @param power The power to use. Must be >= 0. May also be {@link Double#POSITIVE_INFINITY}. See the Wikipedia link
* for more on this.
* @return a new Vector x such that norm(x, power) == 1
*/
Vector normalize(double power);

/**
* Return a new vector containing the log(1 + entry)/ L_2 norm values of the recipient
*
* @return a new Vector
*/
Vector logNormalize();

/**
* Return a new Vector with a normalized value calculated as log_power(1 + entry)/ L_power norm. <p/>
*
* @param power The power to use. Must be > 1. Cannot be {@link Double#POSITIVE_INFINITY}.
* @return a new Vector
*/
Vector logNormalize(double power);

/**
* Return the k-norm of the vector. <p/> See http://en.wikipedia.org/wiki/Lp_space <p/> Technically, when 0 &gt; power
* &lt; 1, we don't have a norm, just a metric, but we'll overload this here. Also supports power == 0 (number of
* non-zero elements) and power = {@link Double#POSITIVE_INFINITY} (max element). Again, see the Wikipedia page for
* more info.
*
* @param power The power to use.
* @see #normalize(double)
*/
double norm(double power);

/** @return The minimum value in the Vector */
double minValue();

/** @return The index of the minimum value */
int minValueIndex();

/** @return The maximum value in the Vector */
double maxValue();

/** @return The index of the maximum value */
int maxValueIndex();

/**
* Return a new vector containing the sum of each value of the recipient and the argument
*
* @param x a double
* @return a new Vector
*/
Vector plus(double x);

/**
* Return a new vector containing the element by element sum of the recipient and the argument
*
* @param x a Vector
* @return a new Vector
* @throws CardinalityException if the cardinalities differ
*/
Vector plus(Vector x);

/**
* Set the value at the given index
*
* @param index an int index into the receiver
* @param value a double value to set
* @throws IndexException if the index is out of bounds
*/
void set(int index, double value);

/**
* Set the value at the given index, without checking bounds
*
* @param index an int index into the receiver
* @param value a double value to set
*/
void setQuick(int index, double value);

/**
* Increment the value at the given index by the given value.
*
* @param index an int index into the receiver
* @param increment sets the value at the given index to value + increment;
*/
void incrementQuick(int index, double increment);

/**
* Return the number of values in the recipient which are not the default value. For instance, for a
* sparse vector, this would be the number of non-zero values.
*
* @return an int
*/
int getNumNondefaultElements();

/**
* Return the number of non zero elements in the vector.
*
* @return an int
*/
int getNumNonZeroElements();

/**
* Return a new vector containing the product of each value of the recipient and the argument
*
* @param x a double argument
* @return a new Vector
*/
Vector times(double x);

/**
* Return a new vector containing the element-wise product of the recipient and the argument
*
* @param x a Vector argument
* @return a new Vector
* @throws CardinalityException if the cardinalities differ
*/
Vector times(Vector x);

/**
* Return a new vector containing the subset of the recipient
*
* @param offset an int offset into the receiver
* @param length the cardinality of the desired result
* @return a new Vector
* @throws CardinalityException if the length is greater than the cardinality of the receiver
* @throws IndexException if the offset is negative or the offset+length is outside of the receiver
*/
Vector viewPart(int offset, int length);

/**
* Return the sum of all the elements of the receiver
*
* @return a double
*/
double zSum();

/**
* Return the cross product of the receiver and the other vector
*
* @param other another Vector
* @return a Matrix
*/
Matrix cross(Vector other);

/*
* Need stories for these but keeping them here for now.
*/
// void getNonZeros(IntArrayList jx, DoubleArrayList values);
// void foreachNonZero(IntDoubleFunction f);
// DoubleDoubleFunction map);
// NewVector assign(Vector y, DoubleDoubleFunction function, IntArrayList
// nonZeroIndexes);

/**
* Examples speak louder than words: aggregate(plus, pow(2)) is another way to say
* getLengthSquared(), aggregate(max, abs) is norm(Double.POSITIVE_INFINITY). To sum all of the positive values,
* aggregate(plus, max(0)).
* @param aggregator used to combine the current value of the aggregation with the result of map.apply(nextValue)
* @param map a function to apply to each element of the vector in turn before passing to the aggregator
* @return the final aggregation
*/
double aggregate(DoubleDoubleFunction aggregator, DoubleFunction map);

/**
* <p>Generalized inner product - take two vectors, iterate over them both, using the combiner to combine together
* (and possibly map in some way) each pair of values, which are then aggregated with the previous accumulated
* value in the combiner.</p>
* <p>
* Example: dot(other) could be expressed as aggregate(other, Plus, Times), and kernelized inner products (which
* are symmetric on the indices) work similarly.
* @param other a vector to aggregate in combination with
* @param aggregator function we're aggregating with; fa
* @param combiner function we're combining with; fc
* @return the final aggregation; if r0 = fc(this[0], other[0]), ri = fa(r_{i-1}, fc(this[i], other[i]))
* for all i > 0
*/
double aggregate(Vector other, DoubleDoubleFunction aggregator, DoubleDoubleFunction combiner);

/** Return the sum of squares of all elements in the vector. Square root of this value is the length of the vector. */
double getLengthSquared();

/** Get the square of the distance between this vector and the other vector. */
double getDistanceSquared(Vector v);

/**
* Gets an estimate of the cost (in number of operations) it takes to lookup a random element in this vector.
*/
double getLookupCost();

/**
* Gets an estimate of the cost (in number of operations) it takes to advance an iterator through the nonzero
* elements of this vector.
*/
double getIteratorAdvanceCost();

/**
* Return true iff adding a new (nonzero) element takes constant time for this vector.
*/
boolean isAddConstantTime();
}