HBase Row Key Design, Part 1

When one issues a get/scan query against an HBase table via HTable or HTablePool, the client first looks up the -ROOT- and .META. tables to locate the region servers that host the data. This is the phase where most of the data irrelevant to our query gets skipped, which is why row key design is so important when it comes to using HBase.

For example, one can use the following composite key to partition data by siteId, so that a query scanning all users belonging to a specific site can be performed efficiently by specifying a startRow with the siteId prefix.

Composite Key:
siteId|userId
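
To make this concrete, here is a minimal sketch of such a prefix scan using the classic HBase client API. The table name, siteId value, and key format are hypothetical; the stopRow is derived from the prefix by using the byte that sorts immediately after the '|' separator.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class SitePrefixScan {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "users");          // hypothetical table name
    byte[] startRow = Bytes.toBytes("site42|");        // inclusive lower bound: the siteId prefix
    byte[] stopRow  = Bytes.toBytes("site42}");        // '}' is the byte right after '|', bounding the prefix
    ResultScanner scanner = table.getScanner(new Scan(startRow, stopRow));
    try {
      for (Result r : scanner) {
        System.out.println(Bytes.toString(r.getRow())); // every key returned starts with "site42|"
      }
    } finally {
      scanner.close();
      table.close();
    }
  }
}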

The -ROOT- and .META. lookup locates all the region servers that contain data for the specified siteId. Then multiple get/scan RPC calls are initiated against these region servers to retrieve the required results.

For example, when we call the HTable get(List<Get>) method, it invokes the processBatchCallback method in HConnectionImplementation. This method finds the region servers that host the requested rows and then dispatches calls to those servers.


 public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException {
      // This belongs in HTable!!! Not in here.  St.Ack

      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException(
            "argument results must be the same size as argument list");
      }
      if (list.isEmpty()) {
        return;
      }

      // Keep track of the most recent servers for any given item for better
      // exceptional reporting.  We keep HRegionLocation to save on parsing.
      // Later below when we use lastServers, we'll pull what we need from
      // lastServers.
      HRegionLocation [] lastServers = new HRegionLocation[results.length];
      List<Row> workingList = new ArrayList<Row>(list);
      boolean retry = true;
      // count that helps presize actions array
      int actionCount = 0;

      for (int tries = 0; tries < numRetries && retry; ++tries) {

        // sleep first, if this is a retry
        if (tries >= 1) {
          long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
          Thread.sleep(sleepTime);
        }
        // step 1: break up into regionserver-sized chunks and build the data structs
        Map<HRegionLocation, MultiAction<R>> actionsByServer =
          new HashMap<HRegionLocation, MultiAction<R>>();
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
            HRegionLocation loc = locateRegion(tableName, row.getRow());
            byte[] regionName = loc.getRegionInfo().getRegionName();

            MultiAction<R> actions = actionsByServer.get(loc);
            if (actions == null) {
              actions = new MultiAction<R>();
              actionsByServer.put(loc, actions);
            }

            Action<R> action = new Action<R>(row, i);
            lastServers[i] = loc;
            actions.add(regionName, action);
          }
        }

        // step 2: make the requests

        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());

        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }

        // step 3: collect the failures and successes and prepare for retry

        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
             : futures.entrySet()) {
          HRegionLocation loc = responsePerServer.getKey();

          try {
            Future<MultiResponse> future = responsePerServer.getValue();
            MultiResponse resp = future.get();

            if (resp == null) {
              // Entire server failed
              LOG.debug("Failed all for server: " + loc.getHostnamePort() +
                ", removing from cache");
              continue;
            }

            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
              byte[] regionName = e.getKey();
              List<Pair<Integer, Object>> regionResults = e.getValue();
              for (Pair<Integer, Object> regionResult : regionResults) {
                if (regionResult == null) {
                  // if the first/only record is 'null' the entire region failed.
                  LOG.debug("Failures for region: " +
                      Bytes.toStringBinary(regionName) +
                      ", removing from cache");
                } else {
                  // Result might be an Exception, including DNRIOE
                  results[regionResult.getFirst()] = regionResult.getSecond();
                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                    callback.update(e.getKey(),
                        list.get(regionResult.getFirst()).getRow(),
                        (R)regionResult.getSecond());
                  }
                }
              }
            }
          } catch (ExecutionException e) {
            LOG.warn("Failed all from " + loc, e);
          }
        }

        // step 4: identify failures and prep for a retry (if applicable).

        // Find failures (i.e. null Result), and add them to the workingList (in
        // order), so they can be retried.
        retry = false;
        workingList.clear();
        actionCount = 0;
        for (int i = 0; i < results.length; i++) {
          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
          // then retry that row. else dont.
          if (results[i] == null ||
              (results[i] instanceof Throwable &&
                  !(results[i] instanceof DoNotRetryIOException))) {

            retry = true;
            actionCount++;
            Row row = list.get(i);
            workingList.add(row);
            deleteCachedLocation(tableName, row.getRow());
          } else {
            if (results[i] != null && results[i] instanceof Throwable) {
              actionCount++;
            }
            // add null to workingList, so the order remains consistent with the original list argument.
            workingList.add(null);
          }
        }
      }

      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
      List<Row> actions = new ArrayList<Row>(actionCount);
      List<String> addresses = new ArrayList<String>(actionCount);

      for (int i = 0 ; i < results.length; i++) {
        if (results[i] == null || results[i] instanceof Throwable) {
          exceptions.add((Throwable)results[i]);
          actions.add(list.get(i));
          addresses.add(lastServers[i].getHostnamePort());
        }
      }

      if (!exceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(exceptions,
            actions,
            addresses);
      }
    }
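
To see this batch path from the caller's side, here is a minimal usage sketch (the table name and row keys are hypothetical). HTable.get(List<Get>) funnels into processBatchCallback, so the gets are grouped per region server and dispatched in parallel on the pool:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchGetDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "users");           // hypothetical table name
    List<Get> gets = new ArrayList<Get>();
    gets.add(new Get(Bytes.toBytes("site42|user1")));   // hypothetical row keys
    gets.add(new Get(Bytes.toBytes("site42|user2")));
    // Grouped by region server and executed in parallel, as shown above.
    Result[] results = table.get(gets);                 // one slot per Get, in the original order
    for (Result r : results) {
      System.out.println(Bytes.toString(r.getRow()));
    }
    table.close();
  }
}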

HBase HTable is not Thread Safe

HTable is the main class that one would use to interface with an HBase table. It allows one to perform put/get/scan/delete operations on an HBase table. However, it is not thread safe for reads or writes.

HTable uses an ArrayList, writeBuffer, to buffer Put actions. As you can see in the doPut method, there is no synchronization. If multiple threads call doPut concurrently, the buffer can be corrupted, leading to unexpected values in your table.

private final ArrayList<Put> writeBuffer = new ArrayList<Put>();

private void doPut(Put put) throws IOException {
  validatePut(put);
  writeBuffer.add(put);
  currentWriteBufferSize += put.heapSize();
  if (currentWriteBufferSize > writeBufferSize) {
    flushCommits();
  }
}
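
A common workaround is to give each thread its own HTable instance, for example by checking tables out of an HTablePool. A minimal sketch, with hypothetical table, family, and qualifier names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PerThreadWrites {
  private static final Configuration conf = HBaseConfiguration.create();
  private static final HTablePool pool = new HTablePool(conf, 10); // at most 10 pooled HTables per table

  // Each thread checks out its own HTable, so the writeBuffer is never shared.
  public static void write(String rowKey, String value) throws Exception {
    HTableInterface table = pool.getTable("users");     // hypothetical table name
    try {
      Put put = new Put(Bytes.toBytes(rowKey));
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("v"), Bytes.toBytes(value)); // hypothetical family/qualifier
      table.put(put);
    } finally {
      pool.putTable(table);                             // return the table to the pool for reuse
    }
  }
}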

Also, HTable has a setting called scannerCaching that is not synchronized in any way, so read operations using scan are not thread safe either.


 public int getScannerCaching() {
    return scannerCaching;
  }

 
  public void setScannerCaching(int scannerCaching) {
    this.scannerCaching = scannerCaching;
  }


 public ResultScanner getScanner(final Scan scan) throws IOException {
    if (scan.getCaching() <= 0) {
      scan.setCaching(getScannerCaching());
    }
    return new ClientScanner(getConfiguration(), scan, getTableName(),
        this.connection);
  }
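
Note that getScanner falls back to the shared scannerCaching field only when the Scan carries no caching of its own. One way to sidestep the unsynchronized field is therefore to set caching on each Scan, which stays confined to the thread that created it. A sketch, assuming an already-created HTable and hypothetical key bounds:

void scanSite(HTable table) throws IOException {
  Scan scan = new Scan(Bytes.toBytes("site42|"), Bytes.toBytes("site42}")); // hypothetical prefix bounds
  scan.setCaching(100);                      // per-Scan setting; getScanner() then skips the shared field
  ResultScanner rs = table.getScanner(scan);
  try {
    for (Result r : rs) {
      // process rows for this site
    }
  } finally {
    rs.close();
  }
}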

HBase RPC

HBaseRPC is the class that facilitates RPC communication between the client-side proxy and the remote server. Based on the Java dynamic proxy pattern, HBaseRPC uses an Invoker class implementing InvocationHandler to intercept client-side method calls and then marshal the method name and arguments through HBaseClient.
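
For readers unfamiliar with the pattern, here is a small self-contained illustration (plain Java, not HBase code) of a dynamic proxy. Every method call on the proxy funnels through invoke(), which is exactly where HBaseRPC's Invoker marshals the method name and arguments over the wire:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
  interface Greeter {
    String greet(String name);
  }

  public static void main(String[] args) {
    Greeter proxy = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(),
        new Class[] { Greeter.class },
        new InvocationHandler() {
          public Object invoke(Object p, Method method, Object[] a) {
            // An RPC layer such as HBaseClient.call would ship (method, args)
            // to the remote server here and return the unmarshalled result.
            return "intercepted " + method.getName() + "(" + a[0] + ")";
          }
        });
    System.out.println(proxy.greet("world")); // prints: intercepted greet(world)
  }
}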

If you look at the proxy client creation in HBaseRPC, you will see the following call to Proxy.newProxyInstance, where a new Invoker instance is passed in:

 VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(addr, ticket, conf, factory, rpcTimeout));

Basically, the Invoker implements InvocationHandler to intercept proxy-side method calls as follows:

private static class Invoker implements InvocationHandler {

  public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
      HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }

From the above, notice the call to client.call. Here, client is an instance of HBaseClient, which handles the network-layer marshalling of the method name and arguments to the remote listening server. See below.

HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address, ticket, rpcTimeout);

We will take a look at HBaseClient in the next post.

Dive into HTable internals

HTable is the class we use to interact with an HBase table. Internally, it holds an instance of HConnection. This connection is returned by calling

HConnectionManager.getConnection(Configuration conf);

HConnectionManager is a non-instantiable singleton class that manages HConnections. If you take a look at HConnectionManager, there is a LinkedHashMap<Configuration, HConnectionImplementation> called HBASE_INSTANCES that maps Configurations to HConnections. This ensures that the same Configuration object always yields the same HConnection from HConnectionManager. Different HTable clients therefore end up using the same HConnection if they were created with the same Configuration, which allows them all to share the same cache of region location information, avoid repetitive region location discovery, and reuse the same ZooKeeper watchers/trackers.
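
In other words (table names here are hypothetical), clients built from the same Configuration object share a connection, while a different Configuration gets its own:

Configuration conf = HBaseConfiguration.create();
HTable t1 = new HTable(conf, "users");    // creates and caches an HConnection for conf
HTable t2 = new HTable(conf, "events");   // reuses the HConnection found in HBASE_INSTANCES

Configuration other = HBaseConfiguration.create();
HTable t3 = new HTable(other, "users");   // separate HConnection, separate region-location cache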

The implementation of HConnection is called HConnectionImplementation. When an HConnection is instantiated, it sets up ZooKeeper trackers/watchers on the HBase master location/address and on the -ROOT- region.

HConnectionImplementation provides RPC connections to the HMaster and to the servers hosting -ROOT- and .META.. In Hadoop/HBase, RPC communication is implemented using the Java dynamic proxy pattern. For example, HMasterInterface acts as the proxy for calling remote methods on the HBase master server.

To understand the underlying mechanism of RPC invocation involved in HTable, take a look at the put(Put put) method.

public void put(final Put put) throws IOException {
  doPut(Arrays.asList(put));
}

It will in turn call the processBatchCallback method in HConnectionImplementation.

public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
  throws IOException, InterruptedException

In this method, given a list of Put operations, it groups them by region server: all Put operations destined for the same region server are grouped together so they can be dispatched to that server in a single batch. It uses the following HashMap to maintain the groupings.

Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();

It groups all the actions/operations by HServerAddress. HServerAddress is a "label" for an HBase server, made up of hostname and port number.

Iterating through the map, it then fires off the requests to the corresponding region servers, creating a Callable for each to be executed on the executor thread pool:

for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
  futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}

Let’s look at the createCallable method

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
    final MultiAction<R> multi, final byte [] tableName) {
  final HConnection connection = this;
  return new Callable<MultiResponse>() {
    public MultiResponse call() throws IOException {
      return getRegionServerWithoutRetries(
          new ServerCallable<MultiResponse>(connection, tableName, null) {
            public MultiResponse call() throws IOException {
              return server.multi(multi);
            }
            @Override
            public void connect(boolean reload) throws IOException {
              server =
                connection.getHRegionConnection(loc.getHostname(), loc.getPort());
            }
          }
      );
    }
  };
}

An anonymous Callable with an implemented call() method is returned by the above method and executed on the executor thread pool. In the implemented call method, the RPC client proxy server, of type HRegionInterface, makes the remote RPC call server.multi(multi), marshalling the method name and parameters to the remote server. The connect(boolean reload) method establishes the RPC connection to the region server.

server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());

In the getHRegionConnection method, it uses HBaseRPC to create a dynamic proxy to the region server.

server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);

Stay tuned for my next post about HBaseRPC internals, where we will dive into the mechanics of the RPC call implementation in HBase.