HBaseClient (org.apache.hadoop.hbase.ipc.HBaseClient)
In the previous post, we looked at how the HBaseRPC client proxy intercepts method calls via Invoker (an InvocationHandler implementation) and marshals each method call and its arguments via HBaseClient.
Now, let’s dive into the HBaseClient source code.
Whenever the client proxy makes a method call, the Invoker (which implements InvocationHandler) intercepts the call and delegates it to HBaseClient. HBaseClient then looks up the corresponding Connection thread in a Hashtable that maps ConnectionId to Connection.
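The lookup described above can be sketched as a get-or-create against a Hashtable. This is only an illustration, not the HBase source: the ConnectionId here is a simplified, hypothetical key holding just an address and a protocol name (the real key presumably also covers the user and timeout that are passed to getConnection), and Connection is an empty placeholder rather than a thread.

```java
import java.net.InetSocketAddress;
import java.util.Hashtable;
import java.util.Objects;

// Hypothetical sketch of a connection cache; names and fields are assumptions.
class ConnectionCacheSketch {

    // Simplified key: equal keys must map to the same cached Connection.
    static final class ConnectionId {
        final InetSocketAddress address;
        final String protocolName;

        ConnectionId(InetSocketAddress address, String protocolName) {
            this.address = address;
            this.protocolName = protocolName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ConnectionId)) {
                return false;
            }
            ConnectionId other = (ConnectionId) o;
            return address.equals(other.address)
                && protocolName.equals(other.protocolName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(address, protocolName);
        }
    }

    // Placeholder for the per-server connection (a thread in the real client).
    static final class Connection {
        final ConnectionId remoteId;

        Connection(ConnectionId remoteId) {
            this.remoteId = remoteId;
        }
    }

    private final Hashtable<ConnectionId, Connection> connections = new Hashtable<>();

    Connection getConnection(InetSocketAddress addr, String protocolName) {
        ConnectionId remoteId = new ConnectionId(addr, protocolName);
        // Hashtable methods are synchronized individually; the block makes
        // the get-then-put sequence atomic so only one Connection is created.
        synchronized (connections) {
            Connection connection = connections.get(remoteId);
            if (connection == null) {
                connection = new Connection(remoteId);
                connections.put(remoteId, connection);
            }
            return connection;
        }
    }

    int size() {
        return connections.size();
    }
}
```

The key class has to override both equals and hashCode. Otherwise two calls to the same server would produce distinct keys and never share a Connection.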
The Connection then sends the data to the remote server through the socket’s DataOutputStream. At the same time, it stores the Call in a ConcurrentSkipListMap. The Connection thread follows a producer-consumer pattern: it waits until a Call is available in the ConcurrentSkipListMap. Once it receives the response from the socket’s DataInputStream, it removes the corresponding Call from the ConcurrentSkipListMap and sets the Call’s done variable to true.
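To make the bookkeeping concrete, here is a minimal sketch of a pending-call table. It assumes each Call carries an integer id that the server echoes back in its response; the class and method names (PendingCallsSketch, addCall, receiveResponse) are hypothetical, and the socket I/O is left out entirely.

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of tracking in-flight calls by id; not the HBase source.
class PendingCallsSketch {

    static final class Call {
        final int id;
        final String param;
        volatile String value;
        volatile boolean done;

        Call(int id, String param) {
            this.id = id;
            this.param = param;
        }
    }

    // In-flight calls, keyed by call id.
    private final ConcurrentSkipListMap<Integer, Call> calls =
        new ConcurrentSkipListMap<>();

    // Producer side: register the call before its bytes go out on the socket,
    // so a fast response can always find it.
    void addCall(Call call) {
        calls.put(call.id, call);
    }

    // Consumer side: a response for the given id has arrived. Remove the call
    // from the table and mark it done. Returns false for an unknown id.
    boolean receiveResponse(int id, String value) {
        Call call = calls.remove(id);
        if (call == null) {
            return false;
        }
        call.value = value;
        call.done = true;
        return true;
    }

    int pendingCount() {
        return calls.size();
    }
}
```

Because the table is keyed by id, responses can be matched to their calls even if they arrive in a different order than the requests were sent.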
The Invoker’s invoke method calls the following HBaseClient method. It first gets the corresponding Connection thread and sends the Call. It then blocks until the Call completes, waiting on the Call object until its done variable becomes true.
public Writable call(Writable param, InetSocketAddress addr,
                     Class<? extends VersionedProtocol> protocol,
                     User ticket, int rpcTimeout)
    throws InterruptedException, IOException {
  Call call = new Call(param);
  Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
  connection.sendParam(call);                 // send the parameter
  boolean interrupted = false;
  //noinspection SynchronizationOnLocalVariableOrMethodParameter
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();                           // wait for the result
      } catch (InterruptedException ignored) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      }
      // local exception
      throw wrapException(addr, call.error);
    }
    return call.value;
  }
}
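The method above only shows the waiting half of the handshake. The sketch below pairs it with a plausible completing half, assuming the Call exposes a synchronized setter that sets done and notifies the waiter. The Call class, setValue, awaitResult and demo are hypothetical names for illustration, and a plain thread stands in for the Connection thread.

```java
// Hypothetical sketch of the wait/notify handshake on a Call object.
class CallWaitSketch {

    static final class Call {
        String value;
        boolean done;

        // Completing side: publish the result and wake the waiting caller.
        synchronized void setValue(String value) {
            this.value = value;
            this.done = true;
            notify();
        }
    }

    // Waiting side: block until the call is done, remembering any interrupt
    // and restoring the thread's interrupt flag afterwards.
    static String awaitResult(Call call) {
        boolean interrupted = false;
        synchronized (call) {
            // The loop guards against spurious wakeups: only done == true ends it.
            while (!call.done) {
                try {
                    call.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return call.value;
    }

    // A second thread plays the role of the Connection thread.
    static String demo() {
        Call call = new Call();
        Thread receiver = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            call.setValue("response");
        });
        receiver.start();
        return awaitResult(call);
    }
}
```

Checking done inside a while loop rather than an if matters here: if the response arrives before the caller starts waiting, the loop is skipped and no notification is missed.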