RpcEngine

Hadoop ships with several RPC engines: WritableRpcEngine, ProtobufRpcEngine, and AvroRpcEngine. Each of them implements the RpcEngine interface. As the interface below shows, any new RpcEngine has to implement the getProxy, getServer, getProtocolMetaInfoProxy, and call methods.

 
class WritableRpcEngine implements RpcEngine
class AvroRpcEngine implements RpcEngine
class ProtobufRpcEngine implements RpcEngine 
 
public interface RpcEngine {

  /** Construct a client-side proxy object. 
   * @param <T>*/
  <T> ProtocolProxy<T> getProxy(Class<T> protocol,
                  long clientVersion, InetSocketAddress addr,
                  UserGroupInformation ticket, Configuration conf,
                  SocketFactory factory, int rpcTimeout) throws IOException;


  /** Expert: Make multiple, parallel calls to a set of servers. */
  Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
                UserGroupInformation ticket, Configuration conf)
    throws IOException, InterruptedException;

  /** 
   * Construct a server for a protocol implementation instance.
   */
  RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
                       int port, int numHandlers, int numReaders,
                       int queueSizePerHandler, boolean verbose,
                       Configuration conf, 
                       SecretManager<? extends TokenIdentifier> secretManager,
                       String portRangeConfig
                       ) throws IOException;

  /**
   * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection
   * id.
   */
  ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
      ConnectionId connId, Configuration conf, SocketFactory factory)
      throws IOException;
}

The main difference among the RpcEngines is the wire format they use for data exchange. For example,
WritableRpcEngine uses Writable as the wire format, whereas ProtobufRpcEngine uses Protobuf.

If you examine the code of these RpcEngines, you will notice that each of them has a static Server class that extends RPC.Server, inheriting the basic networking services from the abstract base class. In my previous post, we looked at the underlying networking mechanism in RPC.Server (e.g., starting a Listener for incoming requests and multiple Readers that read incoming RPC data and queue RPC calls to be processed by multiple Handlers).

The static Server class in ProtobufRpcEngine

public static class Server extends RPC.Server {
    /**
     * Construct an RPC server.
     * 
     * @param protocolClass the class of protocol
     * @param protocolImpl the protocolImpl whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     * @param portRangeConfig A config parameter that can be used to restrict
     * the range of ports used when port is 0 (an ephemeral port)
     */
    public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress, int port, int numHandlers,
        int numReaders, int queueSizePerHandler, boolean verbose,
        SecretManager<? extends TokenIdentifier> secretManager, 
        String portRangeConfig)
        throws IOException {
      super(bindAddress, port, null, numHandlers,
          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
              .getClass().getName()), secretManager, portRangeConfig);
      this.verbose = verbose;  
      registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
          protocolImpl);
    }

The static Server class in WritableRpcEngine

/** An RPC Server. */
  public static class Server extends RPC.Server {
    /**
     * Construct an RPC server.
     * @param instance the instance whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * 
     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
     */
    @Deprecated
    public Server(Object instance, Configuration conf, String bindAddress,
        int port) throws IOException {
      this(null, instance, conf,  bindAddress, port);
    }
    
    
    /** Construct an RPC server.
     * @param protocolClass class
     * @param protocolImpl the instance whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     */
    public Server(Class<?> protocolClass, Object protocolImpl, 
        Configuration conf, String bindAddress, int port) 
      throws IOException {
      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
          false, null, null);
    }
    
    /** 
     * Construct an RPC server.
     * @param protocolImpl the instance whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     * 
     * @deprecated use Server#Server(Class, Object, 
     *      Configuration, String, int, int, int, int, boolean, SecretManager)
     */
    @Deprecated
    public Server(Object protocolImpl, Configuration conf, String bindAddress,
        int port, int numHandlers, int numReaders, int queueSizePerHandler,
        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
            throws IOException {
       this(null, protocolImpl,  conf,  bindAddress,   port,
                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
                   secretManager, null);
   
    }
    
    /** 
     * Construct an RPC server.
     * @param protocolClass - the protocol being registered
     *     can be null for compatibility with old usage (see below for details)
     * @param protocolImpl the protocol impl that will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     */
    public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress,  int port,
        int numHandlers, int numReaders, int queueSizePerHandler, 
        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig) 
        throws IOException {
      super(bindAddress, port, null, numHandlers, numReaders,
          queueSizePerHandler, conf,
          classNameBase(protocolImpl.getClass().getName()), secretManager,
          portRangeConfig);

      this.verbose = verbose;
      
      
      Class<?>[] protocols;
      if (protocolClass == null) { // derive protocol from impl
        /*
         * In order to remain compatible with the old usage where a single
         * target protocolImpl is suppled for all protocol interfaces, and
         * the protocolImpl is derived from the protocolClass(es) 
         * we register all interfaces extended by the protocolImpl
         */
        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());

      } else {
        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
          throw new IOException("protocolClass "+ protocolClass +
              " is not implemented by protocolImpl which is of class " +
              protocolImpl.getClass());
        }
        // register protocol class and its super interfaces
        registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
        protocols = RPC.getProtocolInterfaces(protocolClass);
      }
      for (Class<?> p : protocols) {
        if (!p.equals(VersionedProtocol.class)) {
          registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
        }
      }

    }

    private static void log(String value) {
      if (value!= null && value.length() > 55)
        value = value.substring(0, 55)+"...";
      LOG.info(value);
    }

Also, each implementation has an Invoker class that implements the InvocationHandler interface. The Invoker intercepts each RPC method call made through the client proxy, marshals it into bytes, and sends it over the network to the remote RPC server.
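To make the interception step concrete, here is a minimal sketch of the dynamic proxy pattern using only the JDK. It is not the code of any Hadoop RpcEngine: EchoProtocol, Transport, and getProxy are hypothetical names invented for this illustration, and the Transport stands in for the part that would marshal the call and write it to a socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class InvokerSketch {

  /** Hypothetical protocol interface; stands in for a real RPC protocol. */
  public interface EchoProtocol {
    String echo(String message);
  }

  /** Hypothetical transport; a real engine would marshal and send bytes here. */
  public interface Transport {
    Object send(String methodName, Object[] args);
  }

  /** Intercepts every call made on the proxy and forwards it to the transport. */
  public static class Invoker implements InvocationHandler {
    private final Transport transport;

    public Invoker(Transport transport) {
      this.transport = transport;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
      // The caller never sees this method; it only calls the protocol interface.
      return transport.send(method.getName(), args);
    }
  }

  /** Builds a client-side proxy whose calls all go through an Invoker. */
  @SuppressWarnings("unchecked")
  public static <T> T getProxy(Class<T> protocol, Transport transport) {
    return (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class<?>[] { protocol }, new Invoker(transport));
  }

  public static void main(String[] args) {
    // Loopback transport: returns the method name and first argument instead of using a network.
    Transport loopback = (name, params) -> name + ":" + params[0];
    EchoProtocol proxy = getProxy(EchoProtocol.class, loopback);
    System.out.println(proxy.echo("hello"));
  }
}
```

The caller works with a plain EchoProtocol reference, which is the point of the pattern: swapping the Transport changes how calls travel without touching the calling code.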

We will dig deeper into each of the RpcEngine implementations in upcoming posts.

But I still feel that the abstraction layer for pluggable wire formats could be improved further.

Hadoop RPC

There are two RPC servers (org.apache.hadoop.ipc.Server) inside NameNode. One is mainly for communication with clients while the other is for communication between Datanodes, BackupNode and NameNode.

The RPC server has a Listener thread that listens for incoming connections. The Listener thread creates a server socket in non-blocking mode and uses a Selector to wait for connection-accept events. It also instantiates an array of Reader threads, each with its own read Selector. This allows multiple Reader threads to read incoming RPC method calls in parallel.

The Server start method starts the Responder thread, the Listener thread, and an array of Handler threads:

public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

In the Listener thread constructor, the ServerSocketChannel is configured to listen for new incoming connections, and
an array of Reader threads is initialized, each with its own read Selector:

public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

In the Listener thread's run method, whenever the Selector reports an event, the thread iterates over the selected keys and checks whether each one is a connection-accept event.

 public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }
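To see the accept loop in isolation, here is a small self-contained sketch using only java.nio. It is a simplification and not the Hadoop Listener: the class and method names are made up for this example, the client connections are opened from the same thread purely to have something to accept, and each accepted channel is closed immediately instead of being handed to a Reader.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class AcceptLoopSketch {

  /** Opens `expected` loopback connections and returns how many the selector loop accepted. */
  public static int acceptConnections(int expected) {
    try (Selector selector = Selector.open();
         ServerSocketChannel acceptChannel = ServerSocketChannel.open()) {
      // Non-blocking server socket on an ephemeral port (port 0)
      acceptChannel.configureBlocking(false);
      acceptChannel.bind(new InetSocketAddress("127.0.0.1", 0));
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      int port = acceptChannel.socket().getLocalPort();

      // Client side: blocking connects complete against the listen backlog
      SocketChannel[] clients = new SocketChannel[expected];
      for (int i = 0; i < expected; i++) {
        clients[i] = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
      }

      int accepted = 0;
      // Stop once everything is accepted, or if select times out with nothing ready
      while (accepted < expected && selector.select(2000) > 0) {
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove(); // selected keys must be removed by hand
          if (key.isValid() && key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel;
            // Drain every pending connection, as doAccept does
            while ((channel = server.accept()) != null) {
              channel.close();
              accepted++;
            }
          }
        }
      }
      for (SocketChannel client : clients) {
        client.close();
      }
      return accepted;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("accepted: " + acceptConnections(3));
  }
}
```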

When there is a new incoming network connection, the following doAccept method is called.
One Reader is selected (round robin), and the new channel is registered with that Reader's read Selector to listen for read events.

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd(); 
        }
      }
    }
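The round-robin selection done by getReader() can be sketched as follows. This is an illustration of the idea only, not the Hadoop source: readers are represented by their names, and the method is synchronized as a defensive choice for the sketch.

```java
public class RoundRobinSketch {
  private final String[] readers;
  private int currentReader = 0;

  public RoundRobinSketch(int readThreads) {
    readers = new String[readThreads];
    for (int i = 0; i < readThreads; i++) {
      readers[i] = "Socket Reader #" + (i + 1);
    }
  }

  /** Returns the next reader in turn, wrapping back to the first after the last. */
  public synchronized String getReader() {
    String reader = readers[currentReader];
    currentReader = (currentReader + 1) % readers.length;
    return reader;
  }

  public static void main(String[] args) {
    RoundRobinSketch listener = new RoundRobinSketch(3);
    for (int i = 0; i < 4; i++) {
      System.out.println(listener.getReader());
    }
  }
}
```

With three readers, the fourth connection goes back to the first reader, so connections are spread evenly regardless of how busy each reader is.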

When a reader receives a read event from the read Selector, the following method is called.

 void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c.getHostAddress() + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      }
      else {
        c.setLastContact(System.currentTimeMillis());
      }
    }   

The use of multiple Reader threads with separate read Selectors is an improvement introduced in Hadoop 0.21.0. Prior to 0.21.0, the accept Selector used to listen for new connections was also used as the read Selector, so all the threads shared a single Selector object. To learn more, please refer to the ticket

https://issues.apache.org/jira/browse/HADOOP-6713

Whenever a new connection is established, a Reader thread is ready to read data (a method call) from the client. After receiving the input bytes from the client, it parses them to determine the method name and the required parameters. It then creates a Call object encapsulating the method name and parameters and enqueues it in the call queue.

Multiple Handler threads wait on this call queue (BlockingQueue callQueue) and take Calls from it for processing.
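The hand-off between Readers and Handlers is a classic producer-consumer queue. The sketch below shows that shape with plain JDK classes. It is a simplification under stated assumptions: the Call class here is a hypothetical stand-in holding a method name and a single string parameter, the calling thread plays the role of the Reader, and "processing" a call just formats a string.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CallQueueSketch {

  /** Hypothetical stand-in for the server's Call object. */
  public static final class Call {
    final int id;
    final String method;
    final String param;

    Call(int id, String method, String param) {
      this.id = id;
      this.method = method;
      this.param = param;
    }
  }

  /** Enqueues callCount calls and lets handlerCount threads process them. */
  public static Map<Integer, String> process(int handlerCount, int callCount) {
    BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>(100);
    Map<Integer, String> results = new ConcurrentHashMap<>();
    CountDownLatch done = new CountDownLatch(callCount);

    Thread[] handlers = new Thread[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Thread(() -> {
        try {
          while (true) {
            Call call = callQueue.take(); // blocks until a call is queued
            results.put(call.id, call.method + "(" + call.param + ")");
            done.countDown();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // interrupted means shut down
        }
      }, "Handler " + i);
      handlers[i].setDaemon(true);
      handlers[i].start();
    }

    try {
      // The "reader" side: enqueue calls; put blocks if the queue is full
      for (int id = 0; id < callCount; id++) {
        callQueue.put(new Call(id, "echo", "p" + id));
      }
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      for (Thread handler : handlers) {
        handler.interrupt();
      }
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(process(3, 10));
  }
}
```

The bounded queue gives back-pressure: if Handlers fall behind, the producing side blocks on put instead of buffering calls without limit.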

The next question is: can we do even better?

HBase RPC is basically a copy of Hadoop RPC; it retains most of the Hadoop RPC code.

IPC HBaseClient

HBaseClient (org.apache.hadoop.hbase.ipc.HBaseClient)
In the previous post, we looked at how the HBaseRPC client proxy intercepts method calls via the Invoker (an InvocationHandler implementation) and marshals the method call and arguments via HBaseClient.

Now, let’s dive into HBaseClient source code.

Whenever the client proxy makes a method call, the Invoker (which implements InvocationHandler) intercepts the call and delegates it to HBaseClient. HBaseClient gets the corresponding Connection thread from a Hashtable that maps ConnectionId to Connection.
The Connection then sends the data to the remote server via a socket DataOutputStream. At the same time, it stores the Call in a ConcurrentSkipListMap. The Connection thread follows a producer-consumer pattern and waits for Calls to become available in the ConcurrentSkipListMap. Once it receives a response from the socket DataInputStream, it removes the corresponding Call from the ConcurrentSkipListMap and sets the Call’s done variable to true.

The Invoker’s invoke method calls the following HBaseClient method. It first gets the corresponding Connection thread and sends the Call, then waits until the Call is completed (that is, it waits on the Call’s done variable).

public Writable call(Writable param, InetSocketAddress addr,
                       Class<? extends VersionedProtocol> protocol,
                       User ticket, int rpcTimeout)
      throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ignored) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        }
        // local exception
        throw wrapException(addr, call.error);
      }
      return call.value;
    }
  }
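The waiting logic in the method above boils down to a guarded wait on the Call object. Here is a stripped-down sketch of that handshake. It is not the HBaseClient code: the Call class is reduced to a string value, awaitValue and setValue are names chosen for this example, and a short-lived thread that sleeps for 50 ms stands in for the Connection thread receiving the response.

```java
public class CallWaitSketch {

  /** Hypothetical, reduced version of a Call: one parameter, one result. */
  public static final class Call {
    private final String param;
    private String value;
    private boolean done;

    public Call(String param) {
      this.param = param;
    }

    /** Called by the receiving thread when the response arrives. */
    public synchronized void setValue(String value) {
      this.value = value;
      this.done = true;
      notifyAll(); // wake up the caller blocked in awaitValue
    }

    /** Blocks until setValue has run, preserving the interrupt status. */
    public synchronized String awaitValue() {
      boolean interrupted = false;
      while (!done) { // loop guards against spurious wakeups
        try {
          wait();
        } catch (InterruptedException e) {
          interrupted = true; // remember it, keep waiting for the result
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
      return value;
    }
  }

  /** Sends a "call" and waits for a simulated receiver thread to complete it. */
  public static String call(String param) {
    final Call call = new Call(param);
    Thread receiver = new Thread(() -> {
      try {
        Thread.sleep(50); // simulated network round trip
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      call.setValue("reply:" + call.param);
    });
    receiver.setDaemon(true);
    receiver.start();
    return call.awaitValue();
  }

  public static void main(String[] args) {
    System.out.println(call("ping"));
  }
}
```

Checking done inside a while loop rather than an if matters: wait() may return without a matching notify, and the loop ensures the caller only proceeds once the result has really been set.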

HBase RPC

HBaseRPC is the class that facilitates RPC communication between the client proxy and the remote server.
Based on the Java dynamic proxy pattern, HBaseRPC uses an Invoker class that implements InvocationHandler to
intercept client-side method calls and then marshal the method name and arguments through HBaseClient.

If you look at where HBaseRPC creates the client proxy with Proxy.newProxyInstance, you will see
the following code, in which a new Invoker instance is passed to the method:

 VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(addr, ticket, conf, factory, rpcTimeout));

Basically, the Invoker implements InvocationHandler to intercept method calls on the proxy as follows:

private static class Invoker implements InvocationHandler {

  public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
      HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }

From the above, you will notice the use of the client.call method. Here, the client is an HBaseClient, which handles the network-layer marshalling of the method name and arguments to the
remote listening server. See below.

HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
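To give a feel for what marshalling an invocation means, the sketch below writes a method name and its arguments to a byte array and reads them back, in the style of a Writable's write and readFields methods. It is an assumption-laden illustration, not HBase's Invocation class: the class name is invented, and arguments are restricted to strings so that the example needs nothing beyond java.io.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class InvocationSketch {
  private String methodName;
  private String[] params; // assumption: string-only arguments

  public InvocationSketch() { }

  public InvocationSketch(String methodName, String... params) {
    this.methodName = methodName;
    this.params = params;
  }

  /** Serializes the method name, the argument count, then each argument. */
  public void write(DataOutputStream out) throws IOException {
    out.writeUTF(methodName);
    out.writeInt(params.length);
    for (String param : params) {
      out.writeUTF(param);
    }
  }

  /** Reads the fields back in exactly the order write produced them. */
  public void readFields(DataInputStream in) throws IOException {
    methodName = in.readUTF();
    params = new String[in.readInt()];
    for (int i = 0; i < params.length; i++) {
      params[i] = in.readUTF();
    }
  }

  public String getMethodName() { return methodName; }

  public String[] getParams() { return params; }

  /** What the client side would put on the wire. */
  public static byte[] toBytes(InvocationSketch invocation) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buffer);
      invocation.write(out);
      out.flush();
      return buffer.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** What the server side would do with the received bytes. */
  public static InvocationSketch fromBytes(byte[] bytes) {
    try {
      InvocationSketch invocation = new InvocationSketch();
      invocation.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
      return invocation;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    InvocationSketch copy = fromBytes(toBytes(new InvocationSketch("get", "row1", "cf:q")));
    System.out.println(copy.getMethodName() + " with " + copy.getParams().length + " params");
  }
}
```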

We will take a look at HBaseClient in the next post.

Dive into HTable internals

HTable is the class we use to interact with an HBase table. Internally, it holds an instance of HConnection, which is obtained by calling

HConnectionManager.getConnection(Configuration conf);

HConnectionManager is a non-instantiable utility class that manages HConnections. It holds a LinkedHashMap<Configuration, HConnectionImplementation> called HBASE_INSTANCES, which maps each Configuration to an HConnection. This ensures that the same Configuration object always yields the same HConnection from HConnectionManager. Different HTable clients therefore end up using the same HConnection if they use the same Configuration. This lets them all share the same cache of region locations and avoid repeated region location discovery. The same ZooKeeper watchers/trackers can be reused as well.
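The caching idea can be sketched in a few lines. This is not the HConnectionManager implementation: the Connection class is a placeholder, and the cache is keyed by a plain string (think of it as a ZooKeeper quorum setting) instead of a Configuration object, purely to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectionCacheSketch {

  /** Placeholder for an expensive, shareable connection. */
  public static final class Connection {
    private final String key;

    Connection(String key) {
      this.key = key;
    }

    public String getKey() {
      return key;
    }
  }

  // One connection per distinct key, created lazily
  private static final Map<String, Connection> INSTANCES = new LinkedHashMap<>();

  private ConnectionCacheSketch() { } // non-instantiable, like a manager class

  /** Returns the cached connection for the key, creating it on first use. */
  public static Connection getConnection(String key) {
    synchronized (INSTANCES) {
      Connection connection = INSTANCES.get(key);
      if (connection == null) {
        connection = new Connection(key);
        INSTANCES.put(key, connection);
      }
      return connection;
    }
  }

  public static void main(String[] args) {
    Connection a = getConnection("zk1:2181");
    Connection b = getConnection("zk1:2181");
    System.out.println("shared: " + (a == b));
  }
}
```

Two callers that pass an equal key receive the very same Connection object, which is what allows the location cache and watchers behind it to be shared.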

The implementation of HConnection is called HConnectionImplementation. When an HConnection is instantiated, it sets up ZooKeeper trackers/watchers on the HBase master address and the -ROOT- region location.

HConnectionImplementation provides RPC connections to the HMaster and to the servers hosting the -ROOT- and .META. regions. In Hadoop/HBase, RPC communication is implemented using the Java dynamic proxy pattern. For example, HMasterInterface acts as the proxy for calling remote methods on the HBase master server.

To understand the underlying mechanism of RPC invocation involved in HTable, take a look at the put(Put put) method.

public void put(final Put put) throws IOException { doPut(Arrays.asList(put)); }

It will in turn call processBatchCallback method in HConnectionImplementation.

public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException

Given a list of Put operations, this method groups them by region server: all Put operations destined for the same region server are grouped so that they can be dispatched together. It uses the following HashMap to maintain the groupings.

Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();

It groups all the actions/operations by HServerAddress. An HServerAddress is a “label” for an HBase server made up of a host and a port number.

Iterating through the map, it then fires off the requests to the corresponding region servers by creating a Callable for each one and submitting it to the executor thread pool:

for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
  futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}
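The group-then-dispatch flow can be tried out with plain collections. The sketch below is only an approximation of the idea: rows and server addresses are strings, the rowToServer map is a hypothetical replacement for the region location lookup, and each submitted task simply returns the size of its batch instead of making an RPC.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchGroupingSketch {

  /** Groups rows by the server that hosts them (rowToServer is a made-up lookup). */
  public static Map<String, List<String>> groupByServer(
      List<String> rows, Map<String, String> rowToServer) {
    Map<String, List<String>> actionsByServer = new HashMap<>();
    for (String row : rows) {
      String server = rowToServer.get(row);
      actionsByServer.computeIfAbsent(server, s -> new ArrayList<>()).add(row);
    }
    return actionsByServer;
  }

  /** Submits one task per server and collects how many rows each task handled. */
  public static Map<String, Integer> dispatch(Map<String, List<String>> actionsByServer) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Map<String, Future<Integer>> futures = new HashMap<>();
      for (Map.Entry<String, List<String>> e : actionsByServer.entrySet()) {
        final List<String> batch = e.getValue();
        // Stand-in for the remote multi call: just report the batch size
        Callable<Integer> task = () -> batch.size();
        futures.put(e.getKey(), pool.submit(task));
      }
      Map<String, Integer> results = new HashMap<>();
      for (Map.Entry<String, Future<Integer>> f : futures.entrySet()) {
        results.put(f.getKey(), f.getValue().get()); // wait for each server's reply
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    Map<String, String> rowToServer = new HashMap<>();
    rowToServer.put("r1", "rs1:60020");
    rowToServer.put("r2", "rs2:60020");
    rowToServer.put("r3", "rs1:60020");
    List<String> rows = new ArrayList<>(rowToServer.keySet());
    System.out.println(dispatch(groupByServer(rows, rowToServer)));
  }
}
```

Grouping first means each server receives one request carrying all of its rows, and the requests to different servers run in parallel on the pool.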

Let’s look at the createCallable method

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
        final MultiAction<R> multi, final byte [] tableName) {
      final HConnection connection = this;
      return new Callable<MultiResponse>() {
       public MultiResponse call() throws IOException {
         return getRegionServerWithoutRetries(
             new ServerCallable<MultiResponse>(connection, tableName, null) {
               public MultiResponse call() throws IOException {
                 return server.multi(multi);
               }
               @Override
               public void connect(boolean reload) throws IOException {
                 server =
                   connection.getHRegionConnection(loc.getHostname(), loc.getPort());
               }
             }
         );
       }
     };
   }

The method above returns an anonymous Callable whose call() method is executed on the executor thread pool. Inside call(), the RPC client proxy (server, of type HRegionInterface) makes the remote call server.multi(multi), marshalling the method and parameters to the remote server. The connect(boolean reload) method establishes the RPC connection to the region server.

server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());

In the getHRegionConnection method, it uses HBaseRPC to create a dynamic proxy to the region server.

server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);

Stay tuned for my next post about HBaseRPC internals, where we will dive into the mechanics of the RPC call implementation in HBase.

TaskTracker

TaskTracker essentially runs as a thread on a node. Internally, it launches two inner TaskLauncher threads called mapLauncher and reduceLauncher. A TaskLauncher thread uses a List to keep track of assigned tasks and starts each task in a separate TaskRunner thread. The TaskRunner thread then launches a JVM process for the task.

If we dig deeper into the code, we find that TaskRunner holds a JvmManager instance. JvmManager in turn contains two JvmManagerForType instances, one for map tasks (mapJvmManager) and one for reduce tasks (reduceJvmManager). Each is responsible for spawning a JvmRunner thread, which actually launches the JVM process.

Hadoop GenericOptionsParser

GenericOptionsParser is used by ToolRunner to parse the generic options (-conf, -D, -jt, -files, -libjars, -archives) and set them in the job configuration accordingly. Any remaining command-line options are then passed to our MapReduce job by calling run(toolArgs).

String[] toolArgs = parser.getRemainingArgs();  //get any remaining command options other than the generic options

return tool.run(toolArgs);

Since our mapreduce job implements the run(String[] args) method, we will get passed all the remaining command options and can use them for configuring our job.
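To illustrate the split between generic options and leftover arguments, here is a deliberately tiny sketch. It is not how GenericOptionsParser is implemented: it recognizes only the "-D key=value" form, stores the pairs in a plain map instead of a Configuration, and treats everything else as a remaining argument.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GenericOptionsSketch {
  private final Map<String, String> conf = new LinkedHashMap<>();
  private final List<String> remaining = new ArrayList<>();

  /** Separates "-D key=value" pairs from all other arguments. */
  public GenericOptionsSketch(String[] args) {
    for (int i = 0; i < args.length; i++) {
      if ("-D".equals(args[i]) && i + 1 < args.length) {
        // Split on the first '=' only, so values may contain '='
        String[] keyValue = args[++i].split("=", 2);
        if (keyValue.length == 2) {
          conf.put(keyValue[0], keyValue[1]);
        }
      } else {
        remaining.add(args[i]);
      }
    }
  }

  /** The properties collected from -D options. */
  public Map<String, String> getConf() {
    return conf;
  }

  /** Everything that was not consumed as a generic option. */
  public String[] getRemainingArgs() {
    return remaining.toArray(new String[0]);
  }

  public static void main(String[] args) {
    GenericOptionsSketch parser = new GenericOptionsSketch(
        new String[] {"-D", "mapred.reduce.tasks=2", "input", "output"});
    System.out.println(parser.getConf());
    System.out.println(String.join(" ", parser.getRemainingArgs()));
  }
}
```

In a job's run method, the remaining arguments are typically what you would interpret as the input and output paths, while the -D values are already available through the configuration.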

Hadoop ToolRunner and Tool

When you create a MapReduce job, you can implement the Tool interface’s method

int run(String[] args) throws Exception;

This is the only method that the Tool interface declares itself (it inherits getConf and setConf from Configurable). Implementing it allows ToolRunner to invoke the run(String[]) method of our MapReduce job. In the main method, you call ToolRunner.run(new Configuration(), new MapReduceJob(), args) as follows:

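public static void main(String[] args) throws Exception {
  // ToolRunner.run declares "throws Exception", so main must propagate or handle it
  int status = ToolRunner.run(new Configuration(), new MapReduceJob(), args);
  System.exit(status);
}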

If you take a look at the ToolRunner class, you will find the following method. Since our MapReduceJob implements the Tool interface, ToolRunner in turn invokes our implementation of run(String[] args).

public static int run(Configuration conf, Tool tool, String[] args)
    throws Exception {
  if (conf == null) {
    conf = new Configuration();
  }
  GenericOptionsParser parser = new GenericOptionsParser(conf, args);
  // set the configuration back, so that Tool can configure itself
  tool.setConf(conf);

  // get the args w/o generic hadoop args
  String[] toolArgs = parser.getRemainingArgs();
  return tool.run(toolArgs);
}