Hadoop YARN Event Bus

Hadoop YARN uses an event bus to centralize the dispatching of various events and their handling by registered event handlers. This means less plumbing code to write and maintain. It also decouples the event-handling logic of the different interacting components into one place.

AsyncDispatcher is such an event bus. It uses a LinkedBlockingQueue to queue up incoming events, and it has an internal thread, eventHandlingThread, that listens on this queue and takes events off it for processing by calling the dispatch(event) method. This method basically retrieves the corresponding event handler to handle the event accordingly.

[Figure: AsyncDispatcher UML diagram]

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;

/**
 * Dispatches events in a separate thread. Currently only single thread does
 * that. Potentially there could be multiple channels for each event type
 * class and a thread pool can be used to dispatch the events.
 */
@SuppressWarnings("rawtypes")
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue eventQueue;
  private volatile boolean stopped = false;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  public AsyncDispatcher() {
    this(new LinkedBlockingQueue());
  }

  public AsyncDispatcher(BlockingQueue eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  @Override
  public synchronized void init(Configuration conf) {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.init(conf);
  }

  @Override
  public void start() {
    //start all the components
    super.start();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  @Override
  public void stop() {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.stop();
  }

  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler registeredHandler = (EventHandler)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  @Override
  public EventHandler getEventHandler() {
    return new GenericEventHandler();
  }

  class GenericEventHandler implements EventHandler {
    public void handle(Event event) {
      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new YarnException(e);
      }
    };
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param <T> the type of event these multiple handlers are interested in.
   */
  static class MultiListenerHandler implements EventHandler {
    List<EventHandler> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler>();
    }

    @Override
    public void handle(Event event) {
      for (EventHandler handler: listofHandlers) {
        handler.handle(event);
      }
    }

    void addHandler(EventHandler handler) {
      listofHandlers.add(handler);
    }

  }
}
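
To see how the pieces fit together, here is a minimal usage sketch. JobEventType and JobEvent are hypothetical names made up for illustration; AsyncDispatcher, AbstractEvent and EventHandler come from org.apache.hadoop.yarn.event.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

// Hypothetical event type enum and event class, for illustration only.
enum JobEventType { JOB_START, JOB_FINISH }

class JobEvent extends AbstractEvent<JobEventType> {
  JobEvent(JobEventType type) { super(type); }
}

public class DispatcherDemo {
  public static void main(String[] args) throws Exception {
    AsyncDispatcher dispatcher = new AsyncDispatcher();

    // dispatch() looks up handlers by the event type's declaring enum class,
    // so we register against JobEventType.class.
    dispatcher.register(JobEventType.class, new EventHandler<JobEvent>() {
      @Override
      public void handle(JobEvent event) {
        System.out.println("Handling " + event.getType());
      }
    });

    dispatcher.init(new Configuration());
    dispatcher.start();

    // Producers enqueue events through the GenericEventHandler.
    dispatcher.getEventHandler().handle(new JobEvent(JobEventType.JOB_START));

    Thread.sleep(100); // give eventHandlingThread a chance to dispatch
    dispatcher.stop();
  }
}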

Hadoop YARN RPC (part I)

I have spent some time digging into the YARN RPC source code. Personally, I like the use of the Factory pattern to inject different RPC proxy client protocol and server implementations into the framework. It looks way cleaner compared to the older versions of Hadoop.

For example, RpcServerFactoryPBImpl is the implementation of the RpcServerFactory interface that creates a Protobuf RPC server. Basically, it delegates the creation to the standard Hadoop RPC class.

Look at the following snippet in RpcServerFactoryPBImpl:

private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }

We could create a new type of RPC server factory, say RpcServerFactoryMyOwnImpl, that implements its own version of the above createServer logic to return our own RPC server.
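
Below is a rough sketch of what that could look like, assuming the RpcServerFactory interface exposes the same getServer signature that HadoopYarnProtoRPC calls further down; RpcServerFactoryMyOwnImpl and MyOwnRpcServer are made-up names.

public class RpcServerFactoryMyOwnImpl implements RpcServerFactory {

  @Override
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    // Instead of delegating to RPC.getServer the way RpcServerFactoryPBImpl
    // does, build and return our own Server implementation (hypothetical).
    return new MyOwnRpcServer(protocol, instance, addr, conf,
        secretManager, numHandlers, portRangeConfig);
  }
}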

In the YARN framework, HadoopYarnProtoRPC is the class that uses these factories.
Basically, it calls RpcFactoryProvider.getServerFactory(conf) to get the right RpcServerFactory implementation.


public class HadoopYarnProtoRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
        addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + 
        " with " + numHandlers + " handlers");
    
    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, 
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);

  }

}

To be continued…

RpcEngine

There are different RPC engines available for Hadoop: WritableRpcEngine, ProtobufRpcEngine, and AvroRpcEngine. Every RPC engine implements the RpcEngine interface. As you can see in the interface below, any new RpcEngine has to provide implementations of the getProxy, getServer, getProtocolMetaInfoProxy, and call methods.

 
class WritableRpcEngine implements RpcEngine
class AvroRpcEngine implements RpcEngine
class ProtobufRpcEngine implements RpcEngine 
 
public interface RpcEngine {

  /** Construct a client-side proxy object. 
   * @param <T>*/
  <T> ProtocolProxy<T> getProxy(Class<T> protocol,
                  long clientVersion, InetSocketAddress addr,
                  UserGroupInformation ticket, Configuration conf,
                  SocketFactory factory, int rpcTimeout) throws IOException;


  /** Expert: Make multiple, parallel calls to a set of servers. */
  Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
                UserGroupInformation ticket, Configuration conf)
    throws IOException, InterruptedException;

  /** 
   * Construct a server for a protocol implementation instance.
   */
  RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
                       int port, int numHandlers, int numReaders,
                       int queueSizePerHandler, boolean verbose,
                       Configuration conf, 
                       SecretManager<? extends TokenIdentifier> secretManager,
                       String portRangeConfig
                       ) throws IOException;

  /**
   * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection
   * id.
   */
  ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
      ConnectionId connId, Configuration conf, SocketFactory factory)
      throws IOException;
}

The main difference among the RpcEngines is the wire format they use for data exchange. For example, WritableRpcEngine uses Writable as the wire format, whereas ProtobufRpcEngine uses Protocol Buffers.
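
Which engine serves a given protocol is recorded in the Configuration. As a sketch (MyProtocolPB is a made-up protocol class, and I am assuming the per-protocol "rpc.engine.<protocol>" key convention that RPC.setProtocolEngine uses internally):

// Bind ProtobufRpcEngine to a (hypothetical) protocol class...
RPC.setProtocolEngine(conf, MyProtocolPB.class, ProtobufRpcEngine.class);

// ...which is roughly equivalent to setting a per-protocol config key:
conf.setClass("rpc.engine." + MyProtocolPB.class.getName(),
    ProtobufRpcEngine.class, RpcEngine.class);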

If you examine the code of these RpcEngines, you will notice that all of them have a static Server class which extends RPC.Server to inherit basic remote server networking services from that base abstract class. In my previous post, we looked at the underlying networking mechanism in RPC.Server (e.g., starting a Listener for incoming requests and multiple Readers that read incoming RPC data and queue RPC calls to be processed by multiple Handlers).

The static Server class in ProtobufRpcEngine

public static class Server extends RPC.Server {
    /**
     * Construct an RPC server.
     * 
     * @param protocolClass the class of protocol
     * @param protocolImpl the protocolImpl whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     * @param portRangeConfig A config parameter that can be used to restrict
     * the range of ports used when port is 0 (an ephemeral port)
     */
    public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress, int port, int numHandlers,
        int numReaders, int queueSizePerHandler, boolean verbose,
        SecretManager<? extends TokenIdentifier> secretManager, 
        String portRangeConfig)
        throws IOException {
      super(bindAddress, port, null, numHandlers,
          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
              .getClass().getName()), secretManager, portRangeConfig);
      this.verbose = verbose;  
      registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
          protocolImpl);
    }

The static Server class in WritableRpcEngine

/** An RPC Server. */
  public static class Server extends RPC.Server {
    /**
     * Construct an RPC server.
     * @param instance the instance whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * 
     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
     */
    @Deprecated
    public Server(Object instance, Configuration conf, String bindAddress,
        int port) throws IOException {
      this(null, instance, conf,  bindAddress, port);
    }
    
    
    /** Construct an RPC server.
     * @param protocolClass class
     * @param protocolImpl the instance whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     */
    public Server(Class<?> protocolClass, Object protocolImpl, 
        Configuration conf, String bindAddress, int port) 
      throws IOException {
      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
          false, null, null);
    }
    
    /** 
     * Construct an RPC server.
     * @param protocolImpl the instance whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     * 
     * @deprecated use Server#Server(Class, Object, 
     *      Configuration, String, int, int, int, int, boolean, SecretManager)
     */
    @Deprecated
    public Server(Object protocolImpl, Configuration conf, String bindAddress,
        int port, int numHandlers, int numReaders, int queueSizePerHandler,
        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
            throws IOException {
       this(null, protocolImpl,  conf,  bindAddress,   port,
                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
                   secretManager, null);
   
    }
    
    /** 
     * Construct an RPC server.
     * @param protocolClass - the protocol being registered
     *     can be null for compatibility with old usage (see below for details)
     * @param protocolImpl the protocol impl that will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     */
    public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress,  int port,
        int numHandlers, int numReaders, int queueSizePerHandler, 
        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig) 
        throws IOException {
      super(bindAddress, port, null, numHandlers, numReaders,
          queueSizePerHandler, conf,
          classNameBase(protocolImpl.getClass().getName()), secretManager,
          portRangeConfig);

      this.verbose = verbose;
      
      
      Class<?>[] protocols;
      if (protocolClass == null) { // derive protocol from impl
        /*
         * In order to remain compatible with the old usage where a single
         * target protocolImpl is suppled for all protocol interfaces, and
         * the protocolImpl is derived from the protocolClass(es) 
         * we register all interfaces extended by the protocolImpl
         */
        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());

      } else {
        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
          throw new IOException("protocolClass "+ protocolClass +
              " is not implemented by protocolImpl which is of class " +
              protocolImpl.getClass());
        }
        // register protocol class and its super interfaces
        registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
        protocols = RPC.getProtocolInterfaces(protocolClass);
      }
      for (Class<?> p : protocols) {
        if (!p.equals(VersionedProtocol.class)) {
          registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
        }
      }

    }

    private static void log(String value) {
      if (value!= null && value.length() > 55)
        value = value.substring(0, 55)+"...";
      LOG.info(value);
    }

Also, each implementation has an Invoker class that implements the InvocationHandler interface in order to intercept the RPC method calls made through the proxy client, then translate and marshal them into bytes sent over the network to the remote RPC server.
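
If you have not worked with java.lang.reflect.Proxy before, the interception mechanics look roughly like this. This is a self-contained toy, not the actual Hadoop Invoker; GreetingProtocol and InvokerSketch are made-up names.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface GreetingProtocol {
  String greet(String name);
}

public class InvokerSketch implements InvocationHandler {
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) {
    // A real Invoker would marshal method.getName() and args into bytes here
    // and send them to the remote server; we just answer the call locally.
    System.out.println("Intercepted call to " + method.getName());
    return "hello, " + args[0];
  }

  public static void main(String[] args) {
    GreetingProtocol proxy = (GreetingProtocol) Proxy.newProxyInstance(
        GreetingProtocol.class.getClassLoader(),
        new Class<?>[] { GreetingProtocol.class },
        new InvokerSketch());
    System.out.println(proxy.greet("world")); // routed through invoke()
  }
}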

We will dig deeper into each of the RpcEngine implementations in upcoming posts.

But I still feel the abstraction layer for the pluggable wire format could be improved further.

Hadoop RPC

There are two RPC servers (org.apache.hadoop.ipc.Server) inside the NameNode. One is mainly for communication with clients, while the other is for communication between DataNodes, the BackupNode, and the NameNode.

The RPC server has a Listener thread listening for incoming connections. The Listener thread creates a server socket in non-blocking mode and uses a Selector to listen for connection accept events. At the same time, it instantiates an array of Reader threads, each given its own read Selector. This allows multiple Reader threads to read incoming remote RPC method calls.

The Server start method starts the Responder thread, the Listener thread, and an array of Handler threads:

public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

In the Listener constructor, a ServerSocketChannel is configured to listen for new incoming connections, and an array of Reader threads is initialized, each registered with its own separate read Selector:

public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

In the Listener thread's run method, whenever an event is detected, it iterates over the selectedKeys polled from the Selector and checks whether each key represents an accepted connection:

 public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }

When a new incoming network connection arrives, the following doAccept method is called: one Reader object is selected (round-robin), and the new channel is registered with that Reader's read Selector to listen for read events.

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd(); 
        }
      }
    }
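
The getReader() call above picks the next Reader in a simple round-robin rotation; paraphrased, it looks like this:

// Hand out Readers round-robin (paraphrased from the Listener class).
Reader getReader() {
  currentReader = (currentReader + 1) % readers.length;
  return readers[currentReader];
}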

When a Reader receives a read event from its read Selector, the following method is called:

 void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c.getHostAddress() + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      }
      else {
        c.setLastContact(System.currentTimeMillis());
      }
    }   

The use of multiple Reader threads with separate read Selectors is an improvement introduced in Hadoop 0.21.0. Prior to 0.21.0, the accept Selector used to listen for connections was also shared as the read Selector by all the other threads, so every thread contended on a single Selector object. To learn more, please refer to the ticket

https://issues.apache.org/jira/browse/HADOOP-6713

Whenever a new connection is established, a Reader thread will be ready to read data (a method call) from the client. After receiving the input bytes from the client, it parses them to figure out the method name and the required params. Once it has those, it creates a Call object encapsulating the method name and params and enqueues it onto the call queue.

Multiple Handler threads wait on this call queue (a BlockingQueue named callQueue) and take Calls off it for processing.
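
A simplified sketch of the Handler loop is shown below; response buffering, metrics, and error handling are omitted, and the field names and call signature are approximate.

// Each Handler thread blocks on the shared callQueue and dispatches calls.
public void run() {
  while (running) {
    try {
      final Call call = callQueue.take(); // blocks until a Call is queued
      // Invoke the actual method, then hand the result to the Responder
      // to be written back to the client.
      Writable value = call(call.connection.protocol, call.param,
          call.timestamp);
      // ... setupResponse(...) and responder.doRespond(call) ...
    } catch (InterruptedException e) {
      // the server is shutting down
    }
  }
}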

The next question is: can we do even better?

HBase RPC is basically a copy of Hadoop RPC; it retains most of the Hadoop RPC code.

IPC HBaseClient

HBaseClient (org.apache.hadoop.hbase.ipc.HBaseClient)
In the previous post, we looked at how the HBaseRPC client proxy intercepts method calls via Invoker (an InvocationHandler implementation) and marshals the method calls and arguments via HBaseClient.

Now, let’s dive into HBaseClient source code.

Whenever the client proxy makes a method call, the Invoker, which implements InvocationHandler, intercepts the call and delegates it to HBaseClient. HBaseClient gets the corresponding Connection thread from a Hashtable that maps ConnectionId to Connection.
The Connection then sends the data to the remote server via a socket DataOutputStream. At the same time, it stores the Call in a ConcurrentSkipListMap. The Connection thread follows a producer-consumer pattern and waits for Calls to appear in the ConcurrentSkipListMap. Once it receives a response from the socket DataInputStream, it removes the corresponding Call from the ConcurrentSkipListMap and sets the Call's done variable to true.
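
On the Call side, completion is a classic wait/notify handshake. A paraphrased sketch of the relevant bit of the Call class: the Connection thread calls setValue() when the response arrives, waking the caller that is blocked in call.wait() in the method further down.

// Inside HBaseClient.Call (simplified):
public synchronized void setValue(Writable value) {
  this.value = value;
  this.done = true;
  notify(); // wake up the caller waiting on this Call
}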

In the Invoker’s invoke method, the following HBaseClient method is called. It first gets the corresponding Connection thread and sends the Call, then waits until the Call is completed (by waiting on the Call’s done variable).

public Writable call(Writable param, InetSocketAddress addr,
                       Class<? extends VersionedProtocol> protocol,
                       User ticket, int rpcTimeout)
      throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ignored) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        }
        // local exception
        throw wrapException(addr, call.error);
      }
      return call.value;
    }
  }

HBase RPC

HBaseRPC is the class that facilitates RPC communication between the client proxy and the remote server.
Based on the Java dynamic proxy pattern, HBaseRPC uses an Invoker class which implements InvocationHandler to
intercept client-side method calls and then marshal the method name and arguments through HBaseClient.

If you look at the proxy client creation via Proxy.newProxyInstance in HBaseRPC, you will see
the following code, where a new Invoker instance is passed in:

 VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(addr, ticket, conf, factory, rpcTimeout));

Basically, the Invoker implements InvocationHandler to intercept proxy-side method calls as follows:

private static class Invoker implements InvocationHandler {

  public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
      HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }

From the above, you will notice the use of the client.call method. Here, client is an instance of HBaseClient, which handles the network-layer marshalling of the method name and parameter arguments to the
remote listening server. See below.

HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address, ticket, rpcTimeout);

We will take a look at HBaseClient in the next post.

Dive into HTable internals

HTable is the class we use to interact with an HBase table. Internally, it contains an instance of HConnection. This connection is returned by calling:

HConnectionManager.getConnection(Configuration conf);

HConnectionManager is a non-instantiable singleton class that manages HConnections. If you take a look at HConnectionManager, there is a LinkedHashMap<Configuration, HConnectionImplementation> called HBASE_INSTANCES that maps a Configuration to an HConnection. This ensures that the same Configuration object results in the same HConnection being returned by HConnectionManager, so different HTable clients using the same Configuration end up sharing the same HConnection. This allows them all to share the same cache of region location information and avoid repeated region location discovery. The same ZooKeeper watcher/tracker can also be reused.
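
A simplified sketch of that lookup (paraphrased; some versions also do reference counting here):

public static HConnection getConnection(Configuration conf)
    throws ZooKeeperConnectionException {
  synchronized (HBASE_INSTANCES) {
    // Same Configuration object => same cached HConnectionImplementation.
    HConnectionImplementation connection = HBASE_INSTANCES.get(conf);
    if (connection == null) {
      connection = new HConnectionImplementation(conf);
      HBASE_INSTANCES.put(conf, connection);
    }
    return connection;
  }
}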

The implementation of HConnection is called HConnectionImplementation. When an HConnection is instantiated, it sets up ZooKeeper trackers/watchers on the HBase master location/address and the -ROOT- region.

HConnectionImplementation provides RPC connections to the HMaster and to the servers hosting -ROOT- and .META.. In Hadoop/HBase, RPC communication is implemented using the Java dynamic proxy pattern. For example, HMasterInterface acts as the proxy used to call remote methods on the HBase master server.

To understand the underlying mechanism of RPC invocation involved in HTable, take a look at the put(Put put) method.

public void put(final Put put) throws IOException { doPut(Arrays.asList(put)); }

It will in turn call the processBatchCallback method in HConnectionImplementation:

public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
  throws IOException, InterruptedException

In this method, given a list of Put operations, it basically groups them by region server: all the Put operations destined for the same region server are grouped together so that they can be dispatched to that region server in one batch. It uses the following HashMap to maintain the groupings.

Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();

It groups all the actions/operations by HServerAddress. An HServerAddress is a “label” for an HBase server, made up of a hostname and a port number.
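
The grouping step itself is conceptually straightforward. Here is a hedged sketch; locateRegion and the MultiAction.add signature are assumptions based on the description above, not the exact source.

// For each operation, find the hosting region server and bucket the
// action under that server's address.
for (Row row : list) {
  HRegionLocation loc = locateRegion(tableName, row.getRow());
  HServerAddress address = loc.getServerAddress();
  MultiAction actions = actionsByServer.get(address);
  if (actions == null) {
    actions = new MultiAction();
    actionsByServer.put(address, actions);
  }
  actions.add(loc.getRegionInfo().getRegionName(), row);
}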

Iterating through the map, it then fires off the requests to the corresponding region servers by creating Callables to be executed on the executor thread pool:

for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
  futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}

Let’s look at the createCallable method

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
    final MultiAction<R> multi, final byte[] tableName) {
  final HConnection connection = this;
  return new Callable<MultiResponse>() {
    public MultiResponse call() throws IOException {
      return getRegionServerWithoutRetries(
          new ServerCallable<MultiResponse>(connection, tableName, null) {
            public MultiResponse call() throws IOException {
              return server.multi(multi);
            }
            @Override
            public void connect(boolean reload) throws IOException {
              server =
                connection.getHRegionConnection(loc.getHostname(), loc.getPort());
            }
          }
      );
    }
  };
}

An anonymous Callable with an implemented call() method is returned by the above method and will be executed in the executor thread pool. Inside call(), the RPC client proxy server (of type HRegionInterface) makes the remote RPC method call server.multi(multi), marshalling the method and params to the remote server. The connect(boolean reload) method establishes the RPC connection to the region server.

server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());

In the getHRegionConnection method, HBaseRPC is used to create a dynamic proxy to the region server:

server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);

Stay tuned for my next post about HBaseRPC internals. We will dive into the mechanics of the RPC call implementation in HBase.