Zero-copy techniques in Hadoop and Spark

What exactly are zero-copy techniques, and how can they be employed to achieve better performance in distributed systems?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques, such as memory-mapped files (mmap) and sendfile(), to improve performance.

Zero-copy techniques eliminate unnecessary data copies and context switches between application (user) space and kernel space. Please refer to the following excellent post for an in-depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data over the network to a client, it reads the data from disk into a kernel buffer and then copies it into user-space memory. It then copies the data again from user space into the kernel buffer associated with the network stack before it is sent to the network interface card. See Figure 1 (taken from the article above).

[Figure 1: The traditional data-copying approach (from the IBM developerWorks article)]

A popular zero-copy technique is the sendfile() system call, exposed in Java as FileChannel.transferTo(). Please see the following figure (taken from the same article).

[Figure 2: Data copying with transferTo() (from the same article)]
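
To make this concrete, here is a minimal sketch of a server that streams a file to a client with FileChannel.transferTo(); the class name, port, and file name are illustrative, not from the original post.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopyServer {
  public static void main(String[] args) throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(9000));
      try (SocketChannel client = server.accept();
           FileChannel file = FileChannel.open(Paths.get("data.bin"),
                                               StandardOpenOption.READ)) {
        long pos = 0;
        long size = file.size();
        // transferTo() asks the kernel to move bytes from the page cache
        // straight to the socket (sendfile on Linux), skipping the
        // user-space copy shown in Figure 1.
        while (pos < size) {
          pos += file.transferTo(pos, size - pos, client);
        }
      }
    }
  }
}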

As you can see, Hadoop switched to the zero-copy transferTo() approach way back in version 0.18.

https://issues.apache.org/jira/browse/HADOOP-3164

It also uses the same technique when serving shuffle files. Please see

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming version 1.2.0.

SPARK-2468: introduce the same transferTo() technique for sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468

Hadoop MapTask Spill Mechanism

Have you ever wondered what the Hadoop map task's sort-and-spill code looks like?

Here you go. You can browse the following class:
MapTask.java (from the hadoop-mapreduce project on GitHub)

In the map task, there is an in-memory buffer that stores the map output. When the buffer fill exceeds a threshold, the data is spilled to disk. Two parameters control this behavior: io.sort.mb (default 100 MB) sets the size of the buffer, and io.sort.spill.percent (default 80%) sets the fill threshold that triggers a spill.
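
For example, a job could tune both knobs like this; a hedged sketch using the old io.sort.* keys this post refers to (newer releases alias them under mapreduce.*):

import org.apache.hadoop.mapred.JobConf;

public class SpillTuning {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // enlarge the in-memory sort buffer from the 100 MB default to 256 MB
    job.setInt("io.sort.mb", 256);
    // spill when the buffer is 90% full instead of 80%
    job.setFloat("io.sort.spill.percent", 0.90f);
  }
}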

In fact, MapOutputBuffer is pluggable now (MAPREDUCE-4807); MapOutputCollector is the new interface:

public interface MapOutputCollector<K, V> {
  public void init(Context context
                  ) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition
                     ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;

  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}
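
Because the collector is instantiated by reflection (shown next), you can plug in your own implementation through the job configuration. A minimal sketch, where com.example.MyCollector is a hypothetical class implementing the interface above:

import org.apache.hadoop.mapred.JobConf;

public class PluggableCollector {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR resolves to this key;
    // com.example.MyCollector is hypothetical and must implement
    // MapOutputCollector<K, V>.
    job.set("mapreduce.job.map.output.collector.class",
            "com.example.MyCollector");
  }
}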

In MapTask.java, createSortingCollector() instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection.

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector<KEY, VALUE> collector
      = (MapOutputCollector<KEY, VALUE>)
       ReflectionUtils.newInstance(
                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                        MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =
                           new MapOutputCollector.Context(this, job, reporter);
    collector.init(context);
    return collector;
  }

In MapOutputBuffer, you can see both io.sort.mb and io.sort.spill.percent being read; io.sort.mb determines the size of the in-memory buffer (maxMemUsage in the code).

//sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
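
Note that the (sortmb & 0x7FF) != sortmb check above caps io.sort.mb at 2047 (the value must fit in 11 bits). For the default sortmb of 100, the arithmetic works out as follows (METASIZE is 16 bytes in MapOutputBuffer, four ints of per-record accounting):

public class MaxMemUsageMath {
  public static void main(String[] args) {
    int sortmb = 100;                 // io.sort.mb default
    int maxMemUsage = sortmb << 20;   // 100 * 2^20 = 104,857,600 bytes
    maxMemUsage -= maxMemUsage % 16;  // round down to a multiple of METASIZE
    System.out.println(maxMemUsage);  // 104857600: kvbuffer is a 100 MB byte[]
    // with io.sort.spill.percent at its 0.8 default, spilling starts
    // around 0.8 * 104,857,600 = 83,886,080 bytes of buffered output
  }
}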

To be continued…

YARN log files

Viewing MapReduce job log files has been a pain. With YARN, you can enable log aggregation, which pulls together all the individual logs belonging to an MR job and lets you view the aggregated log with the following command:

yarn logs -applicationId <YOUR_APPLICATION_ID> | less

e.g.

yarn logs -applicationId application_1399561129519_4422 | less

You can enable log aggregation in yarn-site.xml as follows:

cat /etc/hadoop/conf/yarn-site.xml

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>

To see the list of running MapReduce jobs

mapred job -list

To check the status of a MapReduce job

mapred job -status <YOUR_JOB_ID>

Events in MapReduce V2

In the last post, we looked into the YARN event structure. Today, we will look at the different events introduced in MapReduce V2. These events are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.

[Figure: AllEventTypes, the event types defined in org.apache.hadoop.mapreduce.v2.app.job.event]

You will find the different event types defined in the JobEventType, TaskEventType, and TaskAttemptEventType enum classes. For example, JobEventType:


/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES

}
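
These events flow through the AsyncDispatcher we saw in the last post. As a hedged sketch (the class and method below are illustrative, not from MRAppMaster), a component with a handle to the dispatcher's event handler could kill a job like this:

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.yarn.event.EventHandler;

public class KillJobExample {
  // Fires a JOB_KILL event; the dispatcher routes it to JobImpl's
  // state machine, which transitions the job toward the KILLED state.
  static void killJob(EventHandler<JobEvent> eventHandler, JobId jobId) {
    eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  }
}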

TaskTracker

TaskTracker is essentially a thread running on a node. Internally, it launches two inner TaskLauncher threads, called mapLauncher and reduceLauncher. Each TaskLauncher thread uses a list to keep track of its assigned tasks and starts each task in a separate TaskRunner thread. The TaskRunner thread then launches a JVM process for the task.

If we dig deeper into the code, you will find that TaskRunner holds a JvmManager instance. The JvmManager contains two JvmManagerForType instances, one for map tasks (mapJvmManager) and another for reduce tasks (reduceJvmManager). Each is responsible for spawning a JvmRunner thread, which actually launches the JVM process. A rough, simplified sketch of this launcher pattern follows.
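
The sketch below is my own illustration of the pattern, not Hadoop's actual code: a launcher thread drains a queue of assigned tasks and hands each to a runner thread.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified illustration only; the real TaskLauncher also waits for
// free map/reduce slots before starting a task.
class Launcher extends Thread {
  private final BlockingQueue<Runnable> assigned = new LinkedBlockingQueue<>();

  void assign(Runnable task) {
    assigned.add(task); // TaskTracker hands over a newly assigned task
  }

  @Override
  public void run() {
    try {
      while (true) {
        Runnable task = assigned.take();         // block until a task arrives
        new Thread(task, "TaskRunner").start();  // runner would exec a child JVM
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();        // shut down cleanly
    }
  }
}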