IPC HBaseClient

HBaseClient (org.apache.hadoop.hbase.ipc.HBaseClient)
In the previous post, we looked at how the HBaseRPC client proxy intercepts a method call via Invoker (an InvocationHandler implementation) and marshals the method call and its arguments via HBaseClient.
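As a quick refresher on that interception step, here is a minimal sketch using the JDK's dynamic proxy. EchoProtocol, RecordingInvoker and ProxySketch are hypothetical names made up for illustration, not HBase classes, and the handler only records the method name and arguments instead of marshalling them to a server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical protocol interface, standing in for an HBase RPC protocol.
interface EchoProtocol {
    String echo(String message);
}

// Hypothetical stand-in for Invoker: it captures the method name and
// arguments instead of sending them over the wire.
class RecordingInvoker implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // A real invoker would marshal these and hand them to the RPC client.
        return method.getName() + Arrays.toString(args);
    }
}

class ProxySketch {
    // Builds a proxy whose every method call is routed to RecordingInvoker.
    static EchoProtocol newProxy() {
        return (EchoProtocol) Proxy.newProxyInstance(
            EchoProtocol.class.getClassLoader(),
            new Class<?>[] { EchoProtocol.class },
            new RecordingInvoker());
    }

    public static void main(String[] args) {
        System.out.println(newProxy().echo("hello"));  // prints "echo[hello]"
    }
}
```

The caller only ever sees the EchoProtocol interface; everything behind it is decided by the handler, which is what lets an RPC layer swap a local call for a remote one.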

Now, let’s dive into the HBaseClient source code.

Whenever the client proxy makes a method call, the Invoker, which implements InvocationHandler, intercepts the call and delegates it to HBaseClient. HBaseClient gets the corresponding Connection thread from a Hashtable that maps each ConnectionId to its Connection.
The Connection then sends the data to the remote server via a socket DataOutputStream. At the same time, it stores the Call in a ConcurrentSkipListMap. The Connection thread follows the producer-consumer pattern: it waits until a Call is available in the ConcurrentSkipListMap. Once it receives the response from the socket DataInputStream, it removes the corresponding Call from the ConcurrentSkipListMap and sets the Call’s done variable to true.
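The bookkeeping described above can be sketched roughly as follows. This is a simplified illustration and not the actual HBaseClient code: ClientSketch, ConnectionSketch and PendingCall are hypothetical names, a plain String stands in for ConnectionId, and the socket I/O and the reader thread are left out so that only the two maps and the response hand-off remain.

```java
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for Call: a request id plus a result slot.
class PendingCall {
    final int id;
    final String param;
    String value;
    boolean done;

    PendingCall(int id, String param) {
        this.id = id;
        this.param = param;
    }

    // Marks the call finished and wakes up any thread waiting on it.
    synchronized void complete(String result) {
        value = result;
        done = true;
        notifyAll();
    }
}

// Hypothetical stand-in for Connection, without the socket streams.
class ConnectionSketch {
    final ConcurrentSkipListMap<Integer, PendingCall> calls =
        new ConcurrentSkipListMap<>();

    // Registers the call as outstanding; a real connection would also
    // write the request to the socket here.
    void sendParam(PendingCall call) {
        calls.put(call.id, call);
    }

    // What the reader side does once a response for call `id` arrives.
    void receiveResponse(int id, String result) {
        PendingCall call = calls.remove(id);
        if (call != null) {
            call.complete(result);
        }
    }
}

// Hypothetical stand-in for the client's connection table.
class ClientSketch {
    private final Map<String, ConnectionSketch> connections = new Hashtable<>();

    // Returns the cached connection for this id, creating it on first use.
    ConnectionSketch getConnection(String connectionId) {
        synchronized (connections) {
            ConnectionSketch connection = connections.get(connectionId);
            if (connection == null) {
                connection = new ConnectionSketch();
                connections.put(connectionId, connection);
            }
            return connection;
        }
    }

    public static void main(String[] args) {
        ConnectionSketch connection = new ClientSketch().getConnection("example-host:60020");
        PendingCall call = new PendingCall(1, "get");
        connection.sendParam(call);
        connection.receiveResponse(1, "row-data");
        System.out.println(call.done + " " + call.value);  // prints "true row-data"
    }
}
```

Keying the outstanding calls by id is what lets a single connection carry several requests at once and still match each response to the caller that is waiting for it.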

The Invoker’s invoke method calls the following HBaseClient method. The method first gets the corresponding Connection thread and sends the Call. It then waits until the Call is completed (that is, it waits on the Call object until its done variable becomes true).

public Writable call(Writable param, InetSocketAddress addr,
                     Class<? extends VersionedProtocol> protocol,
                     User ticket, int rpcTimeout)
    throws InterruptedException, IOException {
  Call call = new Call(param);
  Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
  connection.sendParam(call);                 // send the parameter
  boolean interrupted = false;
  //noinspection SynchronizationOnLocalVariableOrMethodParameter
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();                           // wait for the result
      } catch (InterruptedException ignored) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      }
      // local exception
      throw wrapException(addr, call.error);
    }
    return call.value;
  }
}
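To see the waiting logic in isolation, here is a small self-contained sketch of the same wait/notify hand-off. SimpleCall and WaitSketch are hypothetical names, and a short-lived thread plays the role of the Connection thread delivering the response; error handling is omitted.

```java
// Hypothetical stand-in for Call: a result slot guarded by the object's monitor.
class SimpleCall {
    private boolean done;
    private String value;

    // Called by the responder: store the result and wake up the waiter.
    synchronized void complete(String result) {
        value = result;
        done = true;
        notifyAll();
    }

    // Called by the requester: block until complete() has run.
    String await() {
        boolean interrupted = false;
        synchronized (this) {
            // Loop to guard against spurious wakeups and early interrupts.
            while (!done) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;  // remember it, keep waiting
                }
            }
        }
        if (interrupted) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        }
        return value;
    }
}

class WaitSketch {
    // Starts a responder thread and blocks until it delivers the response.
    static String roundTrip(String response) {
        SimpleCall call = new SimpleCall();
        Thread responder = new Thread(() -> call.complete(response));
        responder.start();
        return call.await();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("pong"));  // prints "pong"
    }
}
```

Because the done flag is checked and set under the same monitor, the requester gets the result whether the response arrives before or after it starts waiting.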
