yarn rmadmin

Have you ever wondered what happens behind the scenes when you execute the following command?
yarn rmadmin -refreshNodes

https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/bin/yarn

The link above points to the shell script that runs when you call the yarn command. Here is the branch of its case statement that handles the rmadmin subcommand:


rmadmin)
  CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
  hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"
  HADOOP_OPTS="${HADOOP_OPTS} ${YARN_CLIENT_OPTS}"
;;

As you can see above, the script sets CLASS to org.apache.hadoop.yarn.client.cli.RMAdminCLI and appends YARN_CLIENT_OPTS to HADOOP_OPTS, so that class is what runs when you issue the command yarn rmadmin -refreshNodes.
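To make the dispatch concrete, here is a rough Java analogue of what the case statement does: map a subcommand to a main class, then assemble a command line of the shape java <options> <main class> <arguments>. This is a simplified sketch, not the real launcher; SubcommandDispatch, resolveClass, buildCommand and the option value are hypothetical, and the real script handles many more subcommands and options.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified Java analogue of the subcommand dispatch in the yarn script.
class SubcommandDispatch {
  // Only the subcommand discussed here; the real script handles many more.
  static final Map<String, String> CLASSES =
      Map.of("rmadmin", "org.apache.hadoop.yarn.client.cli.RMAdminCLI");

  // Returns the main class for a subcommand, or null if the subcommand is unknown.
  static String resolveClass(String subcommand) {
    return CLASSES.get(subcommand);
  }

  // Builds a command line of the rough shape: java <opts> <main class> <args>.
  static List<String> buildCommand(String subcommand, List<String> opts, String... args) {
    String mainClass = resolveClass(subcommand);
    if (mainClass == null) {
      throw new IllegalArgumentException("unknown subcommand: " + subcommand);
    }
    List<String> cmd = new ArrayList<>();
    cmd.add("java");
    cmd.addAll(opts);  // e.g. HADOOP_OPTS with YARN_CLIENT_OPTS appended
    cmd.add(mainClass);
    cmd.addAll(Arrays.asList(args));
    return cmd;
  }

  public static void main(String[] args) {
    List<String> opts = List.of("-Xmx1g");  // hypothetical option value
    System.out.println(String.join(" ", buildCommand("rmadmin", opts, "-refreshNodes")));
    // prints: java -Xmx1g org.apache.hadoop.yarn.client.cli.RMAdminCLI -refreshNodes
  }
}
```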

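Here is the refreshNodes() method in the class org.apache.hadoop.yarn.client.cli.RMAdminCLI. It obtains a ResourceManagerAdministrationProtocol proxy from ClientRMProxy and makes an RPC call to refreshNodes() on the ResourceManager, where the request is served by the AdminService.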

private int refreshNodes() throws IOException, YarnException {
  // Refresh the nodes
  ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
  RefreshNodesRequest request = RefreshNodesRequest
      .newInstance(DecommissionType.NORMAL);
  adminProtocol.refreshNodes(request);
  return 0;
}



protected ResourceManagerAdministrationProtocol createAdminProtocol()
    throws IOException {
  // Get the current configuration
  final YarnConfiguration conf = new YarnConfiguration(getConf());
  return ClientRMProxy.createRMProxy(conf,
      ResourceManagerAdministrationProtocol.class);
}
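ClientRMProxy.createRMProxy returns an object that implements ResourceManagerAdministrationProtocol but forwards each method call to the ResourceManager. The toy sketch below illustrates only that proxy idea, using the JDK's java.lang.reflect.Proxy; ProxySketch, AdminProtocol and the in-memory "wire" are hypothetical stand-ins, and Hadoop's real RPC layer (serialization, retries, HA failover) is not modeled.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the client-side proxy pattern; this is NOT Hadoop's RPC.
class ProxySketch {
  // Hypothetical stand-in for ResourceManagerAdministrationProtocol.
  interface AdminProtocol {
    String refreshNodes(String decommissionType);
  }

  // Pretend "wire": records every call the proxy forwards.
  static final List<String> sentCalls = new ArrayList<>();

  // Creates an object that implements AdminProtocol by turning each call into a message.
  static AdminProtocol createProxy() {
    InvocationHandler handler = (proxy, method, args) -> {
      if (method.getDeclaringClass() == Object.class) {
        // Keep the sketch small: toString/equals/hashCode are not forwarded.
        throw new UnsupportedOperationException(method.getName());
      }
      String message = method.getName() + "(" + args[0] + ")";
      sentCalls.add(message);   // real RPC would serialize and send this
      return "OK: " + message;  // ...and return the server's reply
    };
    return (AdminProtocol) Proxy.newProxyInstance(
        AdminProtocol.class.getClassLoader(),
        new Class<?>[] {AdminProtocol.class},
        handler);
  }

  public static void main(String[] args) {
    AdminProtocol admin = createProxy();
    System.out.println(admin.refreshNodes("NORMAL"));  // prints: OK: refreshNodes(NORMAL)
  }
}
```

The caller only ever sees the interface, which is why RMAdminCLI can call adminProtocol.refreshNodes(request) as if the ResourceManager were a local object.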

YARN NodesListManager

In a YARN cluster, you may occasionally encounter straggler or problematic nodes that you want to decommission so they are no longer used by your MapReduce jobs.

You can add those nodes to the exclude file defined in yarn-site.xml. The property that specifies the exclude file is yarn.resourcemanager.nodes.exclude-path.
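In its simplest form the exclude file is a plain-text list of host names. The sketch below shows one plausible way to parse it, loosely modeled on how Hadoop's HostsFileReader treats plain-text host files: host names separated by whitespace, with # starting a comment that runs to the end of the line. ExcludeFileParser is a hypothetical class, and newer Hadoop versions may also accept an XML format for this file, which is not handled here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical parser for a plain-text exclude (or include) file.
class ExcludeFileParser {
  // Host names are separated by whitespace; '#' starts a comment to end of line.
  static Set<String> parse(Reader in) {
    Set<String> hosts = new LinkedHashSet<>();
    try (BufferedReader reader = new BufferedReader(in)) {
      String line;
      while ((line = reader.readLine()) != null) {
        for (String token : line.trim().split("\\s+")) {
          if (token.startsWith("#")) {
            break;  // ignore the rest of this line
          }
          if (!token.isEmpty()) {
            hosts.add(token);
          }
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return hosts;
  }

  public static void main(String[] args) {
    String file = "# nodes to take out of service\n"
        + "node07.example.com\n"
        + "node12.example.com   # slow disks\n";
    System.out.println(parse(new StringReader(file)));
    // prints: [node07.example.com, node12.example.com]
  }
}
```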

Then run the following command to remove the nodes from the cluster dynamically, without restarting the ResourceManager:

yarn rmadmin -refreshNodes

Behind the scenes, the ResourceManager class holds an instance of NodesListManager. This service is responsible for parsing the exclude file and using the resulting node list to check every NodeManager that requests registration.

public class ResourceManager extends CompositeService implements Recoverable {

  /**
   * Priority of the ResourceManager shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;

  private static final Log LOG = LogFactory.getLog(ResourceManager.class);
  private static long clusterTimeStamp = System.currentTimeMillis();

  /**
   * "Always On" services. Services that need to run always irrespective of
   * the HA state of the RM.
   */
  @VisibleForTesting
  protected RMContextImpl rmContext;
  private Dispatcher rmDispatcher;
  @VisibleForTesting
  protected AdminService adminService;

  /**
   * "Active" services. Services that need to run only on the Active RM.
   * These services are managed (initialized, started, stopped) by the
   * {@link CompositeService} RMActiveServices.
   *
   * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
   * in Active state.
   */
  protected RMActiveServices activeServices;
  protected RMSecretManagerService rmSecretManagerService;

  protected ResourceScheduler scheduler;
  protected ReservationSystem reservationSystem;
  private ClientRMService clientRM;
  protected ApplicationMasterService masterService;
  protected NMLivelinessMonitor nmLivelinessMonitor;
  protected NodesListManager nodesListManager;
  // ... (remaining fields and methods omitted)
}
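The registration check comes down to NodesListManager's isValidNode method, which the refreshNodes excerpt further down calls but does not show. As far as I can tell from the Hadoop source, its rule is: a host is allowed if the include list is empty or contains it, and the exclude list does not contain it. Below is a simplified sketch of that rule; HostFilter is a hypothetical class, and the real method also compares the host's normalized IP address against both lists.

```java
import java.util.Set;

// Hypothetical, simplified version of the include/exclude rule.
class HostFilter {
  private final Set<String> includes;
  private final Set<String> excludes;

  HostFilter(Set<String> includes, Set<String> excludes) {
    this.includes = includes;
    this.excludes = excludes;
  }

  // An empty include list means "every host is allowed unless excluded".
  boolean isValidNode(String host) {
    boolean included = includes.isEmpty() || includes.contains(host);
    return included && !excludes.contains(host);
  }

  public static void main(String[] args) {
    HostFilter filter = new HostFilter(Set.of(), Set.of("node12.example.com"));
    System.out.println(filter.isValidNode("node07.example.com"));  // prints: true
    System.out.println(filter.isValidNode("node12.example.com"));  // prints: false
  }
}
```

Because an empty include list admits everyone, a cluster that only configures yarn.resourcemanager.nodes.exclude-path behaves as a deny list.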
 

If you check NodesListManager, its refreshNodes method first reloads the include and exclude file paths from the configuration and re-reads both files (in refreshHostsReader), then sends a DECOMMISSION event for every known node that is no longer valid.

Stay tuned for more info.


public void refreshNodes(Configuration yarnConf) throws IOException,
    YarnException {
  refreshHostsReader(yarnConf);

  for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
    if (!isValidNode(nodeId.getHost())) {
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
    }
  }
}
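The loop in refreshNodes re-checks every node the ResourceManager currently knows about and dispatches a DECOMMISSION event for each one that no longer passes isValidNode. A minimal sketch of that loop, with a Consumer standing in for the dispatcher's event handler; DecommissionLoop and its parameters are hypothetical, and real node IDs carry a port as well as a host.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of the re-check loop in NodesListManager.refreshNodes.
class DecommissionLoop {
  static void refresh(Collection<String> registeredHosts,
                      Predicate<String> isValidNode,
                      Consumer<String> decommissionHandler) {
    for (String host : registeredHosts) {
      if (!isValidNode.test(host)) {
        // Stand-in for handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)).
        decommissionHandler.accept(host);
      }
    }
  }

  public static void main(String[] args) {
    Set<String> excludes = Set.of("node12.example.com");
    List<String> decommissioned = new ArrayList<>();
    refresh(List.of("node07.example.com", "node12.example.com"),
        host -> !excludes.contains(host),
        decommissioned::add);
    System.out.println(decommissioned);  // prints: [node12.example.com]
  }
}
```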

private void refreshHostsReader(Configuration yarnConf) throws IOException,
    YarnException {
  synchronized (hostsReader) {
    if (null == yarnConf) {
      yarnConf = new YarnConfiguration();
    }
    includesFile =
        yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
            YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
    excludesFile =
        yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
            YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
    hostsReader.updateFileNames(includesFile, excludesFile);
    hostsReader.refresh(
        includesFile.isEmpty() ? null
            : this.rmContext.getConfigurationProvider()
                .getConfigurationInputStream(this.conf, includesFile),
        excludesFile.isEmpty() ? null
            : this.rmContext.getConfigurationProvider()
                .getConfigurationInputStream(this.conf, excludesFile));
    printConfiguredHosts();
  }
}
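refreshHostsReader re-reads the include and exclude paths from the configuration on every call, treats an empty path as "no file" (passing null to hostsReader.refresh), and does all of this while holding the hostsReader lock. The sketch below models that pattern with an in-memory map in place of the configuration provider; HostLists, reload and the map-based file lookup are hypothetical, and treating a configured but missing file as an error is the sketch's own choice.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of refreshHostsReader: re-read paths, treat "" as "no file", update under a lock.
class HostLists {
  private final Object lock = new Object();
  private Set<String> includes = Set.of();
  private Set<String> excludes = Set.of();

  // 'files' stands in for the configuration provider: path -> parsed host set.
  void reload(String includesPath, String excludesPath, Map<String, Set<String>> files) {
    synchronized (lock) {
      Set<String> newIncludes = load(includesPath, files);
      Set<String> newExcludes = load(excludesPath, files);  // may throw; nothing changed yet
      includes = newIncludes;
      excludes = newExcludes;
    }
  }

  // An empty path means "no file configured" and yields an empty list.
  private static Set<String> load(String path, Map<String, Set<String>> files) {
    if (path.isEmpty()) {
      return Set.of();
    }
    Set<String> hosts = files.get(path);
    if (hosts == null) {
      // Sketch's choice: a configured but missing file is an error.
      throw new IllegalArgumentException("hosts file not found: " + path);
    }
    return hosts;
  }

  Set<String> includedHosts() {
    synchronized (lock) {
      return includes;
    }
  }

  Set<String> excludedHosts() {
    synchronized (lock) {
      return excludes;
    }
  }

  public static void main(String[] args) {
    HostLists lists = new HostLists();
    // Hypothetical path; only the exclude file is configured.
    Map<String, Set<String>> files =
        Map.of("/etc/hadoop/conf/yarn.exclude", Set.of("node12.example.com"));
    lists.reload("", "/etc/hadoop/conf/yarn.exclude", files);
    System.out.println(lists.excludedHosts());  // prints: [node12.example.com]
  }
}
```

Both new lists are loaded before either field is assigned, so a failed read leaves the previous lists in place, much as the original evaluates both input streams before calling hostsReader.refresh.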