# Stock Clustering

Diversification in a stock portfolio is always desirable to minimize risk. One of many ways to achieve it is to cluster stocks into categories, where the stocks in each category exhibit similar behavior.

Here are a few of the categories I identified by applying a simple clustering algorithm, with some of the stocks in each.

Category 1:

Category 2:

Category 3:

Category 4:

In the experiment, I tried different numbers of clusters and calculated the corresponding cost for each. Based on the following chart, I chose 15 as the ideal number of clusters for the 93 stocks in my portfolio.
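The post does not say which cost was computed. Assuming it is the usual K-Means cost, the within-cluster sum of squared distances (the quantity Spark MLlib's `KMeansModel.computeCost` reports), a minimal Java sketch looks like this. It assumes each stock is already a `double[]` feature vector and the centroids come from a previous K-Means run; the class and method names are hypothetical.

```java
/** Hypothetical helper: K-Means cost = sum of squared distances to the nearest centroid. */
public class KMeansCost {

    /** Squared Euclidean distance between two equal-length vectors. */
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Sums, over all points, the squared distance to the closest centroid. */
    static double cost(double[][] points, double[][] centroids) {
        double total = 0.0;
        for (double[] p : points) {
            double best = Double.POSITIVE_INFINITY;
            for (double[] c : centroids) {
                best = Math.min(best, squaredDistance(p, c));
            }
            total += best;
        }
        return total;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {0, 2}, {10, 10}, {10, 12}};
        // One centroid at the overall mean vs. one centroid per natural group.
        System.out.println(cost(points, new double[][]{{5, 6}}));           // prints 204.0
        System.out.println(cost(points, new double[][]{{0, 1}, {10, 11}})); // prints 4.0
    }
}
```

The cost always drops as K grows, so the usual heuristic is to plot it against K and pick the "elbow" where adding clusters stops buying much improvement.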

The main goal of this exercise is to build a balanced portfolio from a combination of stocks in different categories to minimize risk.

# Stock Clustering Part I

When it comes to building a well-balanced stock portfolio, it is desirable to have a mixture of different stocks, potentially from different sectors, to minimize risk. Clustering is a great tool for discovering these groups. You can use different features to categorize the stocks, e.g. historical prices, returns, etc.
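As an illustration of the "returns" feature, one common choice is to turn each stock's closing-price history into daily returns, so stocks are compared by how they move rather than by their price level. This is a minimal sketch assuming a plain array of closing prices per stock; the class and method names are hypothetical, and the z-score standardization is an optional extra step, not something stated in this post.

```java
import java.util.Arrays;

/** Hypothetical feature extraction: closing prices -> (standardized) daily returns. */
public class ReturnFeatures {

    /** Simple daily returns r_t = (p_t - p_{t-1}) / p_{t-1}. */
    static double[] dailyReturns(double[] prices) {
        if (prices.length < 2) {
            return new double[0];
        }
        double[] returns = new double[prices.length - 1];
        for (int t = 1; t < prices.length; t++) {
            returns[t - 1] = (prices[t] - prices[t - 1]) / prices[t - 1];
        }
        return returns;
    }

    /** Rescales a series to zero mean and unit (population) standard deviation. */
    static double[] zScore(double[] x) {
        double mean = Arrays.stream(x).average().orElse(0.0);
        double variance = Arrays.stream(x).map(v -> (v - mean) * (v - mean)).average().orElse(0.0);
        double std = Math.sqrt(variance);
        double[] z = new double[x.length];
        for (int i = 0; i < x.length; i++) {
            // A constant series has no spread; map it to all zeros instead of dividing by 0.
            z[i] = std == 0.0 ? 0.0 : (x[i] - mean) / std;
        }
        return z;
    }

    public static void main(String[] args) {
        double[] closes = {100.0, 110.0, 99.0, 99.0};
        double[] returns = dailyReturns(closes);
        System.out.println(Arrays.toString(returns)); // prints [0.1, -0.1, 0.0]
        System.out.println(Arrays.toString(zScore(returns)));
    }
}
```

Each stock's resulting vector (all of the same length, covering the same dates) is what would then be fed to the clustering algorithm.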

I know I could have used Python or R to run the clustering, but as a big data explorer, I decided to use Mahout and Spark for the job. I will post my findings and experiences with these tools in upcoming posts.

Here are the initial clusters found with Mahout's canopy pre-clustering algorithm, which I used to determine the initial number of clusters, K. I will run the K-Means clustering algorithm next.
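Canopy clustering is a cheap pass driven by two distance thresholds, T1 > T2: each remaining point seeds a canopy, every remaining point within T1 of it joins that canopy, and every point within T2 is removed so it cannot seed another canopy. The number of canopies can then serve as an estimate of K. The sketch below is a simplified single-machine version for illustration only, not Mahout's canopy implementation; Euclidean distance, the threshold values and all names are assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified in-memory canopy clustering (illustrative, not Mahout's code). */
public class CanopySketch {

    /** A canopy: its center plus every point that fell within T1 of it. */
    static final class Canopy {
        final double[] center;
        final List<double[]> members = new ArrayList<>();

        Canopy(double[] center) {
            this.center = center;
            members.add(center);
        }
    }

    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    static List<Canopy> canopies(List<double[]> points, double t1, double t2) {
        if (t1 <= t2) {
            throw new IllegalArgumentException("T1 must be greater than T2");
        }
        List<double[]> remaining = new ArrayList<>(points);
        List<Canopy> result = new ArrayList<>();
        while (!remaining.isEmpty()) {
            // The next remaining point seeds a new canopy.
            Canopy canopy = new Canopy(remaining.remove(0));
            Iterator<double[]> it = remaining.iterator();
            while (it.hasNext()) {
                double[] p = it.next();
                double d = distance(canopy.center, p);
                if (d < t1) {
                    canopy.members.add(p); // loosely bound: member of this canopy (canopies may overlap)
                }
                if (d < t2) {
                    it.remove();           // tightly bound: may not seed another canopy
                }
            }
            result.add(canopy);
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> points = List.of(
                new double[]{0, 0}, new double[]{0.5, 0}, new double[]{10, 10}, new double[]{10.5, 10});
        System.out.println(canopies(points, 3.0, 1.0).size()); // prints 2
    }
}
```

Because points between T2 and T1 stay in the pool, canopies can overlap; that is acceptable here since the canopies are only used to estimate K and seed K-Means.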

The tedious part of using Mahout clustering is that you have to transform your data into Mahout's input format, a SequenceFile of VectorWritable values. I will share my code soon.

Please check out my stock prediction model too 🙂 http://giantify.collective2.com/

To be continued…

# Distributed K-Means Clustering

K-Means is a popular and easy-to-understand clustering algorithm. Both Spark MLlib and Apache Mahout implement it. However, Mahout's MapReduce version of K-Means is slow compared to Spark's because of the heavy I/O involved in every iteration: the intermediate results (the clusters) have to be written to disk and then read back in, together with the input dataset, at the next iteration. The good news is that Apache Mahout has started work on integrating with Spark.
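For contrast, here is a minimal single-machine sketch of the Lloyd iteration that both libraries distribute: assign every vector to its nearest center, move each center to the mean of its vectors, and stop when no center moves more than a convergence delta or a maximum iteration count is reached. The data and centers stay in memory across iterations, which is roughly what Spark gains by caching its input, whereas Mahout's MapReduce version round-trips through disk each time. This is not Spark's or Mahout's code; squared Euclidean distance, the given initial centers and all names are assumptions.

```java
import java.util.Arrays;

/** Illustrative in-memory Lloyd's K-Means. */
public class LocalKMeans {

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Index of the center closest to p. */
    static int nearest(double[] p, double[][] centers) {
        int best = 0;
        for (int j = 1; j < centers.length; j++) {
            if (squaredDistance(p, centers[j]) < squaredDistance(p, centers[best])) {
                best = j;
            }
        }
        return best;
    }

    static double[][] fit(double[][] points, double[][] initialCenters, double convergenceDelta, int maxIterations) {
        int k = initialCenters.length;
        int dim = points[0].length;
        double[][] centers = new double[k][];
        for (int j = 0; j < k; j++) {
            centers[j] = initialCenters[j].clone();
        }
        for (int iter = 0; iter < maxIterations; iter++) {
            // Assignment step: accumulate per-cluster sums and counts.
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            for (double[] p : points) {
                int j = nearest(p, centers);
                counts[j]++;
                for (int d = 0; d < dim; d++) {
                    sums[j][d] += p[d];
                }
            }
            // Update step: move each non-empty cluster's center to its mean.
            boolean converged = true;
            for (int j = 0; j < k; j++) {
                if (counts[j] == 0) {
                    continue; // an empty cluster keeps its previous center
                }
                double[] updated = new double[dim];
                for (int d = 0; d < dim; d++) {
                    updated[d] = sums[j][d] / counts[j];
                }
                if (Math.sqrt(squaredDistance(updated, centers[j])) > convergenceDelta) {
                    converged = false;
                }
                centers[j] = updated;
            }
            if (converged) {
                break;
            }
        }
        return centers;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {0, 2}, {10, 10}, {10, 12}};
        double[][] centers = fit(points, new double[][]{{0, 0}, {10, 10}}, 1e-6, 10);
        System.out.println(Arrays.deepToString(centers)); // prints [[0.0, 1.0], [10.0, 11.0]]
    }
}
```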

Let’s look at the code. KMeansDriver is the driver class. The user can choose to run either the MapReduce or the sequential version of the algorithm. The program arguments include the path to the initial cluster centers, the input dataset path, the convergence delta, the distance measure implementation class, the maximum number of iterations, etc.

```java
// Excerpt from Mahout's KMeansDriver (org.apache.mahout.clustering.kmeans)
@Override
public int run(String[] args) throws Exception {

  // ... input, output, distance measure and other options are registered here (elided in this excerpt)
  addOption(DefaultOptionCreator
      .clustersInOption()
      .withDescription(
          "The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  "
              + "If k is also specified, then a random set of vectors will be selected"
              + " and written out to this path first").create());
  addOption(DefaultOptionCreator
      .numClustersOption()
      .withDescription(
          "The k in k-Means.  If specified, then a random selection of k Vectors will be chosen"
              + " as the Centroid and written to the clusters input path.").create());

  if (parseArguments(args) == null) {
    return -1;
  }

  Path input = getInputPath();
  Path clusters = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
  Path output = getOutputPath();
  // Default to squared Euclidean distance when no measure class is given.
  String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
  if (measureClass == null) {
    measureClass = SquaredEuclideanDistanceMeasure.class.getName();
  }
  double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
  int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
  if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
    // Remove output left over from a previous run.
    HadoopUtil.delete(getConf(), output);
  }
  DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);

  // If k is given, pick k random input vectors as the initial centroids.
  if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
    int numClusters = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));

    Long seed = null;
    if (hasOption(DefaultOptionCreator.RANDOM_SEED)) {
      seed = Long.parseLong(getOption(DefaultOptionCreator.RANDOM_SEED));
    }

    clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, numClusters, measure, seed);
  }
  boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
  // Choose between the sequential and the MapReduce implementation.
  boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
      DefaultOptionCreator.SEQUENTIAL_METHOD);
  double clusterClassificationThreshold = 0.0;
  if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
    clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
  }
  run(getConf(), input, clusters, output, convergenceDelta, maxIterations, runClustering,
      clusterClassificationThreshold, runSequential);
  return 0;
}
```
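When the number of clusters is given, the driver calls `RandomSeedGenerator.buildRandom` so that, per the option description, a random selection of k input vectors becomes the initial centroids, optionally with a fixed seed. The sketch below shows that idea on an in-memory array; it is an illustration under that assumption, not Mahout's implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical helper: choose k distinct input vectors as initial centroids. */
public class RandomInitialCentroids {

    static double[][] pick(double[][] vectors, int k, Long seed) {
        if (k < 1 || k > vectors.length) {
            throw new IllegalArgumentException("need 1 <= k <= number of vectors");
        }
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < vectors.length; i++) {
            indices.add(i);
        }
        // A fixed seed makes the initial centroids, and therefore the clustering, reproducible.
        Random random = seed == null ? new Random() : new Random(seed);
        Collections.shuffle(indices, random);
        double[][] centroids = new double[k][];
        for (int j = 0; j < k; j++) {
            centroids[j] = vectors[indices.get(j)].clone();
        }
        return centroids;
    }

    public static void main(String[] args) {
        double[][] vectors = {{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}};
        System.out.println(Arrays.deepToString(pick(vectors, 2, 42L)));
    }
}
```

Random initialization is simple but sensitive to the draw, which is one reason to seed K-Means from canopy centers or to compare several seeds.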

The main MapReduce clustering logic is shown below. As you can see, it is an iterative process: in each iteration, we create a MapReduce job that reads in the input vectors and the previous clusters and assigns each input vector to its closest cluster, repeating until we either reach convergence or hit the specified number of iterations. There is a lot of I/O overhead here: in every iteration, the MapReduce job has to read the previous results (the clusters) and the input vectors all over again. (Take a look at Kluster.java, which represents a cluster with a cluster id, a center, and the associated distance measure.)

```java
// Excerpt from Mahout's ClusterIterator (org.apache.mahout.clustering.iterator)
/**
 * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
 * implementation
 *
 * @param conf
 *          the Configuration
 * @param inPath
 *          a Path to input VectorWritables
 * @param priorPath
 *          a Path to the prior classifier
 * @param outPath
 *          a Path of output directory
 * @param numIterations
 *          the int number of iterations to perform
 */
public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
  // The clustering policy is read once and written next to each iteration's clusters.
  ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
  Path clustersOut = null;
  int iteration = 1;
  while (iteration <= numIterations) {
    // Mappers load the previous iteration's clusters from this path.
    conf.set(PRIOR_PATH_KEY, priorPath.toString());

    String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
    Job job = new Job(conf, jobName);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(ClusterWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(ClusterWritable.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(CIMapper.class);
    job.setReducerClass(CIReducer.class);

    // Every iteration re-reads the full input vectors from disk...
    FileInputFormat.addInputPath(job, inPath);
    // ...and writes the updated clusters to a new directory, which becomes the next prior.
    clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
    priorPath = clustersOut;
    FileOutputFormat.setOutputPath(job, clustersOut);

    job.setJarByClass(ClusterIterator.class);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
    }
    ClusterClassifier.writePolicy(policy, clustersOut);
    FileSystem fs = FileSystem.get(outPath.toUri(), conf);
    iteration++;
    // Stop early once the clusters have converged.
    if (isConverged(clustersOut, conf, fs)) {
      break;
    }
  }
  // Rename the last iteration's output to mark it as the final clusters.
  Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
  FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
}
```
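To make the mapper/reducer split concrete without a Hadoop cluster, the sketch below mimics a single iteration in plain Java: a "map" step emits the id of each vector's nearest prior center, the vectors are grouped by that id as the shuffle would do, and a "reduce" step averages each group into a new center. This is a simplified analogy assuming squared Euclidean distance; it is not how `CIMapper` and `CIReducer` are actually written (they work with `ClusterWritable` objects and a cluster classifier), and the names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java analogy of one MapReduce K-Means iteration (illustrative only). */
public class OneIterationMR {

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** "Map": key a vector by the id of its closest prior center. */
    static int map(double[] vector, double[][] priorCenters) {
        int best = 0;
        for (int j = 1; j < priorCenters.length; j++) {
            if (squaredDistance(vector, priorCenters[j]) < squaredDistance(vector, priorCenters[best])) {
                best = j;
            }
        }
        return best;
    }

    /** "Reduce": average all vectors sharing one cluster id into that cluster's new center. */
    static double[] reduce(List<double[]> vectors) {
        double[] center = new double[vectors.get(0).length];
        for (double[] v : vectors) {
            for (int d = 0; d < center.length; d++) {
                center[d] += v[d];
            }
        }
        for (int d = 0; d < center.length; d++) {
            center[d] /= vectors.size();
        }
        return center;
    }

    /** One iteration: map each vector, group by id (the "shuffle"), reduce each group. */
    static Map<Integer, double[]> iterate(double[][] vectors, double[][] priorCenters) {
        Map<Integer, List<double[]>> groups = new TreeMap<>();
        for (double[] v : vectors) {
            groups.computeIfAbsent(map(v, priorCenters), id -> new ArrayList<>()).add(v);
        }
        // Clusters that received no vectors simply do not appear in the result.
        Map<Integer, double[]> newCenters = new TreeMap<>();
        for (Map.Entry<Integer, List<double[]>> e : groups.entrySet()) {
            newCenters.put(e.getKey(), reduce(e.getValue()));
        }
        return newCenters;
    }

    public static void main(String[] args) {
        double[][] vectors = {{0, 0}, {0, 2}, {10, 10}, {10, 12}};
        double[][] prior = {{1, 1}, {9, 9}};
        // prints "0 -> [0.0, 1.0]" then "1 -> [10.0, 11.0]"
        iterate(vectors, prior).forEach((id, c) -> System.out.println(id + " -> " + Arrays.toString(c)));
    }
}
```

In the Hadoop version, each call to `iterate` corresponds to one full job, with `vectors` and `prior` read from HDFS and the new centers written back, which is exactly where the per-iteration I/O cost comes from.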