HadoopUtil class in Mahout

One can use the HadoopUtil class to prepare a Hadoop MapReduce job:

org.apache.mahout.common.HadoopUtil

public static Job prepareJob(Path inputPath,
                             Path outputPath,
                             Class<? extends InputFormat> inputFormat,
                             Class<? extends Mapper> mapper,
                             Class<? extends Writable> mapperKey,
                             Class<? extends Writable> mapperValue,
                             Class<? extends OutputFormat> outputFormat,
                             Configuration conf) throws IOException {

  Job job = new Job(new Configuration(conf));
  Configuration jobConf = job.getConfiguration();

  if (mapper.equals(Mapper.class)) {
    throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
  }
  job.setJarByClass(mapper);

  job.setInputFormatClass(inputFormat);
  jobConf.set("mapred.input.dir", inputPath.toString());

  job.setMapperClass(mapper);
  job.setMapOutputKeyClass(mapperKey);
  job.setMapOutputValueClass(mapperValue);
  job.setOutputKeyClass(mapperKey);
  job.setOutputValueClass(mapperValue);
  jobConf.setBoolean("mapred.compress.map.output", true);
  job.setNumReduceTasks(0);

  job.setOutputFormatClass(outputFormat);
  jobConf.set("mapred.output.dir", outputPath.toString());

  return job;
}

public static Job prepareJob(Path inputPath,
                             Path outputPath,
                             Class<? extends InputFormat> inputFormat,
                             Class<? extends Mapper> mapper,
                             Class<? extends Writable> mapperKey,
                             Class<? extends Writable> mapperValue,
                             Class<? extends Reducer> reducer,
                             Class<? extends Writable> reducerKey,
                             Class<? extends Writable> reducerValue,
                             Class<? extends OutputFormat> outputFormat,
                             Configuration conf) throws IOException {

  Job job = new Job(new Configuration(conf));
  Configuration jobConf = job.getConfiguration();

  if (reducer.equals(Reducer.class)) {
    if (mapper.equals(Mapper.class)) {
      throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
    }
    job.setJarByClass(mapper);
  } else {
    job.setJarByClass(reducer);
  }

  job.setInputFormatClass(inputFormat);
  jobConf.set("mapred.input.dir", inputPath.toString());

  job.setMapperClass(mapper);
  if (mapperKey != null) {
    job.setMapOutputKeyClass(mapperKey);
  }
  if (mapperValue != null) {
    job.setMapOutputValueClass(mapperValue);
  }
  jobConf.setBoolean("mapred.compress.map.output", true);

  job.setReducerClass(reducer);
  job.setOutputKeyClass(reducerKey);
  job.setOutputValueClass(reducerValue);

  job.setOutputFormatClass(outputFormat);
  jobConf.set("mapred.output.dir", outputPath.toString());

  return job;
}
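As a rough illustration, here is a minimal sketch of calling the map/reduce variant. MyMapper, MyReducer, the two paths, and the class name PrepareJobExample are all placeholders for your own classes and locations, not anything from Mahout itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.VectorWritable;

public class PrepareJobExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // MyMapper and MyReducer stand in for your own Mapper/Reducer classes.
    Job job = HadoopUtil.prepareJob(new Path("/input/points"),
                                    new Path("/output/results"),
                                    SequenceFileInputFormat.class,
                                    MyMapper.class,
                                    Text.class,
                                    VectorWritable.class,
                                    MyReducer.class,
                                    Text.class,
                                    VectorWritable.class,
                                    SequenceFileOutputFormat.class,
                                    conf);
    job.waitForCompletion(true);
  }
}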

Commands to Run MapReduce Job

Build your MapReduce job jar with Ant first. See the following Ant build.xml:

<project name="MyProject" basedir="." default="distr">

  <property name="Name" value="TSMapReduce"/>
  <property name="name" value="TSMapReduce"/>

  <property name="version" value="0.1.0"/>
  <property name="final.name" value="${name}-${version}"/>

  <property name="src.dir" value="src"/>
  <property name="lib.dir" value="lib"/>

  <property name="build.dir" value="build"/>
  <property name="jar.dir" value="${build.dir}"/>
  <property name="build.lib.dir" value="${build.dir}/lib"/>

  <property name="main-class" value="com.your_xxx_domain.timeseries.MyMapReduceJob"/>
  <property name="tmp.dir" value="tmp"/>

  <path id="classpath">
    <fileset dir="${lib.dir}" includes="**/*.jar"/>
  </path>

  <target name="clean">
    <delete dir="${build.dir}"/>
    <delete dir="${tmp.dir}"/>
  </target>

  <target name="compile">
    <mkdir dir="${tmp.dir}"/>
    <mkdir dir="${build.dir}"/>
    <javac srcdir="${src.dir}" destdir="${tmp.dir}" debug="on" classpathref="classpath"/>
  </target>

  <target name="distr" depends="jar">
    <mkdir dir="${build.lib.dir}"/>
    <copy todir="${build.lib.dir}">
      <fileset dir="${lib.dir}"/>
    </copy>
    <delete dir="${tmp.dir}"/>
  </target>

  <target name="jar" depends="clean, compile">
    <jar destfile="${build.dir}/${final.name}.jar" basedir="${tmp.dir}">
      <!--
      <manifest>
        <attribute name="Main-Class" value="${main-class}"/>
        <attribute name="Class-Path" value="classpath"/>
      </manifest>
      -->
      <fileset dir=".">
        <include name="**/${lib.dir}/**"/>
      </fileset>
    </jar>
  </target>

</project>

Then submit the job with the hadoop command:

./bin/hadoop jar YOUR_MAP_REDUCE_JOB.jar com.your_xxx_domain.path.to.your.class.MyMapReduceClass programArgs1 programArgs2

Mahout distance measure

All the distance measure classes are in org.apache.mahout.common.distance package.

They all implement the DistanceMeasure interface, which in turn extends the Parametered interface.

There are two methods in the DistanceMeasure interface:

double distance(Vector v1, Vector v2)

double distance(double centroidLengthSquare, Vector centroid, Vector v)

You can create a new distance measure by implementing the DistanceMeasure interface 🙂
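As a rough illustration, here is a sketch of a Manhattan-style measure written against those two methods. The class name MyManhattanDistanceMeasure is made up, and the no-op Parametered methods simply follow the pattern used by Mahout's bundled measures.

import java.util.Collection;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.parameters.Parameter;
import org.apache.mahout.math.Vector;

// Hypothetical example: an L1 (Manhattan) style measure. It takes no
// parameters, so the Parametered methods are left as no-ops.
public class MyManhattanDistanceMeasure implements DistanceMeasure {

  @Override
  public double distance(Vector v1, Vector v2) {
    // Sum of absolute coordinate differences (L1 norm of the difference vector).
    return v1.minus(v2).norm(1.0);
  }

  @Override
  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
    // This overload lets a measure reuse a precomputed squared centroid length;
    // an L1 measure cannot exploit it, so simply delegate.
    return distance(centroid, v);
  }

  @Override
  public void configure(Configuration config) {
    // no parameters to read
  }

  @Override
  public Collection<Parameter<?>> getParameters() {
    return Collections.emptyList();
  }

  @Override
  public void createParameters(String prefix, Configuration jobConf) {
    // no parameters to register
  }
}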

Mahout clustering

I have been exploring the Mahout clustering packages. Here is a brief description of the code.

org.apache.mahout.clustering.kmeans.KMeansDriver is the main entry point. Its run method sets up the clustering job by specifying the input path to the data points, the path to the initial k clusters, and the distance measure class to use.

KMeansDriver also stores these settings in the Configuration as follows:

conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);

KMeansConfigKeys is an interface that defines the configuration keys for these settings.
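To show why these keys matter, here is a hedged sketch of how the settings the driver wrote could be read back from the Configuration on the mapper/reducer side, with the DistanceMeasure instantiated by reflection. This is an illustration of the pattern, not Mahout's exact code; KMeansConfigExample and loadMeasure are made-up names.

import org.apache.hadoop.conf.Configuration;
import org.apache.mahout.clustering.kmeans.KMeansConfigKeys;
import org.apache.mahout.common.distance.DistanceMeasure;

// Illustration only: reading back the keys the driver wrote into the Configuration.
public final class KMeansConfigExample {

  public static DistanceMeasure loadMeasure(Configuration conf) throws Exception {
    // The cluster path and convergence delta would be used to load the initial
    // clusters and to test convergence; they are only read here for illustration.
    String clusterPath = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY);
    String convergence = conf.get(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY);

    // Instantiate the configured DistanceMeasure by reflection.
    String measureClassName = conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY);
    DistanceMeasure measure =
        Class.forName(measureClassName).asSubclass(DistanceMeasure.class).newInstance();
    measure.configure(conf);
    return measure;
  }
}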

org.apache.mahout.clustering.kmeans.KMeansMapper loads the initial predefined clusters in its setup method. In the map method, it reads each VectorWritable and passes it to the KMeansClusterer, which compares the point against the predefined clusters via the following call:

this.clusterer.emitPointToNearestCluster(point.get(), this.clusters, context);

In emitPointToNearestCluster, it finds the nearest cluster by comparing the point's distance to each cluster in the list. It then writes the cluster identifier as the key, with the point wrapped in a ClusterObservations as the value:

context.write(new Text(nearestCluster.getIdentifier()), new ClusterObservations(1, point, point.times(point)));
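The nearest-cluster search itself is just a minimum over the configured DistanceMeasure. Here is a simplified sketch of that loop (not the exact Mahout code), assuming the Cluster class exposes its center via getCenter():

import java.util.List;

import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Vector;

// Simplified sketch of the nearest-cluster search; Mahout's real code also
// emits the point to the reducer, as shown above.
public final class NearestClusterSketch {

  public static Cluster findNearest(Vector point, List<Cluster> clusters, DistanceMeasure measure) {
    Cluster nearest = null;
    double nearestDistance = Double.MAX_VALUE;
    for (Cluster cluster : clusters) {
      // Compare the point against each cluster center and keep the minimum.
      double d = measure.distance(cluster.getCenter(), point);
      if (d < nearestDistance) {
        nearest = cluster;
        nearestDistance = d;
      }
    }
    return nearest;
  }
}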

The KMeansReducer aggregates the list of ClusterObservations into a Cluster before writing the cluster id as the key and the Cluster as the value.

In its setup method, KMeansReducer loads the predefined initial clusters from the HDFS path and builds a cluster map keyed by cluster id, with the Cluster itself as the value.
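Putting those two paragraphs together, here is a simplified sketch of the reduce step. The observe and computeParameters helper names are my assumptions about how a Cluster absorbs observations and recomputes its center; treat them as a sketch of the aggregation described above, not Mahout's exact implementation.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.clustering.ClusterObservations;
import org.apache.mahout.clustering.kmeans.Cluster;

// Simplified sketch of the reduce step; assumes Cluster can absorb
// ClusterObservations (observe) and recompute its center (computeParameters).
public class KMeansReduceSketch extends Reducer<Text, ClusterObservations, Text, Cluster> {

  private final Map<String, Cluster> clusterMap = new HashMap<String, Cluster>();

  // setup() would populate clusterMap from the predefined clusters on HDFS (omitted here).

  @Override
  protected void reduce(Text key, Iterable<ClusterObservations> values, Context context)
      throws IOException, InterruptedException {
    Cluster cluster = clusterMap.get(key.toString());
    for (ClusterObservations observations : values) {
      cluster.observe(observations);   // accumulate the points assigned to this cluster
    }
    cluster.computeParameters();       // recompute the cluster center from the observations
    context.write(new Text(cluster.getIdentifier()), cluster);
  }
}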