Deep Dive into YARN NodeManager

The YARN framework is event driven. Components send events to a central dispatcher, which invokes the callbacks of the event handlers registered for each event type. Some event handlers also spawn threads to run long-running logic after receiving an event; for example, during event handling ResourceLocalizationService starts a LocalizerRunner thread to download remote resources (a minimal sketch of the dispatcher pattern follows the service list below). NodeManager is a core component of YARN. It consists of the following services internally:

NodeStatusUpdater
NodeLabelsProvider
NodeResourceMonitor
ContainerManagerImpl
NMStateStoreService
NodeHealthCheckerService
LocalDirsHandlerService

Yes, it has a WebServer internally too.
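
Before digging into these services, here is a minimal, self-contained sketch of the dispatcher pattern described above. This is illustrative only, not YARN code; the real implementation is org.apache.hadoop.yarn.event.AsyncDispatcher, which blocks on the event queue from a dedicated thread, and the enum value used here is just an example.

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniDispatcher {

  // One enum class per event family, mirroring LocalizationEventType,
  // ContainersLauncherEventType, etc.
  enum LocalizationEventType { INIT_CONTAINER_RESOURCES }

  interface Event { Enum<?> getType(); }

  interface EventHandler { void handle(Event event); }

  private final Queue<Event> eventQueue = new LinkedBlockingQueue<>();
  private final Map<Class<?>, EventHandler> handlers = new HashMap<>();

  // Mirrors dispatcher.register(SomeEventType.class, handler) in YARN.
  void register(Class<? extends Enum<?>> eventType, EventHandler handler) {
    handlers.put(eventType, handler);
  }

  // Components drop events on the queue instead of calling each other directly.
  void dispatch(Event event) {
    eventQueue.add(event);
  }

  // The event loop: route each queued event to the handler registered for
  // its event type's enum class. This toy version drains the queue and
  // returns; AsyncDispatcher loops forever on its own thread.
  void run() {
    Event event;
    while ((event = eventQueue.poll()) != null) {
      handlers.get(event.getType().getDeclaringClass()).handle(event);
    }
  }

  public static void main(String[] args) {
    MiniDispatcher dispatcher = new MiniDispatcher();
    // A handler with long-running work would hand it off to a thread here,
    // the way ResourceLocalizationService starts a LocalizerRunner.
    dispatcher.register(LocalizationEventType.class,
        event -> System.out.println("handling " + event.getType()));
    dispatcher.dispatch(() -> LocalizationEventType.INIT_CONTAINER_RESOURCES);
    dispatcher.run();
  }
}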

These services are initialized within the serviceInit method:


@Override
protected void serviceInit(Configuration conf) throws Exception {
  ...
}

One of these internal services, ContainerManagerImpl, extends CompositeService and implements the ServiceStateChangeListener, ContainerManagementProtocol, and EventHandler interfaces. It contains two event handlers, ResourceLocalizationService and ContainersLauncher.

public class ContainerManagerImpl extends CompositeService implements
    ServiceStateChangeListener, ContainerManagementProtocol,
    EventHandler<ContainerManagerEvent>

Both ResourceLocalizationService and ContainersLauncher are defined in ContainerManagerImpl as follows:

 private final ResourceLocalizationService rsrcLocalizationSrvc;
 private final ContainersLauncher containersLauncher;

As you can see in the class definition of ResourceLocalizationService, it implements the EventHandler interface to handle events of type LocalizationEvent.

public class ResourceLocalizationService extends CompositeService
    implements EventHandler<LocalizationEvent>, LocalizationProtocol {

ContainersLauncher is the event handler responsible for initializing, starting, launching, and terminating containers. It implements the EventHandler interface to handle events of type ContainersLauncherEvent, e.g. the LAUNCH_CONTAINER, RECOVER_CONTAINER, and CLEANUP_CONTAINER events.

public class ContainersLauncher extends AbstractService
    implements EventHandler<ContainersLauncherEvent> {

ContainerManagerImpl registers the above event handlers as shown below.


dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);

In addition, it registers other event handlers to handle ContainersMonitorEvent, ContainerEvent, ApplicationEvent, and AuxServicesEvent types.

dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class, new ApplicationEventDispatcher());

NodeManager also has an important class called DefaultContainerExecutor, which extends ContainerExecutor. The executor implementation is loaded reflectively, based on configuration:

ContainerExecutor exec = ReflectionUtils.newInstance(
        conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
          DefaultContainerExecutor.class, ContainerExecutor.class), conf);
    try {
      exec.init();
    } catch (IOException e) {
      throw new YarnRuntimeException("Failed to initialize container executor", e);
    }    
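
Because the executor class is looked up from configuration (YarnConfiguration.NM_CONTAINER_EXECUTOR above), you can swap in a different implementation without code changes. For example, to use LinuxContainerExecutor instead of the default, you would set the following in yarn-site.xml (check the class name against your Hadoop version):

<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>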

DefaultContainerExecutor provides the methods to activate and launch containers. It is used by both ResourceLocalizationService and ContainersLauncher, which are instantiated in ContainerManagerImpl.

If you look into DefaultContainerExecutor, you will find methods like activateContainer and launchContainer used by ContainersLauncher, whereas startLocalizer is called by ResourceLocalizationService.

After a container is created, the artifacts required by the job have to be downloaded first; this is what YARN calls localization, i.e. downloading remote resources onto the local file system. You can check out this excellent blog post by Hortonworks for a deep dive into resource localization:
http://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/

I will blog about resource localization next time. Stay tuned.

yarn rmadmin

Have you ever wondered what happens behind the scenes when you execute

yarn rmadmin -refreshNodes

https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/bin/yarn

The above is the shell script that runs when you call the yarn command. The relevant case for rmadmin is:


rmadmin)
    CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
    hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
    HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"

As you can see above, the shell script invokes the class org.apache.hadoop.yarn.client.cli.RMAdminCLI when we issue the command yarn rmadmin -refreshNodes.

Here is the refreshNodes() method in the class org.apache.hadoop.yarn.client.cli.RMAdminCLI. It uses ClientRMProxy to make an RPC call to the ResourceManager's refreshNodes() method.

private int refreshNodes() throws IOException, YarnException {
    // Refresh the nodes
    ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
    RefreshNodesRequest request = RefreshNodesRequest
        .newInstance(DecommissionType.NORMAL);
    adminProtocol.refreshNodes(request);
    return 0;
  }



 protected ResourceManagerAdministrationProtocol createAdminProtocol()
      throws IOException {
    // Get the current configuration
    final YarnConfiguration conf = new YarnConfiguration(getConf());
    return ClientRMProxy.createRMProxy(conf,
        ResourceManagerAdministrationProtocol.class);
  }

YARN NodesListManager

In a YARN cluster, you might occasionally encounter straggler or otherwise problematic nodes that you would like to decommission so they are no longer used by your MR jobs.

You can add those nodes to the exclude file defined in yarn-site.xml. The property that specifies the exclude file is yarn.resourcemanager.nodes.exclude-path.
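
For example (the file location below is illustrative; the exclude file itself is plain text with one hostname per line):

<property>
<name>yarn.resourcemanager.nodes.exclude-path</name>
<value>/etc/hadoop/conf/yarn.exclude</value>
</property>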

Then you can execute the following command to dynamically remove the nodes from the cluster:

yarn rmadmin -refreshNodes

Behind the scenes, if you look at the ResourceManager class, it has an instance of NodesListManager. This service is responsible for parsing the exclude file and checking the resulting node list against every NodeManager registration request.

public class ResourceManager extends CompositeService implements Recoverable {

  /**
   * Priority of the ResourceManager shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;

  private static final Log LOG = LogFactory.getLog(ResourceManager.class);
  private static long clusterTimeStamp = System.currentTimeMillis();

  /**
   * "Always On" services. Services that need to run always irrespective of
   * the HA state of the RM.
   */
  @VisibleForTesting
  protected RMContextImpl rmContext;
  private Dispatcher rmDispatcher;
  @VisibleForTesting
  protected AdminService adminService;

  /**
   * "Active" services. Services that need to run only on the Active RM.
   * These services are managed (initialized, started, stopped) by the
   * {@link CompositeService} RMActiveServices.
   *
   * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
   * in Active state.
   */
  protected RMActiveServices activeServices;
  protected RMSecretManagerService rmSecretManagerService;

  protected ResourceScheduler scheduler;
  protected ReservationSystem reservationSystem;
  private ClientRMService clientRM;
  protected ApplicationMasterService masterService;
  protected NMLivelinessMonitor nmLivelinessMonitor;
  protected NodesListManager nodesListManager;
 

If you check NodesListManager, there is a refreshNodes method which re-reads the configuration and then reads the excluded nodes from the specified exclude file:



 public void refreshNodes(Configuration yarnConf) throws IOException,
      YarnException {
    refreshHostsReader(yarnConf);

    for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
      if (!isValidNode(nodeId.getHost())) {
        this.rmContext.getDispatcher().getEventHandler().handle(
            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
      }
    }
  }

private void refreshHostsReader(Configuration yarnConf) throws IOException,
      YarnException {
    synchronized (hostsReader) {
      if (null == yarnConf) {
        yarnConf = new YarnConfiguration();
      }
      includesFile =
          yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
              YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
      excludesFile =
          yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
              YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
      hostsReader.updateFileNames(includesFile, excludesFile);
      hostsReader.refresh(
          includesFile.isEmpty() ? null : this.rmContext
              .getConfigurationProvider().getConfigurationInputStream(
                  this.conf, includesFile), excludesFile.isEmpty() ? null
              : this.rmContext.getConfigurationProvider()
                  .getConfigurationInputStream(this.conf, excludesFile));
      printConfiguredHosts();
    }
  }

Stay tuned for more info.

YARN log files

Viewing MapReduce job log files has been a pain. With YARN, you can enable log aggregation, which pulls together all the individual logs belonging to an MR job and lets you view them as one aggregated log with the following command:

yarn logs -applicationId <YOUR_APPLICATION_ID> | less

e.g.

yarn logs -applicationId application_1399561129519_4422 | less

You can enable log aggregation in yarn-site.xml as follows:

cat /etc/hadoop/conf/yarn-site.xml

<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

To see the list of running MapReduce jobs:

mapred job -list

To check the status of a MapReduce job:

mapred job -status <YOUR_JOB_ID>

Mesos vs YARN

I will continue to add more information here as I learn and discover more about their differences.

Mesos paper

http://www.cs.berkeley.edu/~matei/papers/2011/nsdi_mesos.pdf

YARN intro article

http://developer.yahoo.com/blogs/hadoop/next-generation-apache-hadoop-mapreduce-scheduler-4141.html

YARN design doc

https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf

YARN Jira ticket

https://issues.apache.org/jira/browse/MAPREDUCE-279


YARN RPC Part II

YarnRPC is the abstract base class that defines several abstract methods for obtaining an RPC client proxy and an RPC server. HadoopYarnProtoRPC extends YarnRPC and provides concrete implementations of these abstract methods.


public abstract class YarnRPC {

 public abstract Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf);

  public abstract void stopProxy(Object proxy, Configuration conf);

  public abstract Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig);

  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers) {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }

}

Internally, HadoopYarnProtoRPC uses RpcFactoryProvider, which acts as a singleton factory returning implementations of the RpcServerFactory and RpcClientFactory interfaces.

Currently, YARN ships RpcServerFactoryPBImpl, which implements the RpcServerFactory interface, and RpcClientFactoryPBImpl, which implements the RpcClientFactory interface. These PB factories in turn allow different Protocol Buffers protocol implementations to be injected, based on the protocol class, when the underlying Hadoop RPC creates the server and the client proxy.
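
For example, a client can obtain a protocol proxy without naming any PB implementation class directly. A minimal sketch, using the protocol names from this era of YARN (imports omitted; the RM address is illustrative):

Configuration conf = new YarnConfiguration();
// YarnRPC.create returns HadoopYarnProtoRPC unless overridden in the configuration.
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr("rmhost:8032");
ClientRMProtocol proxy = (ClientRMProtocol) rpc.getProxy(
    ClientRMProtocol.class, rmAddress, conf);

Under the hood, getProxy asks RpcFactoryProvider for the RpcClientFactory, which loads ClientRMProtocolPBClientImpl by naming convention.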


Some examples of Protocol Buffers protocol implementations are:

org.apache.hadoop.yarn.api.impl.pb.service
AMRMProtocolPBServiceImpl (injected if the protocol is AMRMProtocol)
ClientRMProtocolPBServiceImpl (injected if the protocol is ClientRMProtocol)

org.apache.hadoop.yarn.api.impl.pb.client
AMRMProtocolPBClientImpl (injected if the protocol is AMRMProtocol)
ClientRMProtocolPBClientImpl (injected if the protocol is ClientRMProtocol)

Below is the full source of RpcServerFactoryPBImpl. Note how it derives both the PB service implementation class name and the protobuf-generated service class name from the protocol class by naming convention, instantiates them reflectively, and hands the resulting BlockingService to the Hadoop RPC server.



package org.apache.hadoop.yarn.factories.impl.pb;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcServerFactory;

import com.google.protobuf.BlockingService;

public class RpcServerFactoryPBImpl implements RpcServerFactory {

  private static final Log LOG = LogFactory.getLog(RpcServerFactoryPBImpl.class);
  private static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";
  private static final String PROTO_GEN_CLASS_SUFFIX = "Service";
  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
  private static final String PB_IMPL_CLASS_SUFFIX = "PBServiceImpl";
  
  private static final RpcServerFactoryPBImpl self = new RpcServerFactoryPBImpl();

  private Configuration localConf = new Configuration();
  private ConcurrentMap<Class<?>, Constructor<?>> serviceCache = new ConcurrentHashMap<Class<?>, Constructor<?>>();
  private ConcurrentMap<Class<?>, Method> protoCache = new ConcurrentHashMap<Class<?>, Method>();
  
  public static RpcServerFactoryPBImpl get() {
    return RpcServerFactoryPBImpl.self;
  }
  

  private RpcServerFactoryPBImpl() {
  }
  
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
      throws YarnException {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }
  
  @Override
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
      String portRangeConfig)
      throws YarnException {
    
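    // Look up (and cache) the constructor of the generated PB service wrapper
    // for this protocol, e.g. ClientRMProtocolPBServiceImpl for ClientRMProtocol.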
    Constructor<?> constructor = serviceCache.get(protocol);
    if (constructor == null) {
      Class<?> pbServiceImplClazz = null;
      try {
        pbServiceImplClazz = localConf
            .getClassByName(getPbServiceImplClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getPbServiceImplClassName(protocol) + "]", e);
      }
      try {
        constructor = pbServiceImplClazz.getConstructor(protocol);
        constructor.setAccessible(true);
        serviceCache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnException("Could not find constructor with params: "
            + Long.TYPE + ", " + InetSocketAddress.class + ", "
            + Configuration.class, e);
      }
    }
    
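    // Wrap the supplied protocol implementation instance in the PB service wrapper.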
    Object service = null;
    try {
      service = constructor.newInstance(instance);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (InstantiationException e) {
      throw new YarnException(e);
    }

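    // Locate the protobuf-generated service class (e.g.
    // ClientRMProtocol$ClientRMProtocolService) and cache its
    // newReflectiveBlockingService factory method.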
    Class<?> pbProtocol = service.getClass().getInterfaces()[0];
    Method method = protoCache.get(protocol);
    if (method == null) {
      Class<?> protoClazz = null;
      try {
        protoClazz = localConf.getClassByName(getProtoClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getProtoClassName(protocol) + "]", e);
      }
      try {
        method = protoClazz.getMethod("newReflectiveBlockingService",
            pbProtocol.getInterfaces()[0]);
        method.setAccessible(true);
        protoCache.putIfAbsent(protocol, method);
      } catch (NoSuchMethodException e) {
        throw new YarnException(e);
      }
    }
    
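    // Finally, build the Hadoop RPC server around the BlockingService returned
    // by the generated factory method.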
    try {
      return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
          (BlockingService)method.invoke(null, service), portRangeConfig);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (IOException e) {
      throw new YarnException(e);
    }
  }
  
  private String getProtoClassName(Class<?> clazz) {
    String srcClassName = getClassName(clazz);
    return PROTO_GEN_PACKAGE_NAME + "." + srcClassName + "$" + srcClassName + PROTO_GEN_CLASS_SUFFIX;  
  }
  
  private String getPbServiceImplClassName(Class<?> clazz) {
    String srcPackagePart = getPackageName(clazz);
    String srcClassName = getClassName(clazz);
    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
    return destPackagePart + "." + destClassPart;
  }
  
  private String getClassName(Class<?> clazz) {
    String fqName = clazz.getName();
    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
  }
  
  private String getPackageName(Class<?> clazz) {
    return clazz.getPackage().getName();
  }

  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }
}

Events in MapReduce V2

In the last post, we looked into the YARN event structure. Today, we will look at the different events introduced in MapReduce V2. These events are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.


You will find the different event types defined in the JobEventType, TaskEventType, and TaskAttemptEventType enum classes.


/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES

}
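
These events are sent through the same dispatcher mechanism as the YARN events we saw earlier. For example, the application master kicks off a job with a JOB_INIT event; a condensed sketch of the pattern (not the literal MRAppMaster code):

// JobEvent pairs a JobId with a JobEventType; the dispatcher routes it to
// the handler registered for JobEventType, which drives the Job state machine.
dispatcher.getEventHandler().handle(
    new JobEvent(job.getID(), JobEventType.JOB_INIT));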