Cache Management in HDFS

The popularity of Spark has spurred great interest in providing a memory caching capability in HDFS. With the release of Hadoop 2.3.0, centralized cache management has been introduced.

See the following JIRA.

https://issues.apache.org/jira/browse/HDFS-4949

You can view the design doc attached to the JIRA; it is an excellent read. Kudos to both Andrew Wang and Colin Patrick McCabe, the key contributors to this feature.

This caching feature allows memory-local HDFS clients to use zero-copy reads to access the cached data, further speeding up read operations. On the DataNode side, the mlock and mlockall native calls tell the operating system to lock a specified memory range into RAM, which prevents the cached pages from being paged out and triggering page faults.

You can learn about mlock from the following page.

https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_MRG/1.3/html/Realtime_Reference_Guide/sect-Realtime_Reference_Guide-Memory_allocation-Using_mlock_to_avoid_memory_faults.html

You can also find the Java native I/O mlock and munlock calls for memory-mapped data in org.apache.hadoop.io.nativeio.NativeIO:

https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

and the corresponding C native code
https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c

/**
 * Used to manipulate the operating system cache.
 */
@VisibleForTesting
public static class CacheManipulator {
  public void mlock(String identifier, ByteBuffer buffer,
      long len) throws IOException {
    POSIX.mlock(buffer, len);
  }

  public long getMemlockLimit() {
    return NativeIO.getMemlockLimit();
  }

  public long getOperatingSystemPageSize() {
    return NativeIO.getOperatingSystemPageSize();
  }

  public void posixFadviseIfPossible(String identifier,
      FileDescriptor fd, long offset, long len, int flags)
      throws NativeIOException {
    NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
        len, flags);
  }

  public boolean verifyCanMlock() {
    return NativeIO.isAvailable();
  }
}

With this feature, the NameNode has to keep track of the cached replicas on the different DataNodes. Cached-block information is exchanged between them via the heartbeat protocol. As a result, additional metadata, e.g. the cachedHosts array, has been introduced in BlockLocation, as seen below.


public class BlockLocation {
  private String[] hosts; // Datanode hostnames
  private String[] cachedHosts; // Datanode hostnames with a cached replica
  private String[] names; // Datanode IP:xferPort for accessing the block
  private String[] topologyPaths; // Full path name in network topology
  private long offset;  // Offset of the block in the file
  private long length;
  private boolean corrupt;
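
As a quick illustration, a client can see which DataNodes hold a cached replica of each block through the regular block-location API. Here is a minimal sketch (not from the Hadoop code base); it assumes the getCachedHosts() accessor that accompanies the new cachedHosts field, and a file that is already covered by a cache directive.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: print, for each block of a file, the DataNodes that hold a
// cached replica alongside the regular replica locations.
public class CachedHostsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path path = new Path(args[0]);            // e.g. a file under a cached directory
    FileStatus status = fs.getFileStatus(path);
    BlockLocation[] blocks =
        fs.getFileBlockLocations(status, 0, status.getLen());

    for (BlockLocation block : blocks) {
      System.out.println("offset " + block.getOffset()
          + " hosts=" + String.join(",", block.getHosts())
          + " cachedHosts=" + String.join(",", block.getCachedHosts()));
    }
  }
}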

Stay tuned for more info…

Zero copy technique in Hadoop and Spark

What exactly are zero-copy techniques, and how can they be employed to achieve better performance in a distributed system?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques, such as memory-mapped files (mmap) and sendfile(), to improve performance.

Zero-copy techniques eliminate unnecessary data copies and context switches between application and kernel space. Please refer to the following excellent post for an in-depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data over the network to a client, it reads the data from disk into kernel memory and then copies it into user memory. It then copies the data again from user space into the kernel buffer associated with the network stack before it is sent to the network interface card. See Figure 1 (taken from the article above).

[Figure 1: the traditional data path, with copies between kernel space and user space]
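
The following is a minimal sketch (not from the Hadoop or Spark code base) of this traditional path: every buffer of data is copied from the kernel into a byte[] in user space and then back into the kernel's socket buffer before it reaches the NIC.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

// Traditional copy path: read the file into a user-space buffer, then write
// that buffer to the socket, paying the extra copies and context switches.
public class TraditionalCopyExample {
  public static void main(String[] args) throws IOException {
    String path = args[0];                     // file to send
    String host = args[1];
    int port = Integer.parseInt(args[2]);

    try (InputStream in = new FileInputStream(path);
         Socket socket = new Socket(host, port);
         OutputStream out = socket.getOutputStream()) {
      byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) { // kernel -> user-space copy
        out.write(buffer, 0, read);            // user-space -> socket-buffer copy
      }
    }
  }
}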

A popular zero-copy technique is the sendfile() system call, exposed in Java NIO as FileChannel.transferTo(). Please see the following figure (taken from the same article).

[Figure 2: the zero-copy data path using sendfile()/transferTo()]
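
For comparison, here is a minimal transferTo() sketch (again, not from the Hadoop or Spark code base). FileChannel.transferTo() asks the kernel to move the bytes from the page cache to the socket directly, so they never enter user space.

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

// Zero-copy path: let the kernel transfer the file straight to the socket.
public class TransferToExample {
  public static void main(String[] args) throws IOException {
    String path = args[0];                     // file to send
    String host = args[1];
    int port = Integer.parseInt(args[2]);

    try (FileChannel fileChannel = new FileInputStream(path).getChannel();
         SocketChannel socketChannel =
             SocketChannel.open(new InetSocketAddress(host, port))) {
      long position = 0;
      long remaining = fileChannel.size();
      // transferTo() may move fewer bytes than requested, so loop until done.
      while (remaining > 0) {
        long transferred =
            fileChannel.transferTo(position, remaining, socketChannel);
        position += transferred;
        remaining -= transferred;
      }
    }
  }
}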

As you can see, Hadoop already switched to the zero-copy transferTo() approach way back in version 0.18.

https://issues.apache.org/jira/browse/HADOOP-3164

Hadoop also uses the same technique when sending shuffle files to reducers. Please see

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming 1.2.0 release.

SPARK-2468: introduce the same transferTo technique for sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468

Hadoop MapTask Spill Mechanism Part 2

In the last post, we looked at the MapOutputBuffer class. This is the class that maintains the in-memory buffer, byte[] kvbuffer, while the map task writes its intermediate output. Once the buffer usage exceeds the threshold, it starts spilling data to disk.

Inside MapOutputBuffer, there is a thread called SpillThread.

    final ReentrantLock spillLock = new ReentrantLock();
    final Condition spillDone = spillLock.newCondition();
    final Condition spillReady = spillLock.newCondition();
    final BlockingBuffer bb = new BlockingBuffer();
    volatile boolean spillThreadRunning = false;
    final SpillThread spillThread = new SpillThread();

It is a daemon thread and is started in the init method of MapOutputBuffer.


public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
      // ... (buffer allocation and accounting setup elided) ...
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      // ... (rest of init elided) ...
}
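
For intuition, here is a small self-contained sketch (not Hadoop code) of the same pattern: a daemon thread sleeps on a Condition until the producer signals that the buffer should be spilled, mirroring the spillLock/spillReady handshake above.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the lock/condition handshake used by MapOutputBuffer:
// a daemon "spill" thread waits until the producer asks for a spill.
public class SpillThreadSketch {
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillReady = spillLock.newCondition();
  private boolean spillRequested = false;

  private final Thread spillThread = new Thread(() -> {
    while (true) {
      spillLock.lock();
      try {
        while (!spillRequested) {
          spillReady.await();          // sleep until startSpill() signals us
        }
        spillRequested = false;
      } catch (InterruptedException e) {
        return;                        // shut down on interrupt
      } finally {
        spillLock.unlock();
      }
      System.out.println("spilling buffer to disk...");  // sort + write would go here
    }
  });

  public void start() {
    spillThread.setDaemon(true);       // same as Hadoop: do not block JVM exit
    spillThread.setName("SpillThread");
    spillThread.start();
  }

  // Called by the producer when the soft limit is exceeded.
  public void startSpill() {
    spillLock.lock();
    try {
      spillRequested = true;
      spillReady.signal();
    } finally {
      spillLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SpillThreadSketch sketch = new SpillThreadSketch();
    sketch.start();
    sketch.startSpill();               // simulate hitting the soft limit
    Thread.sleep(200);                 // give the daemon thread time to run
  }
}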

The SpillThread is signalled to write the data out to disk (via startSpill()) whenever the buffer's soft limit is exceeded (bufferRemaining <= 0) while the MapTask is writing its intermediate output into the buffer.


 public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      // ... (key/value class and partition sanity checks elided) ...
if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
        }
        // ... (the key/value pair is then serialized into kvbuffer) ...
}

Hope you all enjoy this post!

Hadoop MapTask Spill Mechanism

Have you ever wondered what the Hadoop map task’s sort and spill mechanism code looks like?

Here you go. You can browse the following class:
MapTask.java (from the hadoop-mapreduce project on GitHub)

In the map task, there is an in-memory buffer that stores the map task's output. When the buffer usage exceeds a threshold, the data is spilled to disk. Two parameters control this behavior: io.sort.mb (default 100 MB) is the size of the buffer, and io.sort.spill.percent (default 0.80) is the fraction of the buffer that can fill up before spilling to disk starts.
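
If you need to tune these knobs, they can be set on the job configuration. Here is a minimal sketch, using the property names as they appear in this post (Hadoop 2 also accepts the newer mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent names):

import org.apache.hadoop.mapred.JobConf;

// Sketch: enlarge the sort buffer and delay spilling.
public class SpillTuning {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.setInt("io.sort.mb", 256);                 // 256 MB sort buffer instead of 100 MB
    job.setFloat("io.sort.spill.percent", 0.90f);  // start spilling at 90% full instead of 80%
    System.out.println("io.sort.mb = " + job.getInt("io.sort.mb", 100));
  }
}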

In fact, the MapOutputBuffer is pluggable now (MAPREDUCE-4807). Here is the new MapOutputCollector interface:

public interface MapOutputCollector<K, V> {
  public void init(Context context
                  ) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition
                     ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;

  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}

In MapTask.java, createSortingCollector instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection.

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector<KEY, VALUE> collector
      = (MapOutputCollector<KEY, VALUE>)
       ReflectionUtils.newInstance(
                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                        MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =
                           new MapOutputCollector.Context(this, job, reporter);
    collector.init(context);
    return collector;
  }

In MapOutputBuffer, you can see that io.sort.mb determines the size of the in-memory buffer (maxMemUsage), while io.sort.spill.percent (spillper) determines the soft limit at which spilling starts.

//sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
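
A few lines further down in init (simplified here), spillper is applied to the buffer length to derive the soft limit that collect() later compares against:

      // simplified continuation of init(): the soft limit is spillper (80% by
      // default) of kvbuffer, and bufferRemaining counts down toward it
      softLimit = (int) (kvbuffer.length * spillper);
      bufferRemaining = softLimit;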

To be continued…

Different Maven Profiles for Hadoop 1 and Hadoop 2

Since there are different versions of Hadoop, we sometimes have to compile/deploy our code against one but not the other. The ideal way is to set up profiles in Maven to compile and create the artifacts accordingly.

Here is the profiles section defined in the HBase project. You can create the same profiles in your pom.

To compile against one of these profiles, run:
mvn -Dhadoop.profile=1.1 clean compile

<profiles>
		<profile>
			<id>hadoop-1.1</id>
			<activation>
				<property>
					<name>hadoop.profile</name>
					<value>1.1</value>
				</property>
			</activation>
			<dependencies>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-core</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-test</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
			</dependencies>
		</profile>
		<profile>
			<id>hadoop-1.0</id>
			<activation>
				<property>
					<name>hadoop.profile</name>
					<value>1.0</value>
				</property>
			</activation>
			<dependencies>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-core</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-test</artifactId>
					<version>${hadoop-1.version}</version>
				</dependency>
			</dependencies>
		</profile>
		<profile>
			<id>hadoop-2.0</id>
			<activation>
				<property>
					<name>!hadoop.profile</name>
				</property>
			</activation>
			<dependencyManagement>
				<dependencies>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-mapreduce-client-core</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
						<version>${hadoop-2.version}</version>
						<type>test-jar</type>
						<scope>test</scope>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-hdfs</artifactId>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-hdfs</artifactId>
						<version>${hadoop-2.version}</version>
						<type>test-jar</type>
						<scope>test</scope>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-auth</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-common</artifactId>
						<version>${hadoop-2.version}</version>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-client</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-annotations</artifactId>
						<version>${hadoop-2.version}</version>
					</dependency>
					<!-- This was marked as test dep in earlier pom, but was scoped compile. 
						Where do we actually need it? -->
					<dependency>
						<groupId>org.apache.hadoop</groupId>
						<artifactId>hadoop-minicluster</artifactId>
						<version>${hadoop-2.version}</version>
						<exclusions>
							<exclusion>
								<groupId>javax.servlet.jsp</groupId>
								<artifactId>jsp-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>javax.servlet</groupId>
								<artifactId>servlet-api</artifactId>
							</exclusion>
							<exclusion>
								<groupId>stax</groupId>
								<artifactId>stax-api</artifactId>
							</exclusion>
						</exclusions>
					</dependency>
				</dependencies>
			</dependencyManagement>
		</profile>
	</profiles>

Sqoop MySQL Data Import

In Sqoop, we can import data from MySQL and write it to HDFS. In direct mode, Sqoop uses a mapper that opens a pipe to mysqldump and pulls the data directly. If you look at the following MySQLDumpMapper, it creates a CopyingAsyncSink that processes the mysqldump stream via a reader thread, CopyingStreamThread. The thread reads data from mysqldump and strips the extra header/footer characters before writing the records to HDFS.

On the other hand, to export data from HDFS back to MySQL, Sqoop uses MySQLExportMapper, which starts a mysqlimport process and uses it to export rows from HDFS to a MySQL database at high speed. Basically, it creates a FIFO, i.e. a named pipe (created with mknod), to interface with mysqlimport behind the scenes. A BufferedOutputStream is used to write bytes straight to the mysqlimport process via that named pipe, which can essentially be treated as a file.
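
To make the named-pipe trick concrete, here is a small self-contained sketch (not Sqoop code; it uses mkfifo and cat as stand-ins for Sqoop's mknod and mysqlimport, and assumes a Unix-like system): create a FIFO, point a consumer process at it, and stream bytes into it through a BufferedOutputStream exactly as if it were an ordinary file.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of the export-side plumbing: a FIFO lets us feed another process
// (mysqlimport in Sqoop's case, cat here) through the file-system namespace.
public class NamedPipeSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    File fifo = new File("/tmp/export.fifo");   // hypothetical path

    // Create the FIFO; Sqoop does the equivalent with mknod.
    new ProcessBuilder("mkfifo", fifo.getAbsolutePath()).start().waitFor();

    // Stand-in for mysqlimport: any process that reads from the FIFO.
    Process consumer = new ProcessBuilder("cat", fifo.getAbsolutePath())
        .inheritIO()
        .start();

    // Writing to the FIFO looks exactly like writing to a regular file.
    try (BufferedOutputStream out =
             new BufferedOutputStream(new FileOutputStream(fifo))) {
      out.write("1,alice\n2,bob\n".getBytes(StandardCharsets.UTF_8));
    }

    consumer.waitFor();
    fifo.delete();
  }
}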


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.util.AsyncSink;
import org.apache.sqoop.util.JdbcUrl;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.manager.MySQLUtils;
import com.cloudera.sqoop.util.ErrorableAsyncSink;
import com.cloudera.sqoop.util.ErrorableThread;
import com.cloudera.sqoop.util.LoggingAsyncSink;

/**
 * Mapper that opens up a pipe to mysqldump and pulls data directly.
 */
public class MySQLDumpMapper
    extends SqoopMapper<String, NullWritable, String, NullWritable> {

  public static final Log LOG = LogFactory.getLog(
      MySQLDumpMapper.class.getName());

  private Configuration conf;

  // AsyncSinks used to import data from mysqldump directly into HDFS.

  /**
   * Copies data directly from mysqldump into HDFS, after stripping some
   * header and footer characters that are attached to each line in mysqldump.
   */
  public static class CopyingAsyncSink extends ErrorableAsyncSink {
    private final MySQLDumpMapper.Context context;
    private final PerfCounters counters;

    protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
        final PerfCounters ctrs) {
      this.context = context;
      this.counters = ctrs;
    }

    public void processStream(InputStream is) {
      child = new CopyingStreamThread(is, context, counters);
      child.start();
    }

    private static class CopyingStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(
          CopyingStreamThread.class.getName());

      private final MySQLDumpMapper.Context context;
      private final InputStream stream;
      private final PerfCounters counters;

      CopyingStreamThread(final InputStream is,
          final Context c, final PerfCounters ctrs) {
        this.context = c;
        this.stream = is;
        this.counters = ctrs;
      }

      public void run() {
        BufferedReader r = null;

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // Actually do the read/write transfer loop here.
          int preambleLen = -1; // set to this for "undefined"
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF.
            }

            if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
              continue; // comments and empty lines are ignored
            }

            // this line is of the form "INSERT .. VALUES ( actual value text
            // );" strip the leading preamble up to the '(' and the trailing
            // ');'.
            if (preambleLen == -1) {
              // we haven't determined how long the preamble is. It's constant
              // across all lines, so just figure this out once.
              String recordStartMark = "VALUES (";
              preambleLen = inLine.indexOf(recordStartMark)
                  + recordStartMark.length();
            }

            // chop off the leading and trailing text as we write the
            // output to HDFS.
            int len = inLine.length() - 2 - preambleLen;
            context.write(inLine.substring(preambleLen, inLine.length() - 2)
                + "n", null);
            counters.addBytes(1 + len);
          }
        } catch (IOException ioe) {
          LOG.error("IOException reading from mysqldump: " + ioe.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } catch (InterruptedException ie) {
          LOG.error("InterruptedException reading from mysqldump: "
              + ie.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } finally {
          if (null != r) {
            try {
              r.close();
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());
            }
          }
        }
      }
    }
  }


  /**
   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
   * output, and re-emit the text in the user's specified output format.
   */
  public static class ReparsingAsyncSink extends ErrorableAsyncSink {
    private final MySQLDumpMapper.Context context;
    private final Configuration conf;
    private final PerfCounters counters;

    protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
        final Configuration conf, final PerfCounters ctrs) {
      this.context = c;
      this.conf = conf;
      this.counters = ctrs;
    }

    public void processStream(InputStream is) {
      child = new ReparsingStreamThread(is, context, conf, counters);
      child.start();
    }

    private static class ReparsingStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(
          ReparsingStreamThread.class.getName());

      private final MySQLDumpMapper.Context context;
      private final Configuration conf;
      private final InputStream stream;
      private final PerfCounters counters;

      ReparsingStreamThread(final InputStream is,
          final MySQLDumpMapper.Context c, Configuration conf,
          final PerfCounters ctrs) {
        this.context = c;
        this.conf = conf;
        this.stream = is;
        this.counters = ctrs;
      }

      private static final char MYSQL_FIELD_DELIM = ',';
      private static final char MYSQL_RECORD_DELIM = '\n';
      private static final char MYSQL_ENCLOSE_CHAR = '\'';
      private static final char MYSQL_ESCAPE_CHAR = '\\';
      private static final boolean MYSQL_ENCLOSE_REQUIRED = false;

      private static final RecordParser MYSQLDUMP_PARSER;

      static {
        // build a record parser for mysqldump's format
        MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
      }

      public void run() {
        BufferedReader r = null;

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // Configure the output with the user's delimiters.
          char outputFieldDelim = (char) conf.getInt(
              MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
              DelimiterSet.NULL_CHAR);
          String outputFieldDelimStr = "" + outputFieldDelim;
          char outputRecordDelim = (char) conf.getInt(
              MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
              DelimiterSet.NULL_CHAR);
          String outputRecordDelimStr = "" + outputRecordDelim;
          char outputEnclose = (char) conf.getInt(
              MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
              DelimiterSet.NULL_CHAR);
          char outputEscape = (char) conf.getInt(
              MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
              DelimiterSet.NULL_CHAR);
          boolean outputEncloseRequired = conf.getBoolean(
              MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

          DelimiterSet delimiters = new DelimiterSet(
             outputFieldDelim,
             outputRecordDelim,
             outputEnclose,
             outputEscape,
             outputEncloseRequired);

          // Actually do the read/write transfer loop here.
          int preambleLen = -1; // set to this for "undefined"
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF.
            }

            if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
              continue; // comments and empty lines are ignored
            }

            // this line is of the form "INSERT .. VALUES ( actual value text
            // );" strip the leading preamble up to the '(' and the trailing
            // ');'.
            if (preambleLen == -1) {
              // we haven't determined how long the preamble is. It's constant
              // across all lines, so just figure this out once.
              String recordStartMark = "VALUES (";
              preambleLen = inLine.indexOf(recordStartMark)
                  + recordStartMark.length();
            }

            // Wrap the input string in a char buffer that ignores the leading
            // and trailing text.
            CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
                inLine.length() - 2);

            // Pass this along to the parser
            List<String> fields = null;
            try {
              fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
            } catch (RecordParser.ParseError pe) {
              LOG.warn("ParseError reading from mysqldump: "
                  + pe.toString() + "; record skipped");
              continue; // Skip emitting this row.
            }

            // For all of the output fields, emit them using the delimiters
            // the user chooses.
            boolean first = true;
            StringBuilder sb = new StringBuilder();
            int recordLen = 1; // for the delimiter.
            for (String field : fields) {
              if (!first) {
                sb.append(outputFieldDelimStr);
              } else {
                first = false;
              }

              String fieldStr = FieldFormatter.escapeAndEnclose(field,
                  delimiters);
              sb.append(fieldStr);
              recordLen += fieldStr.length();
            }

            sb.append(outputRecordDelimStr);
            context.write(sb.toString(), null);
            counters.addBytes(recordLen);
          }
        } catch (IOException ioe) {
          LOG.error("IOException reading from mysqldump: " + ioe.toString());
          // flag this error so the parent can handle it appropriately.
          setError();
        } catch (InterruptedException ie) {
          LOG.error("InterruptedException reading from mysqldump: "
              + ie.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } finally {
          if (null != r) {
            try {
              r.close();
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());
            }
          }
        }
      }
    }
  }

  // TODO(aaron): Refactor this method to be much shorter.
  // CHECKSTYLE:OFF
  /**
   * Import the table into HDFS by using mysqldump to pull out the data from
   * the database and upload the files directly to HDFS.
   */
  public void map(String splitConditions, NullWritable val, Context context)
      throws IOException, InterruptedException {

    LOG.info("Beginning mysqldump fast path import");

    ArrayList<String> args = new ArrayList<String>();
    String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);

    // We need to parse the connect string URI to determine the database name.
    // Using java.net.URL directly on the connect string will fail because
    // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
    // scheme (everything before '://') and replace it with 'http', which we
    // know will work.
    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
    String databaseName = JdbcUrl.getDatabaseName(connectString);
    String hostname = JdbcUrl.getHostName(connectString);
    int port = JdbcUrl.getPort(connectString);

    if (null == databaseName) {
      throw new IOException("Could not determine database name");
    }

    LOG.info("Performing import of table " + tableName + " from database "
        + databaseName);

    args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.

    String password = conf.get(MySQLUtils.PASSWORD_KEY);
    String passwordFile = null;

    Process p = null;
    AsyncSink sink = null;
    AsyncSink errSink = null;
    PerfCounters counters = new PerfCounters();
    try {
      // --defaults-file must be the first argument.
      if (null != password && password.length() > 0) {
        passwordFile = MySQLUtils.writePasswordFile(conf);
        args.add("--defaults-file=" + passwordFile);
      }

      // Don't use the --where="" version because spaces in it can
      // confuse Java, and adding in surrounding quotes confuses Java as well.
      String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
          + " AND (" + splitConditions + ")";
      args.add("-w");
      args.add(whereClause);

      args.add("--host=" + hostname);
      if (-1 != port) {
        args.add("--port=" + Integer.toString(port));
      }
      args.add("--skip-opt");
      args.add("--compact");
      args.add("--no-create-db");
      args.add("--no-create-info");
      args.add("--quick"); // no buffering
      args.add("--single-transaction");

      String username = conf.get(MySQLUtils.USERNAME_KEY);
      if (null != username) {
        args.add("--user=" + username);
      }

      // If the user supplied extra args, add them here.
      String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
      if (null != extra) {
        for (String arg : extra) {
          args.add(arg);
        }
      }

      args.add(databaseName);
      args.add(tableName);

      // begin the import in an external process.
      LOG.debug("Starting mysqldump with arguments:");
      for (String arg : args) {
        LOG.debug("  " + arg);
      }

      // Actually start the mysqldump.
      p = Runtime.getRuntime().exec(args.toArray(new String[0]));

      // read from the stdout pipe into the HDFS writer.
      InputStream is = p.getInputStream();

      if (MySQLUtils.outputDelimsAreMySQL(conf)) {
        LOG.debug("Output delimiters conform to mysqldump; "
            + "using straight copy");
        sink = new CopyingAsyncSink(context, counters);
      } else {
        LOG.debug("User-specified delimiters; using reparsing import");
        LOG.info("Converting data to use specified delimiters.");
        LOG.info("(For the fastest possible import, use");
        LOG.info("--mysql-delimiters to specify the same field");
        LOG.info("delimiters as are used by mysqldump.)");
        sink = new ReparsingAsyncSink(context, conf, counters);
      }

      // Start an async thread to read and upload the whole stream.
      counters.startClock();
      sink.processStream(is);

      // Start an async thread to send stderr to log4j.
      errSink = new LoggingAsyncSink(LOG);
      errSink.processStream(p.getErrorStream());
    } finally {

      // block until the process is done.
      int result = 0;
      if (null != p) {
        while (true) {
          try {
            result = p.waitFor();
          } catch (InterruptedException ie) {
            // interrupted; loop around.
            continue;
          }

          break;
        }
      }

      // Remove the password file.
      if (null != passwordFile) {
        if (!new File(passwordFile).delete()) {
          LOG.error("Could not remove mysql password file " + passwordFile);
          LOG.error("You should remove this file to protect your credentials.");
        }
      }

      // block until the stream sink is done too.
      int streamResult = 0;
      if (null != sink) {
        while (true) {
          try {
            streamResult = sink.join();
          } catch (InterruptedException ie) {
            // interrupted; loop around.
            continue;
          }

          break;
        }
      }

      // Try to wait for stderr to finish, but regard any errors as advisory.
      if (null != errSink) {
        try {
          if (0 != errSink.join()) {
            LOG.info("Encountered exception reading stderr stream");
          }
        } catch (InterruptedException ie) {
          LOG.info("Thread interrupted waiting for stderr to complete: "
              + ie.toString());
        }
      }

      LOG.info("Transfer loop complete.");

      if (0 != result) {
        throw new IOException("mysqldump terminated with status "
            + Integer.toString(result));
      }

      if (0 != streamResult) {
        throw new IOException("Encountered exception in stream sink");
      }

      counters.stopClock();
      LOG.info("Transferred " + counters.toString());
    }
  }
  // CHECKSTYLE:ON

  @Override
  protected void setup(Context context)
    throws IOException, InterruptedException {
    super.setup(context);
    this.conf = context.getConfiguration();
  }
}