Log Structured Merge Tree (LSM Tree) in HBase

If you have ever worked with HBase, Cassandra, Google Bigtable, or LevelDB, you have probably heard of the Log Structured Merge tree. The LSM tree differentiates these NoSQL stores from the majority of RDBMSs, which use the B+ tree.

In HBase, the LSM tree concept is materialized by the HLog (write-ahead log), the Memstores, and the storefiles. The main idea of the LSM tree is that data is first kept in an in-memory cache, e.g. the Memstores in HBase. Each region server hosts multiple regions (HRegion), and each HRegion holds a slice of a table's data. An HRegion has one memstore per column family of the table and keeps track of the total memstore size contributed by all of them. As you can see in the following methods in HRegion, after applying any mutation operations it checks whether the total memstore size has exceeded the configured flush size. If so, it calls requestFlush(), which simply delegates the call to the region server's MemStoreFlusher.

  /*
   * @param size
   * @return True if size is over the flush threshold
   */
  private boolean isFlushSize(final long size) {
    return size > this.memstoreFlushSize;
  }

 /**
   * Perform a batch of mutations.
   * It supports only Put and Delete mutations and will ignore other types passed.
   * @param batchOp contains the list of mutations
   * @return an array of OperationStatus which internally contains the
   *         OperationStatusCode and the exceptionMessage if any.
   * @throws IOException
   */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadOnly();
        }
        checkResources();

        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        long addedSize = doMiniBatchMutation(batchOp);
        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      closeRegionOperation(op);
    }
    return batchOp.retCodeDetails;
  }


  private void requestFlush() {
    if (this.rsServices == null) {
      return;
    }
    synchronized (writestate) {
      if (this.writestate.isFlushRequested()) {
        return;
      }
      writestate.flushRequested = true;
    }
    // Make request outside of synchronize block; HBASE-818.
    this.rsServices.getFlushRequester().requestFlush(this, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
    }
  }


 

HRegion loads the configured maximum memstore flush size, so this in-memory cache has a pre-configured maximum size. See the HRegion method below.

 void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
                HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
  }
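Since setHTableSpecificConf() consults the table descriptor before falling back to hbase.hregion.memstore.flush.size, the flush threshold can also be tuned per table. A rough sketch (the table name, column family, and sizes below are made up for illustration):

Configuration conf = HBaseConfiguration.create();
// Cluster-wide default, used when the table descriptor does not specify a flush size.
conf.setLong("hbase.hregion.memstore.flush.size", 128L * 1024 * 1024);

HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor("metrics");   // hypothetical table
desc.addFamily(new HColumnDescriptor("d"));
desc.setMemStoreFlushSize(256L * 1024 * 1024);             // flush this table's memstores at 256 MB
admin.createTable(desc);
admin.close();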

As soon as the total memstore size exceeds this flush size, the data is flushed to disk as storefiles in HBase.

In addition, HBase has a configurable limit on the number of storefiles permitted per store. When the number of storefiles exceeds this limit, a compaction is triggered to merge them into fewer, larger files. The main purpose of compaction is to speed up the read path by reducing the number of storefiles that have to be read.
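To make the write path concrete, here is a toy, self-contained sketch of the LSM idea described above (plain Java, not HBase code, with sizes measured naively in characters): writes land in a sorted in-memory map, the map is flushed into an immutable sorted "file" once it outgrows a threshold, and the files are merged once there are too many of them.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ToyLsmStore {
  private final long flushThreshold;
  private final int maxFiles;
  private TreeMap<String, String> memstore = new TreeMap<String, String>();
  private final List<TreeMap<String, String>> storeFiles = new ArrayList<TreeMap<String, String>>();
  private long memstoreSize = 0;

  public ToyLsmStore(long flushThreshold, int maxFiles) {
    this.flushThreshold = flushThreshold;
    this.maxFiles = maxFiles;
  }

  public void put(String key, String value) {
    memstore.put(key, value);
    memstoreSize += key.length() + value.length();
    if (memstoreSize > flushThreshold) {      // same idea as HRegion.isFlushSize()
      flush();
    }
  }

  public String get(String key) {
    String v = memstore.get(key);             // newest data first
    if (v != null) return v;
    for (int i = storeFiles.size() - 1; i >= 0; i--) {   // then newest file to oldest
      v = storeFiles.get(i).get(key);
      if (v != null) return v;
    }
    return null;
  }

  private void flush() {
    storeFiles.add(memstore);                 // becomes an immutable, sorted "store file"
    memstore = new TreeMap<String, String>();
    memstoreSize = 0;
    if (storeFiles.size() > maxFiles) {       // too many files slow down reads: compact
      compact();
    }
  }

  private void compact() {
    TreeMap<String, String> merged = new TreeMap<String, String>();
    for (TreeMap<String, String> file : storeFiles) {    // oldest to newest, so newer values win
      merged.putAll(file);
    }
    storeFiles.clear();
    storeFiles.add(merged);
  }
}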

Please check out this great paper on the LSM tree:

http://db.cs.berkeley.edu/cs286/papers/lsm-acta1996.pdf

HBase Pluggable Store, Memstore, StoreEngine

I was browsing the HBase codebase and noticed that many components in the regionserver module (org.apache.hadoop.hbase.regionserver) have been refactored to become pluggable. This is a cleaner design because different implementations of these components can be plugged into HBase easily. Among the new interfaces are

Store and MemStore

The corresponding implementation classes are

HStore implements Store

DefaultMemStore implements MemStore

Furthermore, StoreEngine is now an abstract class and acts as a factory to create StoreFileManager, CompactionPolicy and Compactor.
DefaultStoreEngine extends StoreEngine to create the default compactor, policy, and store file manager. This makes storefile management and compaction pluggable.
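Selecting a different engine appears to boil down to a configuration property; a hedged sketch (as far as I can tell the key below is the one StoreEngine reads, and com.example.MyStoreEngine is a hypothetical subclass of DefaultStoreEngine that wires up its own compactor, policy, and store file manager):

Configuration conf = HBaseConfiguration.create();
// Ask HStore to build its components from a custom engine instead of DefaultStoreEngine.
conf.set("hbase.hstore.engine.class", "com.example.MyStoreEngine");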

You can refer to the corresponding JIRA and design doc:

https://issues.apache.org/jira/browse/HBASE-7678

https://issues.apache.org/jira/secure/attachment/12568488/Pluggable%20compactions%20doc.pdf

HBase Multiversion Concurrency Control

For those interested in Multiversion Concurrency Control (MVCC) in HBase, check out the post below, a very good introduction:

http://blogs.apache.org/hbase/entry/apache_hbase_internals_locking_and

Also, there is a good paper on the Theory and Algorithms of Multiversion Concurrency Control.
http://www.cse.ust.hk/~yjrobin/reading_list/[Transaction]Multiversion%20Concurrency%20Control-Theory%20and%20Algorithms.pdf

We can find the implementation in the HBase codebase as org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl, which is used by org.apache.hadoop.hbase.regionserver.HRegion.


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.LinkedList;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Manages the read/write consistency within memstore. This provides
 * an interface for readers to determine what entries to ignore, and
 * a mechanism for writers to obtain new write numbers, then "commit"
 * the new writes for readers to read (thus forming atomic transactions).
 */
@InterfaceAudience.Private
public class MultiVersionConsistencyControl {
  private volatile long memstoreRead = 0;
  private volatile long memstoreWrite = 0;

  private final Object readWaiters = new Object();

  // This is the pending queue of writes.
  private final LinkedList<WriteEntry> writeQueue =
      new LinkedList<WriteEntry>();

  private static final ThreadLocal<Long> perThreadReadPoint =
      new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
          return Long.MAX_VALUE;
        }
      };

  /**
   * Default constructor. Initializes the memstoreRead/Write points to 0.
   */
  public MultiVersionConsistencyControl() {
    this.memstoreRead = this.memstoreWrite = 0;
  }

  /**
   * Initializes the memstoreRead/Write points appropriately.
   * @param startPoint
   */
  public void initialize(long startPoint) {
    synchronized (writeQueue) {
      if (this.memstoreWrite != this.memstoreRead) {
        throw new RuntimeException("Already used this mvcc. Too late to initialize");
      }

      this.memstoreRead = this.memstoreWrite = startPoint;
    }
  }

  /**
   * Get this thread's read point. Used primarily by the memstore scanner to
   * know which values to skip (ie: have not been completed/committed to
   * memstore).
   */
  public static long getThreadReadPoint() {
      return perThreadReadPoint.get();
  }

  /**
   * Set the thread read point to the given value. The thread MVCC
   * is used by the Memstore scanner so it knows which values to skip.
   * Give it a value of 0 if you want everything.
   */
  public static void setThreadReadPoint(long readPoint) {
    perThreadReadPoint.set(readPoint);
  }

  /**
   * Set the thread MVCC read point to whatever the current read point is in
   * this particular instance of MVCC.  Returns the new thread read point value.
   */
  public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
    perThreadReadPoint.set(mvcc.memstoreReadPoint());
    return getThreadReadPoint();
  }

  /**
   * Set the thread MVCC read point to 0 (include everything).
   */
  public static void resetThreadReadPoint() {
    perThreadReadPoint.set(0L);
  }

  /**
   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
   */
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite;
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }

  /**
   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   *
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
   */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }

  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   *
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   *
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) {
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }

  /**
   * Wait for the global readPoint to advance upto
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) {
        try {
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }

  public long memstoreReadPoint() {
    return memstoreRead;
  }


  public static class WriteEntry {
    private long writeNumber;
    private boolean completed = false;
    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }
    void markCompleted() {
      this.completed = true;
    }
    boolean isCompleted() {
      return this.completed;
    }
    long getWriteNumber() {
      return this.writeNumber;
    }
  }

  public static final long FIXED_SIZE = ClassSize.align(
      ClassSize.OBJECT +
      2 * Bytes.SIZEOF_LONG +
      2 * ClassSize.REFERENCE);

}
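To see how the pieces fit together, here is a minimal sketch of the writer/reader interplay using only the methods shown in the class above (the actual memstore bookkeeping is elided in the comments):

MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

// Writer side: allocate a write number, apply the edit tagged with that number,
// then make it visible to readers.
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
try {
  // ... add the KeyValues to the memstore with memstoreTS = w.getWriteNumber() ...
} finally {
  mvcc.completeMemstoreInsert(w);   // blocks until the global read point has caught up
}

// Reader side: pin this thread's read point to the current global read point so the
// scanner ignores any entry whose write number is still above it.
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
// ... scan the memstore, skipping KeyValues with memstoreTS > readPoint ...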


HBase 0.96.0 release: Protobufs as wire format

HBase 0.96.0 was released recently, on October 18th, 2013. Some notable changes are: 1) protocol buffers adopted as the wire format, 2) reduced mean time to recovery, and 3) the ROOT table removed. Let's take a look at these changes in depth to understand the new design and explore the differences they introduced in the codebase. In this first post, we will look at the protocol buffer adoption in HBase and examine the differences it introduced. The main impact of the change is that classes which previously implemented the Writable interface in order to be marshaled over the RPC wire no longer need to implement it. In other words, the manual marshaling/demarshaling methods write(final DataOutput out) and readFields(final DataInput in) are no longer required. Instead, HBase relies on protobufs to send, for example, a Get request over the RPC wire. Take a look at https://issues.apache.org/jira/browse/HBASE-5305. The Get class after 0.96.0 no longer needs to implement the Writable interface:

public class Get extends OperationWithAttributes
  implements Row, Comparable<Row>

The old Get class definition implements the Writable interface, with readFields(final DataInput in) and write(final DataOutput out) implemented:

public class Get extends OperationWithAttributes
  implements Writable, Row, Comparable<Row> {

 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > GET_VERSION) {
      throw new IOException("unsupported version");
    }
    this.row = Bytes.readByteArray(in);
    this.lockId = in.readLong();
    this.maxVersions = in.readInt();
    boolean hasFilter = in.readBoolean();
    if (hasFilter) {
      this.filter = Classes.createWritableForName(
        Bytes.toString(Bytes.readByteArray(in)));
      this.filter.readFields(in);
    }
    this.cacheBlocks = in.readBoolean();
    this.tr = new TimeRange();
    tr.readFields(in);
    int numFamilies = in.readInt();
    this.familyMap =
      new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
    for(int i=0; i<numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      boolean hasColumns = in.readBoolean();
      NavigableSet<byte []> set = null;
      if(hasColumns) {
        int numColumns = in.readInt();
        set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
        for(int j=0; j<numColumns; j++) {
          byte [] qualifier = Bytes.readByteArray(in);
          set.add(qualifier);
        }
      }
      this.familyMap.put(family, set);
    }
    readAttributes(in);
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(GET_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.lockId);
    out.writeInt(this.maxVersions);
    if(this.filter == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
      filter.write(out);
    }
    out.writeBoolean(this.cacheBlocks);
    tr.write(out);
    out.writeInt(familyMap.size());
    for(Map.Entry<byte [], NavigableSet<byte []>> entry :
      familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet<byte []> columnSet = entry.getValue();
      if(columnSet == null) {
        out.writeBoolean(false);
      } else {
        out.writeBoolean(true);
        out.writeInt(columnSet.size());
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);
        }
      }
    }
    writeAttributes(out);
  }
}

New Put class definition after 0.96.0

public class Put extends Mutation implements HeapSize, Comparable<Row>

Old Put class definition

public class Put extends Mutation
  implements HeapSize, Writable, Comparable<Row> {
 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > PUT_VERSION) {
      throw new IOException("version not supported");
    }
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for(int i=0;i<numFamilies;i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      for (int j = 0; j < numKeys; j++) {
        int keyLength = in.readInt();
        in.readFully(buf, offset, keyLength);
        keys.add(new KeyValue(buf, offset, keyLength));
        offset += keyLength;
      }
      this.familyMap.put(family, keys);
    }
    if (version > 1) {
      readAttributes(in);
    }
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(PUT_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.ts);
    out.writeLong(this.lockId);
    out.writeBoolean(this.writeToWAL);
    out.writeInt(familyMap.size());
    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List<KeyValue> keys = entry.getValue();
      out.writeInt(keys.size());
      int totalLen = 0;
      for(KeyValue kv : keys) {
        totalLen += kv.getLength();
      }
      out.writeInt(totalLen);
      for(KeyValue kv : keys) {
        out.writeInt(kv.getLength());
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
      }
    }
    writeAttributes(out);
  }

New Scan class definition after 0.96.0

public class Scan extends OperationWithAttributes 

Old Scan class definition

public class Scan extends OperationWithAttributes implements Writable {

You can find the protobuf definition files in the src/main/protobuf folder of the hbase-protocol project. In HTable, we can see protobuf in action in the get(final Get get) method: a RegionServerCallable is built, which uses ProtobufUtil to make the actual protobuf GetRequest to the server (HTable.get is shown below).

@Override
  public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
  }

ProtobufUtil, in the package org.apache.hadoop.hbase.protobuf, is the utility class that helps build protobuf calls using the auto-generated protobuf classes.

package org.apache.hadoop.hbase.protobuf;

 /**
   * A helper to invoke a Get using client protocol.
   *
   * @param client
   * @param regionName
   * @param get
   * @return the result of the Get
   * @throws IOException
   */
  public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
  }

RequestConverter

 /**
   * Create a protocol buffer GetRequest for a client Get
   *
   * @param regionName the name of the region to get
   * @param get the client Get
   * @return a protocol buffer GetRequest
   */
  public static GetRequest buildGetRequest(final byte[] regionName,
      final Get get) throws IOException {
    GetRequest.Builder builder = GetRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    builder.setRegion(region);
    builder.setGet(ProtobufUtil.toGet(get));
    return builder.build();
  }
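From the application's point of view none of this marshaling is visible; the same client code simply flows through HTable.get() and RequestConverter.buildGetRequest() shown above. A simple illustrative call (table, family, qualifier, and row names are made up):

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "mytable");
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
Result result = table.get(get);               // serialized as a protobuf GetRequest on the wire
byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
table.close();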

A look inside HRegion’s Stores initialization

In HBase, a table is split into multiple regions, and each region server hosts a certain set of regions.

Each region (org.apache.hadoop.hbase.regionserver.HRegion) maintains one or more Stores, where each Store is responsible for one column family. If you take a look at the org.apache.hadoop.hbase.regionserver.Store class, you will notice its member variables: a MemStore and a list of StoreFiles.

During the initialization of HRegion, it has to load these multiple Stores. During the process, each Store also has to initialize and load its MemStore and its list of StoreFiles. HRegion performs these initializations in multiple background threads instead of loading them sequentially. Basically, HRegion constructs Callables to be executed via a thread pool to speed up the process. After submitting the callables, it makes use of Future and CompletionService to collect the loading results.

 if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();

          this.stores.put(store.getColumnFamilyName().getBytes(), store);
          long storeSeqId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeSeqId);
          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
            maxSeqId = storeSeqId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
      } catch (InterruptedException e) {
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
      }
    }
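The Callable/CompletionService pattern used above is easy to reproduce outside HBase; a small standalone sketch (the family names and the sleep are stand-ins for real store loading):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelOpenDemo {
  public static void main(String[] args) throws Exception {
    List<String> families = Arrays.asList("cf1", "cf2", "cf3");
    ExecutorService pool = Executors.newFixedThreadPool(families.size());
    CompletionService<String> completionService =
        new ExecutorCompletionService<String>(pool);

    // Submit one "store opener" per column family, just like HRegion does.
    for (final String family : families) {
      completionService.submit(new Callable<String>() {
        public String call() throws Exception {
          Thread.sleep(100);                  // stand-in for loading the store files
          return "store for " + family;
        }
      });
    }

    // Collect results in completion order, not submission order.
    for (int i = 0; i < families.size(); i++) {
      Future<String> future = completionService.take();
      System.out.println("opened " + future.get());
    }
    pool.shutdownNow();
  }
}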

There is another interesting class, org.apache.hadoop.hbase.regionserver.MemStoreLAB. The main purpose of this class is to prevent heap fragmentation and long GC pauses. Take a look at the following excellent blog post about the design of MSLAB (MemStore-Local Allocation Buffers) and how it solves the fragmentation issue.

http://blog.cloudera.com/blog/2011/03/avoiding-full-gcs-in-hbase-with-memstore-local-allocation-buffers-part-3/
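The core MSLAB idea can be sketched in a few lines: instead of letting every KeyValue become its own small, arbitrarily long-lived heap object, copy its bytes into a few large chunks owned by the memstore, so that a flush frees whole chunks at once and old-generation fragmentation is avoided. A toy version, not the real MemStoreLAB (the 2 MB figure mirrors the default chunk size):

public class ToySlabAllocator {
  private static final int CHUNK_SIZE = 2 * 1024 * 1024;   // 2 MB chunks
  private byte[] currentChunk = new byte[CHUNK_SIZE];
  private int nextFreeOffset = 0;

  /** Copies data into the current chunk; rolls over to a fresh chunk when it is full. */
  public synchronized Slice allocate(byte[] data) {
    if (data.length > CHUNK_SIZE) {
      return new Slice(data.clone(), 0, data.length);       // too big for a slab, allocate directly
    }
    if (nextFreeOffset + data.length > CHUNK_SIZE) {
      currentChunk = new byte[CHUNK_SIZE];                  // old chunk stays alive via the Slices pointing at it
      nextFreeOffset = 0;
    }
    System.arraycopy(data, 0, currentChunk, nextFreeOffset, data.length);
    Slice slice = new Slice(currentChunk, nextFreeOffset, data.length);
    nextFreeOffset += data.length;
    return slice;
  }

  /** A (chunk, offset, length) view, analogous to a KeyValue backed by an MSLAB chunk. */
  public static class Slice {
    public final byte[] chunk;
    public final int offset;
    public final int length;
    Slice(byte[] chunk, int offset, int length) {
      this.chunk = chunk;
      this.offset = offset;
      this.length = length;
    }
  }
}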

Bucket in OpenTSDB

OpenTSDB introduces the notion of a time bucket, basically a way of grouping all data points that fall within a specific time window. OpenTSDB uses an hourly time bucket to partition data, with cell values of either integer or float type. One could easily extend the same design to other domains that store more complex data types, for example a complex Avro type.

Even better, different time buckets could be designed such as daily, weekly, and monthly buckets. The sky is the limit here.

In OpenTSDB, the timestamp is broken into two parts, one encoded in the row key and the other in the column qualifier.

The first part is the hourly base time, encoded in the row key, and the second is the delta in seconds from that hourly bucket, encoded in the qualifier.

The following code snippet from OpenTSDB shows how the base_time (hour bucket) is computed:
final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);

  private long updateBaseTime(final long timestamp) {
    // We force the starting timestamp to be on a MAX_TIMESPAN boundary
    // so that all TSDs create rows with the same base time.  Otherwise
    // we'd need to coordinate TSDs to avoid creating rows that cover
    // overlapping time periods.
    final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);
    // Clone the row key since we're going to change it.  We must clone it
    // because the HBase client may still hold a reference to it in its
    // internal datastructures.
    row = Arrays.copyOf(row, row.length);
    Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
    tsdb.scheduleForCompaction(row, (int) base_time);
    return base_time;
  }
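As a concrete illustration (assuming Const.MAX_TIMESPAN is the 3600-second hour bucket OpenTSDB uses), a sample timestamp splits like this:

long timestamp = 1384541275L;                            // epoch time in seconds
long maxTimespan = 3600;                                 // Const.MAX_TIMESPAN: one hour
long baseTime = timestamp - (timestamp % maxTimespan);   // 1384538400, goes into the row key
long deltaSeconds = timestamp - baseTime;                // 2875, goes into the column qualifier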

HBase Custom Filter

There are many different scan filters one can use in HBase. But sometimes you simply need to build your own custom filter because none of them satisfies your needs. Since HBase 0.94.7, one can build a custom filter that is dynamically loaded by HBase without restarting the cluster.

HBase allows us to package our custom filter as a jar and place it in the following folder:

${hbase.local.dir}/jars/
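A hedged sketch of what such a filter might look like against the 0.94-era API (the class, package, and semantics are made up; the exact Filter methods to override depend on your HBase version):

package com.example.filters;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;

/** Example custom filter that skips cells whose value is empty. */
public class NonEmptyValueFilter extends FilterBase {

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    return kv.getValueLength() == 0 ? ReturnCode.SKIP : ReturnCode.INCLUDE;
  }

  // In 0.94 filters are still Writables; this stateless filter has nothing to serialize.
  public void write(DataOutput out) throws IOException {
  }

  public void readFields(DataInput in) throws IOException {
  }
}

Package the class as a jar and drop it into the folder above (or into hbase.dynamic.jars.dir, discussed below); the region servers can then instantiate it by class name, for example via the createWritableForName method shown later in this post.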

If you want to understand the mechanics of how HBase accomplishes this, you can look at the following classes in the org.apache.hadoop.hbase.util package.

Basically, custom filters can be dropped into a pre-configured folder (hbase.dynamic.jars.dir), which can be in HDFS. In this way, region servers pick them up dynamically, without the need to restart the cluster for the new filters to take effect.

Classes.java, ClassLoaderBase.java, DynamicClassLoader.java

Look at createWritableForName(String className) in Classes.java, a very common way of instantiating a class via reflection.


@SuppressWarnings("unchecked")
public static Filter createWritableForName(String className) {
  try {
    Class<? extends Filter> clazz =
      (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
    return (Filter)WritableFactories.newInstance(clazz, new Configuration());
  } catch (ClassNotFoundException e) {
    throw new RuntimeException("Can't find class " + className);
  }
}

/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.io.WritableFactories;

/**
* Utilities for class manipulation.
*/
public class Classes {

  /**
   * Dynamic class loader to load filter/comparators
   */
  private final static ClassLoader CLASS_LOADER;

  static {
    ClassLoader parent = Classes.class.getClassLoader();
    Configuration conf = HBaseConfiguration.create();
    CLASS_LOADER = new DynamicClassLoader(conf, parent);
  }

  /**
   * Equivalent of {@link Class#forName(String)} which also returns classes for
   * primitives like <code>boolean</code>, etc.
   *
   * @param className
   *          The name of the class to retrieve. Can be either a normal class or
   *          a primitive class.
   * @return The class specified by <code>className</code>
   * @throws ClassNotFoundException
   *           If the requested class can not be found.
   */
  public static Class<?> extendedForName(String className)
  throws ClassNotFoundException {
    Class<?> valueType;
    if (className.equals("boolean")) {
      valueType = boolean.class;
    } else if (className.equals("byte")) {
      valueType = byte.class;
    } else if (className.equals("short")) {
      valueType = short.class;
    } else if (className.equals("int")) {
      valueType = int.class;
    } else if (className.equals("long")) {
      valueType = long.class;
    } else if (className.equals("float")) {
      valueType = float.class;
    } else if (className.equals("double")) {
      valueType = double.class;
    } else if (className.equals("char")) {
      valueType = char.class;
    } else {
      valueType = Class.forName(className);
    }
    return valueType;
  }

  @SuppressWarnings("rawtypes")
  public static String stringify(Class[] classes) {
    StringBuilder buf = new StringBuilder();
    if (classes != null) {
      for (Class c : classes) {
        if (buf.length() > 0) {
          buf.append(",");
        }
        buf.append(c.getName());
      }
    } else {
      buf.append("NULL");
    }
    return buf.toString();
  }

  /**
   * Used to dynamically load a filter class, and create a Writable filter.
   * This filter class most likely extends Configurable.
   *
   * @param className the filter class name.
   * @return a filter
   */
  @SuppressWarnings("unchecked")
  public static Filter createWritableForName(String className) {
    try {
      Class<? extends Filter> clazz =
        (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
      return (Filter)WritableFactories.newInstance(clazz, new Configuration());
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Can't find class " + className);
    }
  }

  /**
   * This method is almost the same as #createWritableForName, except
   * that this one doesn't expect the filter class to extends Configurable.
   *
   * @param className the filter class name.
   * @return a filter
   */
  @SuppressWarnings("unchecked")
  public static Filter createForName(String className) {
    try {
      Class<? extends Filter> clazz =
        (Class<? extends Filter>)Class.forName(className, true, CLASS_LOADER);
      return (Filter)clazz.newInstance();
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Can't find class " + className);
    } catch (InstantiationException e) {
      throw new RuntimeException("Couldn't instantiate " + className, e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException("No access to " + className, e);
    }
  }
}

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;

import com.google.common.base.Preconditions;

/**
* Base class loader that defines couple shared constants used
* by sub-classes. It also defined method getClassLoadingLock for parallel
* class loading and JDK 1.6 support. This method (getClassLoadingLock)
* is similar to the same method in the base class Java ClassLoader
* introduced in JDK 1.7, but not in JDK 1.6.
*/
@InterfaceAudience.Private
public class ClassLoaderBase extends URLClassLoader {

  // Maps class name to the corresponding lock object
  private final ConcurrentHashMap<String, Object> parallelLockMap
    = new ConcurrentHashMap<String, Object>();

  protected static final String DEFAULT_LOCAL_DIR = "/tmp/hbase-local-dir";
  protected static final String LOCAL_DIR_KEY = "hbase.local.dir";

  /**
   * Parent class loader.
   */
  protected final ClassLoader parent;

  /**
   * Creates a DynamicClassLoader that can load classes dynamically
   * from jar files under a specific folder.
   *
   * @param parent the parent ClassLoader to set.
   */
  public ClassLoaderBase(final ClassLoader parent) {
    super(new URL[]{}, parent);
    Preconditions.checkNotNull(parent, "No parent classloader!");
    this.parent = parent;
  }

  /**
   * Returns the lock object for class loading operations.
   */
  protected Object getClassLoadingLock(String className) {
    Object lock = parallelLockMap.get(className);
    if (lock != null) {
      return lock;
    }

    Object newLock = new Object();
    lock = parallelLockMap.putIfAbsent(className, newLock);
    if (lock == null) {
      lock = newLock;
    }
    return lock;
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* This is a class loader that can load classes dynamically from new
* jar files under a configured folder. The paths to the jar files are
* converted to URLs, and URLClassLoader logic is actually used to load
* classes. This class loader always uses its parent class loader
* to load a class at first. Only if its parent class loader
* can not load a class, we will try to load it using the logic here.
*
* The configured folder can be a HDFS path. In this case, the jar files
* under that folder will be copied to local at first under ${hbase.local.dir}/jars/.
* The local copy will be updated if the remote copy is updated, according to its
* last modified timestamp.
*
* We can't unload a class already loaded. So we will use the existing
* jar files we already know to load any class which can't be loaded
* using the parent class loader. If we still can't load the class from
* the existing jar files, we will check if any new jar file is added,
* if so, we will load the new jar file and try to load the class again.
* If still failed, a class not found exception will be thrown.
*
* Be careful in uploading new jar files and make sure all classes
* are consistent, otherwise, we may not be able to load your
* classes properly.
*/
@InterfaceAudience.Private
public class DynamicClassLoader extends ClassLoaderBase {
  private static final Log LOG =
    LogFactory.getLog(DynamicClassLoader.class);

  // Dynamic jars are put under ${hbase.local.dir}/jars/
  private static final String DYNAMIC_JARS_DIR = File.separator
    + "jars" + File.separator;

  private static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";

  private File localDir;

  // FileSystem of the remote path, set only if remoteDir != null
  private FileSystem remoteDirFs;
  private Path remoteDir;

  // Last modified time of local jars
  private HashMap<String, Long> jarModifiedTime;

  /**
   * Creates a DynamicClassLoader that can load classes dynamically
   * from jar files under a specific folder.
   *
   * @param conf the configuration for the cluster.
   * @param parent the parent ClassLoader to set.
   */
  public DynamicClassLoader(
      final Configuration conf, final ClassLoader parent) {
    super(parent);

    jarModifiedTime = new HashMap<String, Long>();
    String localDirPath = conf.get(
      LOCAL_DIR_KEY, DEFAULT_LOCAL_DIR) + DYNAMIC_JARS_DIR;
    localDir = new File(localDirPath);
    if (!localDir.mkdirs() && !localDir.isDirectory()) {
      throw new RuntimeException("Failed to create local dir " + localDir.getPath()
        + ", DynamicClassLoader failed to init");
    }

    String remotePath = conf.get(DYNAMIC_JARS_DIR_KEY);
    if (remotePath == null || remotePath.equals(localDirPath)) {
      remoteDir = null;  // ignore if it is the same as the local path
    } else {
      remoteDir = new Path(remotePath);
      try {
        remoteDirFs = remoteDir.getFileSystem(conf);
      } catch (IOException ioe) {
        LOG.warn("Failed to identify the fs of dir "
          + remoteDir + ", ignored", ioe);
        remoteDir = null;
      }
    }
  }

  @Override
  public Class<?> loadClass(String name)
      throws ClassNotFoundException {
    try {
      return parent.loadClass(name);
    } catch (ClassNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Class " + name + " not found - using dynamical class loader");
      }

      synchronized (getClassLoadingLock(name)) {
        // Check whether the class has already been loaded:
        Class<?> clasz = findLoadedClass(name);
        if (clasz != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Class " + name + " already loaded");
          }
        }
        else {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Finding class: " + name);
            }
            clasz = findClass(name);
          } catch (ClassNotFoundException cnfe) {
            // Load new jar files if any
            if (LOG.isDebugEnabled()) {
              LOG.debug("Loading new jar files, if any");
            }
            loadNewJars();

            if (LOG.isDebugEnabled()) {
              LOG.debug("Finding class again: " + name);
            }
            clasz = findClass(name);
          }
        }
        return clasz;
      }
    }
  }

  private synchronized void loadNewJars() {
    // Refresh local jar file lists
    for (File file: localDir.listFiles()) {
      String fileName = file.getName();
      if (jarModifiedTime.containsKey(fileName)) {
        continue;
      }
      if (file.isFile() && fileName.endsWith(".jar")) {
        jarModifiedTime.put(fileName, Long.valueOf(file.lastModified()));
        try {
          URL url = file.toURI().toURL();
          addURL(url);
        } catch (MalformedURLException mue) {
          // This should not happen, just log it
          LOG.warn("Failed to load new jar " + fileName, mue);
        }
      }
    }

    // Check remote files
    FileStatus[] statuses = null;
    if (remoteDir != null) {
      try {
        statuses = remoteDirFs.listStatus(remoteDir);
      } catch (IOException ioe) {
        LOG.warn("Failed to check remote dir status " + remoteDir, ioe);
      }
    }
    if (statuses == null || statuses.length == 0) {
      return; // no remote files at all
    }

    for (FileStatus status: statuses) {
      if (status.isDir()) continue; // No recursive lookup
      Path path = status.getPath();
      String fileName = path.getName();
      if (!fileName.endsWith(".jar")) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignored non-jar file " + fileName);
        }
        continue; // Ignore non-jar files
      }
      Long cachedLastModificationTime = jarModifiedTime.get(fileName);
      if (cachedLastModificationTime != null) {
        long lastModified = status.getModificationTime();
        if (lastModified < cachedLastModificationTime.longValue()) {
          // There could be some race, for example, someone uploads
          // a new one right in the middle the old one is copied to
          // local. We can check the size as well. But it is still
          // not guaranteed. This should be rare. Most likely,
          // we already have the latest one.
          // If you are unlucky to hit this race issue, you have
          // to touch the remote jar to update its last modified time
          continue;
        }
      }
      try {
        // Copy it to local
        File dst = new File(localDir, fileName);
        remoteDirFs.copyToLocalFile(path, new Path(dst.getPath()));
        jarModifiedTime.put(fileName, Long.valueOf(dst.lastModified()));
        URL url = dst.toURI().toURL();
        addURL(url);
      } catch (IOException ioe) {
        LOG.warn("Failed to load new jar " + fileName, ioe);
      }
    }
  }
}

HBase Row Key Design part 1

When one issues a get/scan query against an HBase table via HTable or HTablePool, it first looks up the -ROOT- and .META. tables to locate the region servers that host the data. This is the phase where most of the data irrelevant to our query gets skipped, and that is why row key design is so important when it comes to using HBase.

For example, one can use the following composite key to partition data by siteId so that a scan for all users belonging to a specific site can be done efficiently by specifying a startRow with the siteId prefix.

Composite Key:
siteId|userId
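A hedged example of how a client might exploit this layout to fetch every user of one site (the table name and siteId are illustrative, and conf is an existing HBaseConfiguration):

byte[] prefix = Bytes.toBytes("site42|");        // all rows for siteId "site42"
Scan scan = new Scan();
scan.setStartRow(prefix);                        // jump straight to the first matching row
scan.setFilter(new PrefixFilter(prefix));        // stop once rows no longer share the prefix
HTable table = new HTable(conf, "users");
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
  // each Result is one user row belonging to site42
}
scanner.close();
table.close();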

The -ROOT- and .META. lookup will locate all the region servers that contain data for the specified siteId. Then, multiple get/scan RPC calls are issued against these region servers to retrieve the required results.

To be continued.

For example, when we call the HTable get(List<Get>) method, it calls the processBatchCallback method in HConnectionImplementation. In this method, it finds the region servers that host the data and then issues calls to those servers.


 public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException {
      // This belongs in HTable!!! Not in here.  St.Ack

      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException(
            "argument results must be the same size as argument list");
      }
      if (list.isEmpty()) {
        return;
      }

      // Keep track of the most recent servers for any given item for better
      // exceptional reporting.  We keep HRegionLocation to save on parsing.
      // Later below when we use lastServers, we'll pull what we need from
      // lastServers.
      HRegionLocation [] lastServers = new HRegionLocation[results.length];
      List<Row> workingList = new ArrayList<Row>(list);
      boolean retry = true;
      // count that helps presize actions array
      int actionCount = 0;

      for (int tries = 0; tries < numRetries && retry; ++tries) {

        // sleep first, if this is a retry
        if (tries >= 1) {
          long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
          Thread.sleep(sleepTime);
        }
        // step 1: break up into regionserver-sized chunks and build the data structs
        Map<HRegionLocation, MultiAction<R>> actionsByServer =
          new HashMap<HRegionLocation, MultiAction<R>>();
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
            HRegionLocation loc = locateRegion(tableName, row.getRow());
            byte[] regionName = loc.getRegionInfo().getRegionName();

            MultiAction<R> actions = actionsByServer.get(loc);
            if (actions == null) {
              actions = new MultiAction<R>();
              actionsByServer.put(loc, actions);
            }

            Action<R> action = new Action<R>(row, i);
            lastServers[i] = loc;
            actions.add(regionName, action);
          }
        }

        // step 2: make the requests

        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());

        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }

        // step 3: collect the failures and successes and prepare for retry

        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
             : futures.entrySet()) {
          HRegionLocation loc = responsePerServer.getKey();

          try {
            Future<MultiResponse> future = responsePerServer.getValue();
            MultiResponse resp = future.get();

            if (resp == null) {
              // Entire server failed
              LOG.debug("Failed all for server: " + loc.getHostnamePort() +
                ", removing from cache");
              continue;
            }

            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
              byte[] regionName = e.getKey();
              List<Pair<Integer, Object>> regionResults = e.getValue();
              for (Pair<Integer, Object> regionResult : regionResults) {
                if (regionResult == null) {
                  // if the first/only record is 'null' the entire region failed.
                  LOG.debug("Failures for region: " +
                      Bytes.toStringBinary(regionName) +
                      ", removing from cache");
                } else {
                  // Result might be an Exception, including DNRIOE
                  results[regionResult.getFirst()] = regionResult.getSecond();
                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                    callback.update(e.getKey(),
                        list.get(regionResult.getFirst()).getRow(),
                        (R)regionResult.getSecond());
                  }
                }
              }
            }
          } catch (ExecutionException e) {
            LOG.warn("Failed all from " + loc, e);
          }
        }

        // step 4: identify failures and prep for a retry (if applicable).

        // Find failures (i.e. null Result), and add them to the workingList (in
        // order), so they can be retried.
        retry = false;
        workingList.clear();
        actionCount = 0;
        for (int i = 0; i < results.length; i++) {
          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
          // then retry that row. else dont.
          if (results[i] == null ||
              (results[i] instanceof Throwable &&
                  !(results[i] instanceof DoNotRetryIOException))) {

            retry = true;
            actionCount++;
            Row row = list.get(i);
            workingList.add(row);
            deleteCachedLocation(tableName, row.getRow());
          } else {
            if (results[i] != null && results[i] instanceof Throwable) {
              actionCount++;
            }
            // add null to workingList, so the order remains consistent with the original list argument.
            workingList.add(null);
          }
        }
      }

      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
      List<Row> actions = new ArrayList<Row>(actionCount);
      List<String> addresses = new ArrayList<String>(actionCount);

      for (int i = 0 ; i < results.length; i++) {
        if (results[i] == null || results[i] instanceof Throwable) {
          exceptions.add((Throwable)results[i]);
          actions.add(list.get(i));
          addresses.add(lastServers[i].getHostnamePort());
        }
      }

      if (!exceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(exceptions,
            actions,
            addresses);
      }
    }

Dive into HTable internals

HTable is the class we can use to interact with an HBase table. Internally, it contains an instance of HConnection. This connection is returned by calling

HConnectionManager.getConnection(Configuration conf);

HConnectionManager is a non-instantiable singleton class that manages HConnections. If you take a look at HConnectionManager, there is a LinkedHashMap<Configuration, HConnectionImplementation> called HBASE_INSTANCES that provides the mapping between Configuration and HConnection. This ensures that the same Configuration object results in the same HConnection being returned by HConnectionManager. Different HTable clients end up using the same HConnection if they use the same Configuration, which allows them all to share the same cache of region location information and avoid repetitive region location discovery. Also, the same zookeeper watcher/tracker can be reused.
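A quick illustration of that sharing (the table names are made up):

Configuration conf = HBaseConfiguration.create();
HTable t1 = new HTable(conf, "table_a");
HTable t2 = new HTable(conf, "table_b");
// Both tables were built from the same Configuration, so HConnectionManager hands them
// the same underlying HConnection, and with it the same region location cache and
// zookeeper watchers.
HConnection shared = HConnectionManager.getConnection(conf);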

The implementation of HConnection is HConnectionImplementation. When an HConnection is instantiated, it sets up zookeeper trackers/watchers on the HBase master location/address and on the -ROOT- region.

HConnectionImplementation provides RPC connections to the HMaster and to the servers hosting -ROOT- and .META.. In Hadoop/HBase, RPC communication is implemented using the Java dynamic proxy pattern. For example, HMasterInterface acts as the proxy for calling remote methods on the HBase master server.
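The dynamic proxy technique itself is plain JDK machinery; here is a self-contained sketch of how an interface call can be intercepted and turned into a "remote" invocation (no HBase classes involved; RegionService is an invented stand-in for something like HRegionInterface):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
  // Stand-in for an RPC interface such as HMasterInterface or HRegionInterface.
  interface RegionService {
    String getRegionInfo(String regionName);
  }

  public static void main(String[] args) {
    InvocationHandler handler = new InvocationHandler() {
      public Object invoke(Object proxy, Method method, Object[] methodArgs) {
        // A real RPC client would marshal method.getName() and methodArgs onto the wire
        // here and unmarshal the server's response before returning it.
        return "would send RPC: " + method.getName() + "(" + methodArgs[0] + ")";
      }
    };

    RegionService service = (RegionService) Proxy.newProxyInstance(
        RegionService.class.getClassLoader(),
        new Class<?>[] { RegionService.class },
        handler);

    System.out.println(service.getRegionInfo("region-1"));
  }
}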

To understand the underlying mechanism of RPC invocation involved in HTable, take a look at the put(Put put) method.

public void put(final Put put) throws IOException { doPut(Arrays.asList(put)); }

It will in turn call processBatchCallback method in HConnectionImplementation.

public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException

In this method, given a list of Put operations, it groups them by region server: all Put operations destined for the same region server are grouped together so that they can be dispatched in a single call. It uses the following hashmap to maintain the groupings.

Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();

It groups all the actions/operations by HServerAddress. HServerAddress is a "label" for an HBase server made up of host name and port number.

Iterating through the map, it then fires off the requests to the corresponding region servers by creating Callables to be executed on the executor thread pool:

for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
  futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}

Let’s look at the createCallable method

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
    final MultiAction<R> multi, final byte [] tableName) {
  final HConnection connection = this;
  return new Callable<MultiResponse>() {
    public MultiResponse call() throws IOException {
      return getRegionServerWithoutRetries(
          new ServerCallable<MultiResponse>(connection, tableName, null) {
            public MultiResponse call() throws IOException {
              return server.multi(multi);
            }
            @Override
            public void connect(boolean reload) throws IOException {
              server =
                connection.getHRegionConnection(loc.getHostname(), loc.getPort());
            }
          }
      );
    }
  };
}

The method above returns an anonymous Callable whose call() method will be executed in the executor thread pool. Inside call(), the RPC client proxy (the HRegionInterface field named server) makes the remote RPC method call server.multi(multi), marshaling the method and parameters to the remote server. The connect(boolean reload) method establishes the RPC connection to the region server.

server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());

In the getHRegionConnection method, it uses HBaseRPC to create a dynamic proxy to the region server.

server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);

Stay tuned for my next post about HBaseRPC internals. We will dive into the mechanics of the RPC call implementation in HBase.