yarn rmadmin

Have you ever wondered what happens behind the scenes when you execute the following command?
yarn rmadmin -refreshNodes

https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/bin/yarn

The link above points to the shell script that runs when you call the yarn command. Here is the branch that handles rmadmin:


rmadmin)
    CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
    hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
    HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"

As you can see above, the shell script sets CLASS to org.apache.hadoop.yarn.client.cli.RMAdminCLI, so that is the class it runs when we issue the command yarn rmadmin -refreshNodes.

Here is the refreshNodes() method in the class org.apache.hadoop.yarn.client.cli.RMAdminCLI. It uses ClientRMProxy to make an RPC call to the ResourceManager's refreshNodes() method.

private int refreshNodes() throws IOException, YarnException {
  // Refresh the nodes
  ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
  RefreshNodesRequest request = RefreshNodesRequest
      .newInstance(DecommissionType.NORMAL);
  adminProtocol.refreshNodes(request);
  return 0;
}

protected ResourceManagerAdministrationProtocol createAdminProtocol()
    throws IOException {
  // Get the current configuration
  final YarnConfiguration conf = new YarnConfiguration(getConf());
  return ClientRMProxy.createRMProxy(conf,
      ResourceManagerAdministrationProtocol.class);
}
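
To make the proxy idea more concrete, here is a minimal, self-contained sketch of what a client-side proxy does in spirit. It is not the YARN implementation: the AdminProtocol interface, the createProxy helper and the in-memory WIRE list are hypothetical stand-ins, and java.lang.reflect.Proxy takes the place of the real Hadoop RPC machinery.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AdminProxySketch {

  // Hypothetical stand-in for ResourceManagerAdministrationProtocol.
  interface AdminProtocol {
    String refreshNodes(String decommissionType);
  }

  // Records every call that would have gone over the wire. Assumption:
  // a real proxy would serialize the request and send it to the server.
  static final List<String> WIRE = new ArrayList<>();

  // Builds an object that implements the protocol interface and forwards
  // every method call to one handler. The handler is only meant for
  // protocol methods that return a String, such as refreshNodes.
  @SuppressWarnings("unchecked")
  static <T> T createProxy(Class<T> protocol) {
    InvocationHandler handler = (proxy, method, args) -> {
      WIRE.add(method.getName() + "(" + Arrays.toString(args) + ")");
      return "OK";
    };
    return (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
  }

  public static void main(String[] args) {
    AdminProtocol admin = createProxy(AdminProtocol.class);
    System.out.println(admin.refreshNodes("NORMAL"));
    System.out.println(WIRE);
  }
}
```

The caller only ever sees the protocol interface, which is why refreshNodes() in RMAdminCLI can look like an ordinary local method call.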

Mesos vs YARN

I will continue to add more information as I learn and discover more about their differences.

Mesos paper

http://www.cs.berkeley.edu/~matei/papers/2011/nsdi_mesos.pdf

YARN intro article

http://developer.yahoo.com/blogs/hadoop/next-generation-apache-hadoop-mapreduce-scheduler-4141.html

YARN design doc

https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf

YARN Jira ticket

https://issues.apache.org/jira/browse/MAPREDUCE-279

 

YARN RPC Part II

YarnRPC is the abstract base class that defines several abstract methods that return an RPC client proxy and an RPC server. HadoopYarnProtoRPC extends YarnRPC to provide concrete implementations of those abstract methods.


public abstract class YarnRPC {

  public abstract Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf);

  public abstract void stopProxy(Object proxy, Configuration conf);

  public abstract Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig);

  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers) {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }

}

HadoopYarnProtoRPC uses RpcFactoryProvider, which acts as a singleton factory that returns the configured implementations of the RpcServerFactory and RpcClientFactory interfaces.

Currently, YARN has RpcServerFactoryPBImpl, which implements the RpcServerFactory interface, and RpcClientFactoryPBImpl, which implements the RpcClientFactory interface. These PB factories in turn allow us to inject different Protocol Buffer protocol implementations, chosen by the protocol class, when the underlying Hadoop RPC creates the RPC server and the client proxy.
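
As a rough illustration of the provider idea, the sketch below looks up a factory class name in a configuration map and obtains the singleton through a static get() method via reflection. The ServerFactory interface, the PbServerFactory class and the configuration key are all hypothetical, and a plain Map stands in for Hadoop's Configuration; treat it as a sketch of the pattern rather than the actual RpcFactoryProvider code.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class FactoryProviderSketch {

  // Hypothetical, simplified factory interface.
  interface ServerFactory {
    String describe();
  }

  // A factory exposed as a singleton through a static get() method.
  static final class PbServerFactory implements ServerFactory {
    private static final PbServerFactory SELF = new PbServerFactory();

    private PbServerFactory() {
    }

    public static PbServerFactory get() {
      return SELF;
    }

    @Override
    public String describe() {
      return "protobuf server factory";
    }
  }

  // Hypothetical configuration key, not a real YARN property.
  static final String SERVER_FACTORY_KEY = "sketch.ipc.server.factory.class";

  // Resolves the factory class named in the configuration (falling back
  // to the protobuf one) and calls its static get() method.
  static ServerFactory getServerFactory(Map<String, String> conf) {
    String className =
        conf.getOrDefault(SERVER_FACTORY_KEY, PbServerFactory.class.getName());
    try {
      Class<?> clazz = Class.forName(className);
      Method get = clazz.getMethod("get");
      return (ServerFactory) get.invoke(null);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot load factory " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(getServerFactory(conf).describe());
  }
}
```

Because the class name comes from configuration, a different factory can be swapped in without touching the code that asks the provider for one.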

RpcClientAndServerFactory

Some examples of Protocol Buffer protocol implementations are:

org.apache.hadoop.yarn.api.impl.pb.service
AMRMProtocolPBServiceImpl (will be injected if the protocol is AMRMProtocol)
ClientRMProtocolPBServiceImpl (will be injected if the protocol is ClientRMProtocol)

org.apache.hadoop.yarn.api.impl.pb.client
AMRMProtocolPBClientImpl
ClientRMProtocolPBClientImpl
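
The injection relies on a naming convention: the implementation class name is derived from the protocol class name. Here is a small sketch of that derivation, modelled on getPbServiceImplClassName in RpcServerFactoryPBImpl. It works on plain strings so that it runs without Hadoop on the classpath; the serviceImplName helper is my own name, and the protocol package used in main is an assumption based on the package names listed above.

```java
class PbNamingSketch {

  static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
  static final String PB_IMPL_CLASS_SUFFIX = "PBServiceImpl";

  // Maps a fully qualified protocol class name to the name of its
  // protobuf service implementation:
  //   <package>.impl.pb.service.<SimpleName>PBServiceImpl
  static String serviceImplName(String protocolClassName) {
    int lastDot = protocolClassName.lastIndexOf('.');
    if (lastDot < 0) {
      throw new IllegalArgumentException(
          "Expected a fully qualified class name: " + protocolClassName);
    }
    String packageName = protocolClassName.substring(0, lastDot);
    String simpleName = protocolClassName.substring(lastDot + 1);
    return packageName + "." + PB_IMPL_PACKAGE_SUFFIX + "."
        + simpleName + PB_IMPL_CLASS_SUFFIX;
  }

  public static void main(String[] args) {
    System.out.println(
        serviceImplName("org.apache.hadoop.yarn.api.AMRMProtocol"));
  }
}
```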


/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.factories.impl.pb;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcServerFactory;

import com.google.protobuf.BlockingService;

public class RpcServerFactoryPBImpl implements RpcServerFactory {

  private static final Log LOG = LogFactory.getLog(RpcServerFactoryPBImpl.class);
  private static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";
  private static final String PROTO_GEN_CLASS_SUFFIX = "Service";
  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
  private static final String PB_IMPL_CLASS_SUFFIX = "PBServiceImpl";
  
  private static final RpcServerFactoryPBImpl self = new RpcServerFactoryPBImpl();

  private Configuration localConf = new Configuration();
  private ConcurrentMap<Class<?>, Constructor<?>> serviceCache = new ConcurrentHashMap<Class<?>, Constructor<?>>();
  private ConcurrentMap<Class<?>, Method> protoCache = new ConcurrentHashMap<Class<?>, Method>();
  
  public static RpcServerFactoryPBImpl get() {
    return RpcServerFactoryPBImpl.self;
  }
  

  private RpcServerFactoryPBImpl() {
  }
  
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
      throws YarnException {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }
  
  @Override
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
      String portRangeConfig)
      throws YarnException {
    
    Constructor<?> constructor = serviceCache.get(protocol);
    if (constructor == null) {
      Class<?> pbServiceImplClazz = null;
      try {
        pbServiceImplClazz = localConf
            .getClassByName(getPbServiceImplClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getPbServiceImplClassName(protocol) + "]", e);
      }
      try {
        constructor = pbServiceImplClazz.getConstructor(protocol);
        constructor.setAccessible(true);
        serviceCache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnException("Could not find constructor with params: "
            + Long.TYPE + ", " + InetSocketAddress.class + ", "
            + Configuration.class, e);
      }
    }
    
    Object service = null;
    try {
      service = constructor.newInstance(instance);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (InstantiationException e) {
      throw new YarnException(e);
    }

    Class<?> pbProtocol = service.getClass().getInterfaces()[0];
    Method method = protoCache.get(protocol);
    if (method == null) {
      Class<?> protoClazz = null;
      try {
        protoClazz = localConf.getClassByName(getProtoClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getProtoClassName(protocol) + "]", e);
      }
      try {
        method = protoClazz.getMethod("newReflectiveBlockingService",
            pbProtocol.getInterfaces()[0]);
        method.setAccessible(true);
        protoCache.putIfAbsent(protocol, method);
      } catch (NoSuchMethodException e) {
        throw new YarnException(e);
      }
    }
    
    try {
      return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
          (BlockingService)method.invoke(null, service), portRangeConfig);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (IOException e) {
      throw new YarnException(e);
    }
  }
  
  private String getProtoClassName(Class<?> clazz) {
    String srcClassName = getClassName(clazz);
    return PROTO_GEN_PACKAGE_NAME + "." + srcClassName + "$" + srcClassName + PROTO_GEN_CLASS_SUFFIX;  
  }
  
  private String getPbServiceImplClassName(Class<?> clazz) {
    String srcPackagePart = getPackageName(clazz);
    String srcClassName = getClassName(clazz);
    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
    return destPackagePart + "." + destClassPart;
  }
  
  private String getClassName(Class<?> clazz) {
    String fqName = clazz.getName();
    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
  }
  
  private String getPackageName(Class<?> clazz) {
    return clazz.getPackage().getName();
  }

  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }
}

Events in MapReduce V2

In the last post, we looked into the YARN event structure. Today, we will look at the different events introduced in MapReduce V2. These events are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.

AllEventTypes

You will find different event types defined in the JobEventType, TaskEventType, TaskAttemptEventType enum classes.


/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES

}
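
One reason for keeping each family of events in its own enum is that the enum class itself can serve as a routing key. The sketch below shows how an event's type constant leads back to its enum class through getDeclaringClass(). The Event class and the routingKey helper are hypothetical, JobEventType is trimmed to three constants, and the TaskEventType constants are illustrative only.

```java
class EventTypeSketch {

  // Trimmed version of the enum shown above.
  enum JobEventType { JOB_KILL, JOB_INIT, JOB_START }

  // Illustrative constants only; see the real TaskEventType for the full set.
  enum TaskEventType { T_SCHEDULE, T_KILL }

  // Hypothetical minimal event: it carries nothing but its type.
  static final class Event<T extends Enum<T>> {
    private final T type;

    Event(T type) {
      this.type = type;
    }

    T getType() {
      return type;
    }
  }

  // The enum class that declares the constant identifies the event family.
  // getDeclaringClass() is used instead of getClass() because a constant
  // with its own body would report an anonymous subclass from getClass().
  static Class<?> routingKey(Event<?> event) {
    return event.getType().getDeclaringClass();
  }

  public static void main(String[] args) {
    System.out.println(routingKey(new Event<>(JobEventType.JOB_KILL)));
    System.out.println(routingKey(new Event<>(TaskEventType.T_KILL)));
  }
}
```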

Hadoop YARN Event Bus

Hadoop YARN uses an event bus to centralize the dispatching of events and their handling by the registered event handlers. This means less plumbing code to write and maintain. It also decouples the event-handling logic from the interacting components and keeps it in one place.

AsyncDispatcher is such an event bus. It uses a LinkedBlockingQueue to queue up incoming events. It also has an internal thread, called eventHandlingThread, that waits on this queue and takes events off it for processing by calling the dispatch(event) method. That method looks up the event handler registered for the event's type and passes the event to it.

AsyncDispatcherUML

package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;

/**
 * Dispatches events in a separate thread. Currently only single thread does
 * that. Potentially there could be multiple channels for each event type
 * class and a thread pool can be used to dispatch the events.
 */
@SuppressWarnings("rawtypes")
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue eventQueue;
  private volatile boolean stopped = false;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  public AsyncDispatcher() {
    this(new LinkedBlockingQueue());
  }

  public AsyncDispatcher(BlockingQueue eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  @Override
  public synchronized void init(Configuration conf) {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.init(conf);
  }

  @Override
  public void start() {
    //start all the components
    super.start();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  @Override
  public void stop() {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.stop();
  }

  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler registeredHandler = (EventHandler)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  @Override
  public EventHandler getEventHandler() {
    return new GenericEventHandler();
  }

  class GenericEventHandler implements EventHandler {
    public void handle(Event event) {
      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new YarnException(e);
      }
    };
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param <T> the type of event these multiple handlers are interested in.
   */
  static class MultiListenerHandler implements EventHandler {
    List<EventHandler> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler>();
    }

    @Override
    public void handle(Event event) {
      for (EventHandler handler: listofHandlers) {
        handler.handle(event);
      }
    }

    void addHandler(EventHandler handler) {
      listofHandlers.add(handler);
    }

  }
}
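
To see the moving parts without the service lifecycle and logging, here is a stripped-down sketch of the same idea: a blocking queue, one worker thread, and a map from enum class to handler. It is a simplification under stated assumptions and not a replacement for AsyncDispatcher. The events are the enum constants themselves, there is only one handler per event type, and names such as MiniDispatcherSketch, post and runDemo are my own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class MiniDispatcherSketch {

  // Trimmed event type enum; each constant doubles as the event.
  enum JobEventType { JOB_INIT, JOB_START }

  private final BlockingQueue<Enum<?>> queue = new LinkedBlockingQueue<>();
  private final Map<Class<?>, Consumer<Enum<?>>> handlers =
      new ConcurrentHashMap<>();
  private volatile boolean stopped = false;
  private Thread worker;

  void register(Class<? extends Enum<?>> eventType,
      Consumer<Enum<?>> handler) {
    handlers.put(eventType, handler);
  }

  // Producers only enqueue; they never run handler code themselves.
  void post(Enum<?> event) {
    queue.add(event);
  }

  void start() {
    worker = new Thread(() -> {
      while (!stopped) {
        try {
          Enum<?> event = queue.take(); // blocks until an event arrives
          Consumer<Enum<?>> handler =
              handlers.get(event.getDeclaringClass());
          if (handler != null) {
            handler.accept(event);
          }
        } catch (InterruptedException e) {
          return; // stop() interrupts the blocked take()
        }
      }
    }, "mini-dispatcher");
    worker.start();
  }

  void stop() {
    stopped = true;
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Posts two events and returns the order in which the handler saw them.
  static List<String> runDemo() {
    List<String> seen = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch done = new CountDownLatch(2);
    MiniDispatcherSketch dispatcher = new MiniDispatcherSketch();
    dispatcher.register(JobEventType.class, event -> {
      seen.add(event.name());
      done.countDown();
    });
    dispatcher.start();
    dispatcher.post(JobEventType.JOB_INIT);
    dispatcher.post(JobEventType.JOB_START);
    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    dispatcher.stop();
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

With a single worker thread and a FIFO queue, events are handled in the order they were posted, which is part of what makes this design easy to reason about.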

Hadoop YARN RPC (part I)

I have spent some time digging into the YARN RPC source code. Personally, I like the use of the Factory pattern to inject different RPC client proxy and server implementations into the framework. It looks much cleaner than the older versions of Hadoop.

For example, RpcServerFactoryPBImpl is the implementation of the RpcServerFactory interface that creates a Protobuf RPC server. Basically, it delegates the creation to the standard Hadoop RPC class.

Look at the following snippet from RpcServerFactoryPBImpl:

private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }

We could create a new type of RPC server factory, say RpcServerFactoryMyOwnImpl, that implements the RpcServerFactory interface and replaces the createServer logic above with its own, so that it returns our own RPC server.
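
A minimal sketch of what such a swap could look like is shown below. Everything in it is hypothetical and heavily simplified: Server and ServerFactory stand in for the real Hadoop types, and the factories return a label instead of starting a real RPC server. The point is only that the calling code depends on the interface, not on a particular factory.

```java
class CustomFactorySketch {

  // Hypothetical stand-in for the RPC server type.
  interface Server {
    String kind();
  }

  // Hypothetical, simplified version of a server factory interface.
  interface ServerFactory {
    Server getServer(String host, int port);
  }

  // Plays the role of the protobuf-based factory.
  static final class PbServerFactory implements ServerFactory {
    @Override
    public Server getServer(String host, int port) {
      return () -> "protobuf@" + host + ":" + port;
    }
  }

  // Our own factory: same interface, different server.
  static final class MyOwnServerFactory implements ServerFactory {
    @Override
    public Server getServer(String host, int port) {
      return () -> "my-own@" + host + ":" + port;
    }
  }

  // The caller never names a concrete factory class.
  static String startServer(ServerFactory factory, String host, int port) {
    return factory.getServer(host, port).kind();
  }

  public static void main(String[] args) {
    System.out.println(startServer(new PbServerFactory(), "localhost", 9000));
    System.out.println(
        startServer(new MyOwnServerFactory(), "localhost", 9000));
  }
}
```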

In the YARN framework, HadoopYarnProtoRPC is the class that uses these factories.
Basically, it calls RpcFactoryProvider.getServerFactory(conf) to get the right RpcServerFactory implementation.


public class HadoopYarnProtoRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
        addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + 
        " with " + numHandlers + " handlers");
    
    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, 
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);

  }

}

To be continued…