Zero-copy techniques in Hadoop and Spark

What exactly are zero-copy techniques, and how can they be employed to achieve better performance in distributed systems?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques such as mmap (memory-mapped files) and sendfile() to improve the system.

Zero-copy techniques eliminate unnecessary data copies and context switches between application (user) space and kernel space. Please refer to the following excellent post for an in-depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data over the network to a client, it reads the data from disk into kernel memory and then copies it into user memory. It then transfers the data again from user memory to the kernel buffer associated with the network stack before handing it to the network interface card. See Figure 1 (taken from the above article).

[Figure 1: the traditional data path, with copies between kernel and user space]

A popular zero-copy technique is sendfile(), exposed in Java as FileChannel.transferTo(). Please see the following figure (taken from the same article).

[Figure 2: the data path using transferTo(), bypassing user space]

As you can see, Hadoop adopted the zero-copy transferTo() approach as far back as version 0.18:

https://issues.apache.org/jira/browse/HADOOP-3164

It also uses the same technique for sending shuffle files; see

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming version 1.2.0.

SPARK-2468: introduce the same transferTo technique for sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468
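
To make the mechanism concrete, here is a minimal, self-contained sketch (my own illustration, not code from Hadoop or Spark) of serving a file over a socket with FileChannel.transferTo(); the file name and port are hypothetical.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopyFileServer {
  public static void main(String[] args) throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(9090));                        // hypothetical port
      try (SocketChannel client = server.accept();
           FileChannel file = FileChannel.open(Paths.get("data.bin"),  // hypothetical file
                                               StandardOpenOption.READ)) {
        long position = 0;
        long remaining = file.size();
        // transferTo() asks the kernel to move bytes from the file straight to
        // the socket (sendfile), avoiding the extra copy through user space.
        while (remaining > 0) {
          long transferred = file.transferTo(position, remaining, client);
          position += transferred;
          remaining -= transferred;
        }
      }
    }
  }
}

Note that transferTo() may move fewer bytes than requested, hence the loop.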

Shuffling in Spark vs Hadoop MapReduce

In the last few posts about the Hadoop MapTask spill mechanism, we learned that Hadoop uses an in-memory buffer during the map task's intermediate output writing phase. When the memory soft limit is exceeded, it starts spilling the data to disk. This results in multiple spill files that are eventually merged into one big file at the end of the map task. During the spilling process, the data in the memory buffer is first sorted by partition (each partition corresponds to one Reduce task) and then by key.

As of Spark 1.0.1, Spark Map Tasks write their output directly to disk on completion; there is no use of a Hadoop-style in-memory buffer. Each Map Task writes as many shuffle files as there are Reduce Tasks, one shuffle file per Reduce Task. E.g., with 1000 Map Tasks (M) and 5000 Reduce Tasks (R), this results in 5 million shuffle files. Unlike Hadoop MapReduce, Spark does not merge them into a single partitioned shuffle file. This large number of file IOs hurts performance.

You can learn more about Spark shuffling from the following report.

http://www.cs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf

The above report also points out a potential memory issue when compression is used on the map output files.

E.g., on a machine with C cores allocated to Spark, there are C concurrent Map Tasks, each writing out R shuffle files, one per Reduce Task. Since each open compressed stream holds a compression buffer of roughly 400 KB, the total memory usage would be C * R * 400 KB.
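
To put rough, hypothetical numbers on that: with C = 16 cores and R = 5000 reduce tasks, a machine would hold 16 * 5000 = 80,000 open compressed streams, and 80,000 * 400 KB is roughly 32 GB of compression buffers alone.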

Here comes the good news. A new shuffle mechanism, called sort-based shuffle, is implemented for the upcoming Spark version 1.1.0. You can read the design document below and learn more about this issue in the SPARK-2045 JIRA ticket.

https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf

This sort-based shuffle is quite similar to, but not the same as, Hadoop MapReduce's shuffle.

The following is the description of SPARK-2045:

“…a sort-based shuffle implementation that takes advantage of an Ordering for keys (or just sorts by hashcode for keys that don’t have it) would likely improve performance and memory usage in very large shuffles. Our current hash-based shuffle needs an open file for each reduce task, which can fill up a lot of memory for compression buffers and cause inefficient IO. This would avoid both of those issues.”
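
To see why sorting by partition helps, here is a toy sketch of the idea (my own illustration in plain Java, not Spark's actual implementation): the map side buffers its records, sorts them by reduce partition id, and writes a single data file per map task plus an index of partition offsets, instead of keeping one open file per reduce task.

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;

// Toy illustration of a sort-based shuffle writer: one data file plus a
// partition index per map task, instead of one file per reduce task.
public class ToySortBasedShuffleWriter {
  static class Record {
    final int partition;      // which reduce task this record belongs to
    final byte[] key, value;
    Record(int partition, byte[] key, byte[] value) {
      this.partition = partition; this.key = key; this.value = value;
    }
  }

  public static long[] write(List<Record> records, int numPartitions,
                             String dataFile) throws IOException {
    // Sort buffered records by partition id so that each partition's records
    // end up contiguous in the single output file.
    records.sort(Comparator.comparingInt(r -> r.partition));

    long[] partitionOffsets = new long[numPartitions + 1];
    try (DataOutputStream out =
             new DataOutputStream(new FileOutputStream(dataFile))) {
      int current = 0;
      for (Record r : records) {
        // Record where each partition's segment begins (empty partitions
        // simply get the same offset as the next non-empty one).
        while (current < r.partition) {
          partitionOffsets[++current] = out.size();
        }
        out.writeInt(r.key.length);
        out.write(r.key);
        out.writeInt(r.value.length);
        out.write(r.value);
      }
      while (current < numPartitions) {
        partitionOffsets[++current] = out.size();
      }
    }
    // A reduce task for partition p would fetch the byte range
    // [partitionOffsets[p], partitionOffsets[p + 1]) from the data file.
    return partitionOffsets;
  }
}

The trade-off is a grouping/sort by partition id on the map side in exchange for far fewer open files and compression buffers, which is exactly the issue SPARK-2045 describes.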

Hadoop MapTask Spill Mechanism Part 2

In the last post, we looked at the MapOutputBuffer class. This is the class that maintains the in-memory buffer, the byte array byte[] kvbuffer, during the map task's intermediate output writing phase. When the buffer exceeds the threshold, it starts spilling data to disk.

Inside MapOutputBuffer, there is a thread called SpillThread.

    final ReentrantLock spillLock = new ReentrantLock();
    final Condition spillDone = spillLock.newCondition();
    final Condition spillReady = spillLock.newCondition();
    final BlockingBuffer bb = new BlockingBuffer();
    volatile boolean spillThreadRunning = false;
    final SpillThread spillThread = new SpillThread();

It is a daemon thread and is started in the init() method of MapOutputBuffer:


public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {

      // ... (earlier part of init(), including kvbuffer allocation, omitted) ...
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }

}

The SpillThread is signalled to write the data out to disk (via startSpill()) whenever the memory buffer's soft limit is exceeded (bufferRemaining <= 0) while the MapTask is writing intermediate output into the buffer:


 public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {

      // ... (argument checks and bufferRemaining accounting omitted) ...
      if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
      }
      // ... (serialization of the key and value into the buffer omitted) ...
  }

Hope you all enjoy this post!

Hadoop MapTask Spill Mechanism

Have you ever wondered what the Hadoop map task’s sort and spill mechanism code looks like?

Here you go. You can browse the following class:
MapTask.java (from the Hadoop MapReduce project on GitHub)

In the map task, there is an in-memory buffer that stores the map task's output. When the buffer exceeds a threshold, the data is spilled to disk. Two parameters control this behavior: io.sort.mb (defaults to 100 MB) is the size of the buffer, and io.sort.spill.percent (defaults to 80%) is the threshold before spilling to disk.
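
For reference, here is a minimal sketch of tuning those two knobs on a job; the values are made up, and note that newer Hadoop releases rename these properties to mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent.

import org.apache.hadoop.mapred.JobConf;

public class SortBufferConfig {
  public static JobConf tune(JobConf job) {
    job.setInt("io.sort.mb", 200);                 // in-memory sort buffer size, in MB (hypothetical value)
    job.setFloat("io.sort.spill.percent", 0.80f);  // fraction of the buffer filled before spilling
    return job;
  }
}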

In fact, MapOutputBuffer is now pluggable (MAPREDUCE-4807), thanks to the new MapOutputCollector interface:

public interface MapOutputCollector<K, V> {
  public void init(Context context
                  ) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition
                     ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;

  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}
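
Because the collector class is looked up from the job configuration (as createSortingCollector() below shows), a different MapOutputCollector implementation can be plugged in per job. A minimal sketch, assuming the mapreduce.job.map.output.collector.class property name and a hypothetical com.example.MyMapOutputCollector class:

import org.apache.hadoop.mapred.JobConf;

public class CustomCollectorConfig {
  public static void useCustomCollector(JobConf job) {
    // com.example.MyMapOutputCollector is a hypothetical class implementing
    // MapOutputCollector<K, V>; MapTask instantiates whatever class this
    // property names via reflection, defaulting to MapOutputBuffer.
    job.set("mapreduce.job.map.output.collector.class",
            "com.example.MyMapOutputCollector");
  }
}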

In MapTask.java, createSortingCollector() instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection:

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector<KEY, VALUE> collector
      = (MapOutputCollector<KEY, VALUE>)
       ReflectionUtils.newInstance(
                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                        MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =
                           new MapOutputCollector.Context(this, job, reporter);
    collector.init(context);
    return collector;
  }

In MapOutputBuffer's init(), you can see that io.sort.mb determines the size of the in-memory buffer (maxMemUsage), while io.sort.spill.percent is validated here and used later to compute the spill soft limit:

//sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
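
With the defaults (sortmb = 100), maxMemUsage is 100 << 20 = 104,857,600 bytes, which is already a multiple of METASIZE (16 bytes in this class), so kvbuffer is a 100 MB byte array. The spill soft limit, computed later in init() (not shown here) as kvbuffer.length * spillper, then works out to roughly 80% of the buffer, about 83.9 million bytes.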

To be continued…..

HBase Pluggable Store, Memstore, StoreEngine

I was browsing the HBase codebase and noticed that many components in the regionserver module (org.apache.hadoop.hbase.regionserver) had been refactored to become pluggable. This is a cleaner design because different implementations of these components can be plugged into HBase easily. Among the new interfaces are

Store, MemStore

The corresponding Implementation classes are

HStore implements Store

DefaultMemStore implements MemStore

Furthermore, StoreEngine is now an abstract class that acts as a factory for creating the StoreFileManager, CompactionPolicy, and Compactor.
DefaultStoreEngine extends StoreEngine to create the default compactor, compaction policy, and store file manager. This makes store file management and compaction pluggable.
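
As an illustration of that pluggability, the store engine to use can be selected through configuration. A minimal sketch, assuming the hbase.hstore.engine.class property name (check your HBase version) and simply pointing it at the default engine:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class StoreEngineConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Assumed property name; HStore reads it to decide which StoreEngine
    // subclass to instantiate for each column family's store.
    conf.set("hbase.hstore.engine.class",
             "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine");
    return conf;
  }
}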

You can refer to the corresponding JIRA ticket and design doc:

https://issues.apache.org/jira/browse/HBASE-7678

https://issues.apache.org/jira/secure/attachment/12568488/Pluggable%20compactions%20doc.pdf