This is a great paper about implementing an efficient large scale parallel version of L-BFGS algorithm in MapReduce.
http://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf
What exactly are zero-copy techniques, and how can they be employed to achieve better performance in a distributed system?
If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques such as mmap (memory-mapped files) and sendfile() to improve the system.
Zero-copy techniques eliminate unnecessary data copies and context switches between application (user) space and kernel space. Please refer to the following excellent post for an in-depth explanation of these techniques.
http://www.ibm.com/developerworks/library/j-zerocopy/
Traditionally, if a server wants to send data over the network to a client, it first reads the data from disk into a kernel buffer and then copies it into a user-space buffer. From there, the data is copied again into the kernel buffer associated with the network stack before it is handed to the network interface card. See Figure 1 (taken from the above article).
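To make that copy path concrete, here is a minimal sketch of the traditional approach in Java; the class and variable names are illustrative, not taken from any particular project.

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

// Traditional copy path: disk -> kernel read buffer -> user-space buf[]
// -> kernel socket buffer -> NIC. Extra copies and context switches
// compared to the zero-copy approach described next.
public class TraditionalFileSender {
  public static void send(String path, Socket socket) throws Exception {
    byte[] buf = new byte[8192];
    try (InputStream in = new FileInputStream(path);
         OutputStream out = socket.getOutputStream()) {
      int n;
      while ((n = in.read(buf)) != -1) {  // copy into user space
        out.write(buf, 0, n);             // copy back into kernel space
      }
    }
  }
}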
A popular zero-copy technique is sendfile(), exposed in Java NIO as FileChannel.transferTo(). Please see the following figure (taken from the same article).
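As a minimal sketch of the same transfer using FileChannel.transferTo() (again, the class and method names below are illustrative), the kernel moves the bytes from the file directly to the socket without surfacing them in user space.

import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

// Zero-copy path: FileChannel.transferTo() asks the kernel to move bytes
// from the file directly to the socket (sendfile() on Linux), avoiding the
// intermediate user-space buffer.
public class ZeroCopyFileSender {
  public static void send(String path, String host, int port) throws Exception {
    try (FileChannel fileChannel = new FileInputStream(path).getChannel();
         SocketChannel socketChannel =
             SocketChannel.open(new InetSocketAddress(host, port))) {
      long position = 0;
      long remaining = fileChannel.size();
      while (remaining > 0) {
        long sent = fileChannel.transferTo(position, remaining, socketChannel);
        position += sent;
        remaining -= sent;
      }
    }
  }
}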
As you can see, Hadoop switched to the zero-copy transferTo() approach way back in version 0.18.
https://issues.apache.org/jira/browse/HADOOP-3164
Also, it uses the same technique when sending shuffle files.
In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming version 1.2.0.
SPARK-2468: introduce the same transferTo technique for sending shuffle files.
Have you ever wondered what the Hadoop map task's sort-and-spill code looks like?
Here you go. You can browse the following class.
MapTask.java (from the Hadoop MapReduce project on GitHub)
In the map task, there is an in-memory buffer that stores the output of the map task. When the buffer usage exceeds the threshold, the data is spilled to disk. Two parameters control this behavior: io.sort.mb (default 100 MB) is the size of the buffer, and io.sort.spill.percent (default 80%) is the threshold at which spilling to disk begins.
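For example, a job could tune these two knobs programmatically. This is only a sketch; the property names shown are the classic ones, which newer Hadoop releases have renamed to mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent.

import org.apache.hadoop.mapred.JobConf;

public class SortSpillConfig {
  // Illustrative sketch: raise the map-side sort buffer to 200 MB and
  // spill once it is 90% full. These are the classic (pre-rename)
  // property names.
  public static JobConf configure(JobConf job) {
    job.setInt("io.sort.mb", 200);
    job.setFloat("io.sort.spill.percent", 0.90f);
    return job;
  }
}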
In fact, the MapOutputBuffer is now pluggable (MAPREDUCE-4807), via the new MapOutputCollector interface.
public interface MapOutputCollector<K, V> {
  public void init(Context context
                  ) throws IOException, ClassNotFoundException;
  public void collect(K key, V value, int partition
                     ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;

  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  @InterfaceStability.Unstable
  public static class Context {
    private final MapTask mapTask;
    private final JobConf jobConf;
    private final TaskReporter reporter;

    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
      this.mapTask = mapTask;
      this.jobConf = jobConf;
      this.reporter = reporter;
    }

    public MapTask getMapTask() {
      return mapTask;
    }

    public JobConf getJobConf() {
      return jobConf;
    }

    public TaskReporter getReporter() {
      return reporter;
    }
  }
}
In MapTask.java, createSortingCollector() instantiates MapOutputBuffer, the default implementation of MapOutputCollector, via reflection.
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  MapOutputCollector<KEY, VALUE> collector =
    (MapOutputCollector<KEY, VALUE>) ReflectionUtils.newInstance(
      job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                   MapOutputBuffer.class, MapOutputCollector.class), job);
  LOG.info("Map output collector class = " + collector.getClass().getName());
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);
  collector.init(context);
  return collector;
}
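That reflection call is what makes the collector pluggable: a custom implementation only needs to implement MapOutputCollector and be named in the job configuration. The sketch below is hypothetical; the property key is the one read via JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, and com.example.MyCollector is a made-up class standing in for your own implementation.

import org.apache.hadoop.mapred.JobConf;

public class PluggableCollectorExample {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Hypothetical custom collector; it must implement
    // MapOutputCollector<K, V> (init/collect/flush/close).
    job.set("mapreduce.job.map.output.collector.class",
            "com.example.MyCollector");
  }
}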
In MapOutputBuffer, you can see that both io.sort.mb and io.sort.spill.percent are used to determine the size of the in-memory buffer (maxMemUsage).
//sanity checks
final float spillper =
  job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                   INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
  throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
  throw new IOException(
      "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
      QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage];
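With the defaults, the arithmetic works out to roughly an 80 MB soft limit before spilling. The snippet below is only a back-of-the-envelope sketch (it ignores the METASIZE alignment), not the actual MapOutputBuffer code.

// Back-of-the-envelope sketch of the default buffer sizing.
public class SpillMath {
  public static void main(String[] args) {
    int sortmb = 100;                  // io.sort.mb default
    float spillper = 0.8f;             // io.sort.spill.percent default
    int maxMemUsage = sortmb << 20;    // 100 MB in bytes
    int softLimit = (int) (maxMemUsage * spillper);
    System.out.println("buffer = " + maxMemUsage + " bytes, "
        + "spill at ~" + softLimit + " bytes");
  }
}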
To be continued…..
Viewing MapReduce job log files has been a pain. With YARN, you can enable log aggregation, which pulls together all the individual logs belonging to an MR job so they can be viewed with a single command.
You can view your MapReduce job log files using the following command:
yarn logs -applicationId <YOUR_APPLICATION_ID> | less
e.g.
yarn logs -applicationId application_1399561129519_4422 | less
You can enable log aggregation in yarn-site.xml as follows:
cat /etc/hadoop/conf/yarn-site.xml
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
To see the list of running MapReduce jobs
mapred job -list
To check the status of a MapReduce job
mapred job -status <YOUR_JOB_ID>
In the last post, we looked into the YARN event structure. Today, we will look at the different events introduced in MapReduce V2. These events are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.
You will find the different event types defined in the JobEventType, TaskEventType, and TaskAttemptEventType enums.
/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES
}
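To see how these types are used, here is a minimal sketch of firing one of them at the Job state machine through the dispatcher's event handler. The helper class and method are illustrative; in the real MRAppMaster the EventHandler and JobId would come from the application context rather than being passed in like this.

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.yarn.event.EventHandler;

public class KillJobExample {
  // Sketch: any component holding the dispatcher's EventHandler can fire
  // a JobEventType at the Job state machine.
  public static void kill(EventHandler<JobEvent> eventHandler, JobId jobId) {
    eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  }
}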
TaskTracker is essentially a thread running on a node. Internally it launches two TaskLauncher threads, mapLauncher and reduceLauncher. Each TaskLauncher thread uses a list to keep track of assigned tasks and starts each task in a separate TaskRunner thread. The TaskRunner thread then launches a JVM process for the task.
If we dig deeper into the code, we find that TaskRunner holds a JvmManager instance. JvmManager in turn contains two JvmManagerForType instances, one for map tasks (mapJvmManager) and one for reduce tasks (reduceJvmManager). Each is responsible for spawning a JvmRunner thread, which actually launches the JVM process.
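The thread structure is easier to picture as a simplified sketch. This is not the actual TaskTracker code; the class name merely mirrors the role described above.

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the structure described above, not the real
// TaskTracker implementation. A TaskLauncher drains a list of assigned
// tasks and hands each one to a TaskRunner-style thread, which ultimately
// launches a separate JVM process for the task.
class TaskLauncherSketch extends Thread {
  private final List<Runnable> assignedTasks = new ArrayList<>();

  synchronized void addTask(Runnable taskRunner) {
    assignedTasks.add(taskRunner);
    notifyAll();
  }

  @Override
  public void run() {
    while (true) {
      Runnable next;
      synchronized (this) {
        while (assignedTasks.isEmpty()) {
          try {
            wait();
          } catch (InterruptedException e) {
            return;
          }
        }
        next = assignedTasks.remove(0);
      }
      new Thread(next).start();  // TaskRunner -> JvmRunner -> child JVM
    }
  }
}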