Have you ever wondered how the Hadoop map task’s sort and spill mechanism code looks like ?
Here you goes. You can browse the following class.
MapTask.java (from hadoop mapreduce project on github)
In the map task, there is a buffer in memory to store the output of the map task. When the buffer exceeds the threshold, it spills the data to disk. Two parameters are specified to control this behavior. io.sort.mb (defaults to 100MB) is the size of the buffer and io.sort.spill.percent (defaults to 80%) is the threshold before spilling to the disk.
In fact, the MapOutputBuffer is pluggable now (MAPREDUCE-4807) New interface of MapOutputCollector.
public interface MapOutputCollector<K, V> { public void init(Context context ) throws IOException, ClassNotFoundException; public void collect(K key, V value, int partition ) throws IOException, InterruptedException; public void close() throws IOException, InterruptedException; public void flush() throws IOException, InterruptedException, ClassNotFoundException; @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable public static class Context { private final MapTask mapTask; private final JobConf jobConf; private final TaskReporter reporter; public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) { this.mapTask = mapTask; this.jobConf = jobConf; this.reporter = reporter; } public MapTask getMapTask() { return mapTask; } public JobConf getJobConf() { return jobConf; } public TaskReporter getReporter() { return reporter; } } }
In the MapTask.java, creatingSortingCollector instantiates MapOutputBuffer, the implementation of MapOutputCollector via reflection.
private <KEY, VALUE> MapOutputCollector<KEY, VALUE> createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { MapOutputCollector<KEY, VALUE> collector = (MapOutputCollector<KEY, VALUE>) ReflectionUtils.newInstance( job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class, MapOutputCollector.class), job); LOG.info("Map output collector class = " + collector.getClass().getName()); MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); collector.init(context); return collector; }
In the MapOutputBuffer, you can see both the io.sort.mb and io.sort.spill.percent are used to determine the size of the in memory buffer (maxMemoryUsage).
//sanity checks final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, INDEX_CACHE_MEMORY_LIMIT_DEFAULT); if (spillper > (float)1.0 || spillper <= (float)0.0) { throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + "\": " + spillper); } if ((sortmb & 0x7FF) != sortmb) { throw new IOException( "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); } sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job); // buffers and accounting int maxMemUsage = sortmb << 20; maxMemUsage -= maxMemUsage % METASIZE; kvbuffer = new byte[maxMemUsage];
To be continued…..