Log Structured Merge Tree (LSM Tree) in HBase

If you have ever worked with HBase, Cassandra, Google Bigtable, or LevelDB, you have probably heard of the Log-Structured Merge tree (LSM tree). The LSM tree is what sets these NoSQL stores apart from most RDBMSs, which use B+ trees.

In HBase, the LSM tree concept is realized through the HLog, the memstores, and the storefiles. The main idea of LSM is that data is first kept in an in-memory cache, e.g. the memstores in HBase. Each region server hosts multiple regions (HRegion), and each HRegion holds a section of the data for a table. An HRegion has as many memstores as the table has column families, and it keeps track of the total size contributed by all of them. As the following HRegion methods show, after applying any mutation, HRegion checks whether the total memstore size has exceeded the configured flush size. If so, it calls requestFlush(), which simply delegates to the region server's MemStoreFlusher.


  /**
   * @param size
   * @return True if size is over the flush threshold
   */
  private boolean isFlushSize(final long size) {
    return size > this.memstoreFlushSize;
  }


  /**
   * Perform a batch of mutations.
   * It supports only Put and Delete mutations and will ignore other types passed.
   * @param batchOp contains the list of mutations
   * @return an array of OperationStatus which internally contains the
   *         OperationStatusCode and the exceptionMessage if any.
   * @throws IOException
   */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          // ... (body missing from this listing)
        }

        if (!initialized) {
          if (!batchOp.isInReplay()) {
            // ... (body missing from this listing)
          }
          initialized = true;
        }
        long addedSize = doMiniBatchMutation(batchOp);
        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      // ... (body missing from this listing)
    }
    return batchOp.retCodeDetails;
  }

  private void requestFlush() {
    if (this.rsServices == null) {
      return;
    }
    synchronized (writestate) {
      if (this.writestate.isFlushRequested()) {
        return;
      }
      writestate.flushRequested = true;
    }
    // Make request outside of synchronize block; HBASE-818.
    this.rsServices.getFlushRequester().requestFlush(this, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
    }
  }




The memstore has a preconfigured maximum size. HRegion loads the configured memstore flush size in the method below, taking it from the table descriptor and falling back to the cluster configuration when the table does not set one.

  void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
          ...);  // default value cut off in this listing
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        ...;  // multiplier cut off in this listing
  }

As soon as the memstore exceeds this maximum size, its data is flushed to disk as storefiles.

In addition, HBase has a configurable limit on the number of storefiles. When the number of storefiles exceeds that limit, a compaction is triggered to merge them into a bigger one. The main purpose of compaction is to speed up the read path by reducing the number of storefiles that have to be read.
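
The flush-then-compact cycle described above can be sketched in a few lines of plain Java. This is only a toy model, not HBase code: the class name LsmSketch, the entry-count threshold, and the in-memory "store files" are all assumptions made for illustration, and deletes, the HLog, and real disk I/O are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy LSM store (hypothetical): a sorted in-memory buffer plus immutable sorted "store files". */
class LsmSketch {
    private final int flushThreshold;  // flush when the memstore holds more entries than this
    private final int maxStoreFiles;   // compact when more store files than this exist
    private TreeMap<String, String> memstore = new TreeMap<>();
    // Oldest file first, newest file last; a flushed file is never modified again.
    private final List<TreeMap<String, String>> storeFiles = new ArrayList<>();

    LsmSketch(int flushThreshold, int maxStoreFiles) {
        this.flushThreshold = flushThreshold;
        this.maxStoreFiles = maxStoreFiles;
    }

    /** Writes always go to memory first; a flush is requested once the threshold is exceeded. */
    void put(String key, String value) {
        memstore.put(key, value);
        if (memstore.size() > flushThreshold) {
            flush();
        }
    }

    /** Reads check the memstore, then the store files from newest to oldest. */
    String get(String key) {
        String value = memstore.get(key);
        if (value != null) {
            return value;
        }
        for (int i = storeFiles.size() - 1; i >= 0; i--) {
            value = storeFiles.get(i).get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    /** Turns the current memstore into a new store file and starts an empty memstore. */
    private void flush() {
        storeFiles.add(memstore);
        memstore = new TreeMap<>();
        if (storeFiles.size() > maxStoreFiles) {
            compact();
        }
    }

    /** Merges all store files into one; applying oldest first lets newer values win. */
    private void compact() {
        TreeMap<String, String> merged = new TreeMap<>();
        for (TreeMap<String, String> file : storeFiles) {
            merged.putAll(file);
        }
        storeFiles.clear();
        storeFiles.add(merged);
    }

    int storeFileCount() {
        return storeFiles.size();
    }

    public static void main(String[] args) {
        LsmSketch store = new LsmSketch(2, 3);
        for (int i = 0; i < 8; i++) {
            store.put("row" + i, "v" + i);
        }
        store.put("row0", "updated");
        System.out.println(store.get("row0") + " in " + store.storeFileCount() + " store file(s)");
    }
}
```

The read path shows why compaction matters: every extra store file is one more place a get may have to look.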

Please check out this great paper on the LSM tree.


HBase Pluggable Store, Memstore, StoreEngine

I was browsing the HBase codebase and noticed that many components in the regionserver module (org.apache.hadoop.hbase.regionserver) have been refactored to be pluggable. This is a cleaner design, because different implementations of these components can be plugged into HBase easily. Among the new interfaces are

Store, MemStore

The corresponding implementation classes are

HStore implements Store

DefaultMemStore implements MemStore

Furthermore, StoreEngine is now an abstract class and acts as a factory to create StoreFileManager, CompactionPolicy and Compactor.
DefaultStoreEngine extends StoreEngine to create the default compactor, policy, and store file manager. This makes storefile management and compaction pluggable.
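
To make the factory idea concrete, here is a minimal sketch of an abstract engine that creates its own policy and compactor and is itself instantiated from a class name. None of these types are the real HBase classes: SketchStoreEngine, SketchCompactionPolicy, SketchCompactor, and the "more than 3 files" rule are hypothetical names and values chosen for illustration, and loading the engine by class name is an assumption about how such a plug-in point could be wired.

```java
import java.util.Arrays;
import java.util.List;

class StoreEngineDemo {
    public static void main(String[] args) {
        // The engine class name would typically come from configuration (assumption).
        SketchStoreEngine engine =
            SketchStoreEngine.create(SketchDefaultStoreEngine.class.getName());
        List<String> files = Arrays.asList("f1", "f2", "f3", "f4");
        if (engine.createPolicy().needsCompaction(files)) {
            System.out.println("compacted into " + engine.createCompactor().compact(files));
        }
    }
}

/** Hypothetical policy: decides whether a set of store files should be compacted. */
interface SketchCompactionPolicy {
    boolean needsCompaction(List<String> storeFiles);
}

/** Hypothetical compactor: merges store files and returns the name of the result. */
interface SketchCompactor {
    String compact(List<String> storeFiles);
}

/** Abstract factory: each engine decides which policy and compactor go together. */
abstract class SketchStoreEngine {
    abstract SketchCompactionPolicy createPolicy();

    abstract SketchCompactor createCompactor();

    /** Instantiates an engine reflectively so implementations can be swapped by name. */
    static SketchStoreEngine create(String className) {
        try {
            return (SketchStoreEngine) Class.forName(className)
                .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load store engine " + className, e);
        }
    }
}

/** Default engine of the sketch: compact once more than 3 files exist. */
class SketchDefaultStoreEngine extends SketchStoreEngine {
    @Override
    SketchCompactionPolicy createPolicy() {
        return files -> files.size() > 3;
    }

    @Override
    SketchCompactor createCompactor() {
        return files -> String.join("+", files);
    }
}
```

Swapping in a different engine then only requires another subclass and a different class name; callers never reference the concrete type.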

You can refer to the corresponding Jira and design doc



HBase 0.96 API change

I was coding a DAO for HBase 0.96 the other day and came across some API changes between 0.94 and 0.96.

A new Cell interface has been introduced, and KeyValue no longer implements the Writable interface; it implements Cell instead. With protobuf as the wire format, Scan, Get, and Put are also no longer required to implement Writable. They extend the abstract class OperationWithAttributes (Put does so through Mutation), which in turn extends another abstract class, Operation, and implements the Attributes interface. The abstract class Operation mainly provides JSON conversion for logging and debugging purposes. The Attributes interface defines the methods for setting and maintaining the map of attributes of the operation.


public interface Cell {

  //1) Row

   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Short.MAX_VALUE which is 32,767 bytes.
   * @return The array containing the row bytes.
  byte[] getRowArray();

   * @return Array index of first row byte
  int getRowOffset();

   * @return Number of row bytes. Must be < rowArray.length - offset.
  short getRowLength();

  //2) Family

   * Contiguous bytes composed of legal HDFS filename characters which may start at any index in the
   * containing array. Max length is Byte.MAX_VALUE, which is 127 bytes.
   * @return the array containing the family bytes.
  byte[] getFamilyArray();

   * @return Array index of first family byte
  int getFamilyOffset();

   * @return Number of family bytes.  Must be < familyArray.length - offset.
  byte getFamilyLength();

  //3) Qualifier

   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Short.MAX_VALUE which is 32,767 bytes.
   * @return The array containing the qualifier bytes.
  byte[] getQualifierArray();

   * @return Array index of first qualifier byte
  int getQualifierOffset();

   * @return Number of qualifier bytes.  Must be < qualifierArray.length - offset.
  int getQualifierLength();

  //4) Timestamp

   * @return Long value representing time at which this cell was "Put" into the row.  Typically
   * represents the time of insertion, but can be any value from 0 to Long.MAX_VALUE.
  long getTimestamp();

  //5) Type

   * @return The byte representation of the KeyValue.TYPE of this cell: one of Put, Delete, etc
  byte getTypeByte();

  //6) MvccVersion

   * Internal use only. A region-specific sequence ID given to each operation. It always exists for
   * cells in the memstore but is not retained forever. It may survive several flushes, but
   * generally becomes irrelevant after the cell's row is no longer involved in any operations that
   * require strict consistency.
   * @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
  long getMvccVersion();

  //7) Value

   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Integer.MAX_VALUE which is 2,147,483,647 bytes.
   * @return The array containing the value bytes.
  byte[] getValueArray();

   * @return Array index of first value byte
  int getValueOffset();

   * @return Number of value bytes.  Must be < valueArray.length - offset.
  int getValueLength();
   * @return the tags byte array
  byte[] getTagsArray();

   * @return the first offset where the tags start in the Cell
  int getTagsOffset();
   * @return the total length of the tags in the Cell.
  short getTagsLength();
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's value.
   * Added to ease transition from  0.94 -> 0.96.
   * @deprecated as of 0.96, use {@link CellUtil#cloneValue(Cell)}
  byte[] getValue();
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's family. 
   * Added to ease transition from  0.94 -> 0.96.
   * @deprecated as of 0.96, use {@link CellUtil#cloneFamily(Cell)}
  byte[] getFamily();

   * WARNING do not use, expensive.  This gets an arraycopy of the cell's qualifier.
   * Added to ease transition from  0.94 -> 0.96.
   * @deprecated as of 0.96, use {@link CellUtil#cloneQualifier(Cell)}
  byte[] getQualifier();

   * WARNING do not use, expensive.  this gets an arraycopy of the cell's row.
   * Added to ease transition from  0.94 -> 0.96.
   * @deprecated as of 0.96, use {@link CellUtil#getRowByte(Cell, int)}
  byte[] getRow();

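Example: 0.96

for (Cell cell : result.rawCells()) {
  // ... (loop body missing from this listing)
}

vs 0.94

for (KeyValue kv : result.raw()) {
  // ... (loop body missing from this listing)
}

Creating a Result in 0.96

ArrayList<Cell> list = new ArrayList<Cell>();
Result result = Result.create(list);

vs 0.94

ArrayList<KeyValue> list = new ArrayList<KeyValue>();
Result result = new Result(list);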
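
The Cell accessors come in array/offset/length triples because several cells can share one backing byte array. The sketch below shows how a copy of just the row bytes can be cut out of such an array. RowView is a hypothetical stand-in for the three row accessors of Cell, and copyRow is an illustrative helper, not the CellUtil implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class CellSliceSketch {
    /** Hypothetical stand-in for the row accessors of the Cell interface. */
    interface RowView {
        byte[] getRowArray();

        int getRowOffset();

        short getRowLength();
    }

    /** Copies only the row bytes out of the (possibly shared) backing array. */
    static byte[] copyRow(RowView cell) {
        int from = cell.getRowOffset();
        return Arrays.copyOfRange(cell.getRowArray(), from, from + cell.getRowLength());
    }

    /** Builds a view over part of a backing array without copying it. */
    static RowView view(final byte[] backing, final int offset, final short length) {
        return new RowView() {
            @Override
            public byte[] getRowArray() {
                return backing;
            }

            @Override
            public int getRowOffset() {
                return offset;
            }

            @Override
            public short getRowLength() {
                return length;
            }
        };
    }

    public static void main(String[] args) {
        // The row "row-1" sits in the middle of a larger buffer.
        byte[] backing = "xxrow-1yyy".getBytes(StandardCharsets.US_ASCII);
        RowView cell = view(backing, 2, (short) 5);
        System.out.println(new String(copyRow(cell), StandardCharsets.US_ASCII));
    }
}
```

Reading through offset and length avoids an allocation per access, which is why the copying getters such as getValue() and getRow() are marked as expensive and deprecated.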



HBase Multiversion Concurrency Control

For those interested in Multiversion Concurrency Control (MVCC) in HBase, the post below is a very good introduction.


There is also a good paper on the theory and algorithms of Multiversion Concurrency Control.

The implementation lives in the HBase code base in org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl, which is used by org.apache.hadoop.hbase.regionserver.HRegion.

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.apache.hadoop.hbase.regionserver;

import java.util.LinkedList;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

 * Manages the read/write consistency within memstore. This provides
 * an interface for readers to determine what entries to ignore, and
 * a mechanism for writers to obtain new write numbers, then "commit"
 * the new writes for readers to read (thus forming atomic transactions).
public class MultiVersionConsistencyControl {
  private volatile long memstoreRead = 0;
  private volatile long memstoreWrite = 0;

  private final Object readWaiters = new Object();

  // This is the pending queue of writes.
  private final LinkedList<WriteEntry> writeQueue =
      new LinkedList<WriteEntry>();

  private static final ThreadLocal<Long> perThreadReadPoint =
      new ThreadLocal<Long>() {
       Long initialValue() {
         return Long.MAX_VALUE;

   * Default constructor. Initializes the memstoreRead/Write points to 0.
  public MultiVersionConsistencyControl() {
    this.memstoreRead = this.memstoreWrite = 0;

   * Initializes the memstoreRead/Write points appropriately.
   * @param startPoint
  public void initialize(long startPoint) {
    synchronized (writeQueue) {
      if (this.memstoreWrite != this.memstoreRead) {
        throw new RuntimeException("Already used this mvcc. Too late to initialize");

      this.memstoreRead = this.memstoreWrite = startPoint;

   * Get this thread's read point. Used primarily by the memstore scanner to
   * know which values to skip (ie: have not been completed/committed to
   * memstore).
  public static long getThreadReadPoint() {
      return perThreadReadPoint.get();

   * Set the thread read point to the given value. The thread MVCC
   * is used by the Memstore scanner so it knows which values to skip.
   * Give it a value of 0 if you want everything.
  public static void setThreadReadPoint(long readPoint) {

   * Set the thread MVCC read point to whatever the current read point is in
   * this particular instance of MVCC.  Returns the new thread read point value.
  public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
    return getThreadReadPoint();

   * Set the thread MVCC read point to 0 (include everything).
  public static void resetThreadReadPoint() {

   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite;
      WriteEntry e = new WriteEntry(nextWriteNumber);
      return e;

   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
  public void completeMemstoreInsert(WriteEntry e) {

   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) {
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
        } else {

      if (!ranOnce) {
        throw new RuntimeException("never was a first");

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      return false;

   * Wait for the global readPoint to advance upto
   * the specified transaction number.
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) {
        try {
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
    if (interrupted) Thread.currentThread().interrupt();

  public long memstoreReadPoint() {
    return memstoreRead;

  public static class WriteEntry {
    private long writeNumber;
    private boolean completed = false;
    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    void markCompleted() {
      this.completed = true;
    boolean isCompleted() {
      return this.completed;
    long getWriteNumber() {
      return this.writeNumber;

  public static final long FIXED_SIZE = ClassSize.align(
      ClassSize.OBJECT +
      2 * Bytes.SIZEOF_LONG +
      2 * ClassSize.REFERENCE);
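
The core rule of the class above is the one stated in the advanceMemstore comment: the read point only moves up to the highest write number for which every earlier write has also completed. A stripped-down sketch of that rule follows. MvccSketch and its method names are hypothetical, and unlike the real class it neither blocks in a waitForRead step nor keeps a per-thread read point.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class MvccSketch {
    /** One in-flight write, identified by its write number. */
    static final class WriteEntry {
        final long writeNumber;
        boolean completed;

        WriteEntry(long writeNumber) {
            this.writeNumber = writeNumber;
        }
    }

    private long readPoint = 0;   // readers may see writes with number <= readPoint
    private long writePoint = 0;  // last write number handed out
    private final Deque<WriteEntry> pending = new ArrayDeque<>();

    /** Hands out the next write number and queues the entry as pending. */
    synchronized WriteEntry begin() {
        WriteEntry entry = new WriteEntry(++writePoint);
        pending.addLast(entry);
        return entry;
    }

    /** Marks the entry complete and advances the read point past every completed prefix. */
    synchronized void complete(WriteEntry entry) {
        entry.completed = true;
        while (!pending.isEmpty() && pending.peekFirst().completed) {
            readPoint = pending.removeFirst().writeNumber;
        }
    }

    synchronized long readPoint() {
        return readPoint;
    }

    synchronized boolean isVisible(WriteEntry entry) {
        return entry.writeNumber <= readPoint;
    }

    public static void main(String[] args) {
        MvccSketch mvcc = new MvccSketch();
        WriteEntry first = mvcc.begin();
        WriteEntry second = mvcc.begin();
        mvcc.complete(second);  // finished out of order: not visible while first is pending
        System.out.println("read point after second: " + mvcc.readPoint());
        mvcc.complete(first);   // now both become visible at once
        System.out.println("read point after first: " + mvcc.readPoint());
    }
}
```

Because a later write never becomes visible before an earlier one, a reader that captures the read point sees a consistent prefix of the write sequence.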


HBase 0.96.0 release: Protobufs as wire format

HBase 0.96.0 was released recently, on October 18th, 2013. Some notable changes are:

1) Adopted protocol buffers as the wire format
2) Reduced mean time to recovery
3) Removed the ROOT table

Let's take a look at these changes in depth to understand the new design and explore the differences they introduced in the codebase. In this first post, we look at the protocol buffer adoption in HBase.

The main impact of the change is that the classes which previously implemented the Writable interface in order to be marshaled over the RPC wire no longer need to implement it. In other words, the manual marshaling and unmarshaling methods write(final DataOutput out) and readFields(final DataInput in) are no longer required. Instead, HBase relies on protobufs to send, for example, a Get request over the RPC wire. Take a look at the following: https://issues.apache.org/jira/browse/HBASE-5305

The Get class after 0.96.0 does not need to implement the Writable interface:

public class Get extends OperationWithAttributes
  implements Row, Comparable

The old Get class definition implements the Writable interface, with readFields(final DataInput in) and write(final DataOutput out) implemented by hand:

public class Get extends OperationWithAttributes
  implements Writable, Row, Comparable {

 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > GET_VERSION) {
      throw new IOException("unsupported version");
    this.row = Bytes.readByteArray(in);
    this.lockId = in.readLong();
    this.maxVersions = in.readInt();
    boolean hasFilter = in.readBoolean();
    if (hasFilter) {
      this.filter = Classes.createWritableForName(
    this.cacheBlocks = in.readBoolean();
    this.tr = new TimeRange();
    int numFamilies = in.readInt();
    this.familyMap =
      new TreeMap<byte [],NavigableSet>(Bytes.BYTES_COMPARATOR);
    for(int i=0; i<numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      boolean hasColumns = in.readBoolean();
      NavigableSet set = null;
      if(hasColumns) {
        int numColumns = in.readInt();
        set = new TreeSet(Bytes.BYTES_COMPARATOR);
        for(int j=0; j<numColumns; j++) {
          byte [] qualifier = Bytes.readByteArray(in);
      this.familyMap.put(family, set);

  public void write(final DataOutput out)
  throws IOException {
    Bytes.writeByteArray(out, this.row);
    if(this.filter == null) {
    } else {
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
    for(Map.Entry<byte [], NavigableSet> entry :
      familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet columnSet = entry.getValue();
      if(columnSet == null) {
      } else {
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);

New Put class definition after 0.96.0

public class Put extends Mutation implements HeapSize, Comparable

Old Put class definition

public class Put extends Mutation
  implements HeapSize, Writable, Comparable {
 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > PUT_VERSION) {
      throw new IOException("version not supported");
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for(int i=0;i<numFamilies;i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
      List keys = new ArrayList(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      for (int j = 0; j  1) {

  public void write(final DataOutput out)
  throws IOException {
    Bytes.writeByteArray(out, this.row);
    for (Map.Entry<byte [], List> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List keys = entry.getValue();
      int totalLen = 0;
      for(KeyValue kv : keys) {
        totalLen += kv.getLength();
      for(KeyValue kv : keys) {
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());

New Scan class definition after 0.96.0

public class Scan extends OperationWithAttributes 

Old Scan class definition

public class Scan extends OperationWithAttributes implements Writable {
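
For readers who have not written Writable-style code, the following sketch shows what the hand-written marshaling in the old classes boils down to, using only java.io. MiniGet is a hypothetical, heavily reduced stand-in for the old Get (three fields only), and the field order and version byte are assumptions of this sketch rather than the real wire layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableSketch {
    static final byte VERSION = 1;

    /** Hypothetical mini request object with hand-written marshaling. */
    static final class MiniGet {
        byte[] row = new byte[0];
        int maxVersions = 1;
        boolean cacheBlocks = true;

        /** Every field is written by hand, in a fixed order. */
        void write(DataOutput out) throws IOException {
            out.writeByte(VERSION);
            out.writeInt(row.length);
            out.write(row);
            out.writeInt(maxVersions);
            out.writeBoolean(cacheBlocks);
        }

        /** Fields must be read back in exactly the order they were written. */
        void readFields(DataInput in) throws IOException {
            int version = in.readByte();
            if (version > VERSION) {
                throw new IOException("unsupported version");
            }
            row = new byte[in.readInt()];
            in.readFully(row);
            maxVersions = in.readInt();
            cacheBlocks = in.readBoolean();
        }
    }

    /** Serializes a MiniGet to bytes and reads it back into a fresh instance. */
    static MiniGet roundTrip(MiniGet get) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            get.write(new DataOutputStream(buffer));
            MiniGet copy = new MiniGet();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniGet get = new MiniGet();
        get.row = new byte[] {1, 2, 3};
        get.maxVersions = 5;
        MiniGet copy = roundTrip(get);
        System.out.println(copy.row.length + " row bytes, maxVersions=" + copy.maxVersions);
    }
}
```

Adding a field in this style means touching write, readFields, and the version check together, which is the maintenance burden that generated protobuf messages take away.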

You can find the protobuf definition files in the src/main/protobuf folder of the hbase-protocol project. In HTable, we can see protobuf in action in the get(final Get get) method: it builds a RegionServerCallable, which uses ProtobufUtil to make the actual protobuf GetRequest to the server.

HTable

  public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
  }

ProtobufUtil, in package org.apache.hadoop.hbase.protobuf, is the utility class that helps build protobuf calls using the auto-generated protobuf classes.

package org.apache.hadoop.hbase.protobuf;

  /**
   * A helper to invoke a Get using client protocol.
   *
   * @param client
   * @param regionName
   * @param get
   * @return the result of the Get
   * @throws IOException
   */
  public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
  }

  // From RequestConverter:

  /**
   * Create a protocol buffer GetRequest for a client Get
   *
   * @param regionName the name of the region to get
   * @param get the client Get
   * @return a protocol buffer GetRequest
   */
  public static GetRequest buildGetRequest(final byte[] regionName,
      final Get get) throws IOException {
    GetRequest.Builder builder = GetRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    // ... (the lines that populate the builder are missing from this listing)
    return builder.build();
  }

A look inside HRegion’s Stores initialization

In HBase, a table is split into multiple regions, and each region server hosts a certain set of regions.

Each region (org.apache.hadoop.hbase.regionserver.HRegion) maintains one or more Stores, where each Store is responsible for one column family. If you take a look at the org.apache.hadoop.hbase.regionserver.Store class, you will notice its member variables: a MemStore and a list of StoreFiles.

During initialization, HRegion has to load these Stores, and each Store in turn has to initialize and load its MemStore and its list of StoreFiles. HRegion performs these initializations in multiple background threads instead of loading them sequentially: it constructs a Callable per Store and submits it to a thread pool to speed up the process. After submitting the callables, it uses Future and CompletionService to collect the results as each load completes.

 if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();

          this.stores.put(store.getColumnFamilyName().getBytes(), store);
          long storeSeqId = store.getMaxSequenceId();
          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
            maxSeqId = storeSeqId;
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
      } catch (InterruptedException e) {
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
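
The submit-then-take pattern used in the listing above can be tried out in isolation with the JDK classes alone. In this sketch, ParallelOpenSketch, openStores, the pool size of 4, and the fake "sequence id" derived from the family name are all hypothetical; the point is only the shape of the CompletionService loop.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelOpenSketch {
    /**
     * "Opens" one store per column family in parallel, records each result in
     * {@code opened}, and returns the largest sequence id seen.
     */
    static long openStores(List<String> families, Map<String, Long> opened) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Map.Entry<String, Long>> completionService =
            new ExecutorCompletionService<>(pool);
        try {
            // Submit one task per family; none of them blocks the submitting thread.
            for (final String family : families) {
                completionService.submit(() -> {
                    long seqId = family.length() * 10L;  // stand-in for real loading work
                    return new AbstractMap.SimpleEntry<String, Long>(family, seqId);
                });
            }
            // take() hands back tasks in completion order, not submission order.
            long maxSeqId = -1;
            for (int i = 0; i < families.size(); i++) {
                Map.Entry<String, Long> store = completionService.take().get();
                opened.put(store.getKey(), store.getValue());
                maxSeqId = Math.max(maxSeqId, store.getValue());
            }
            return maxSeqId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<String, Long> opened = new TreeMap<>();
        long maxSeqId = openStores(Arrays.asList("cf", "meta", "d"), opened);
        System.out.println(opened + " max=" + maxSeqId);
    }
}
```

Only the collecting thread touches the result map, so no extra synchronization is needed there.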

There is another interesting class, org.apache.hadoop.hbase.regionserver.MemStoreLAB. Its main purpose is to prevent heap fragmentation and long GC pauses. Take a look at the following excellent blog post about the design of MSLAB (MemStore-Local Allocation Buffers) and how it solves the fragmentation issue.


Bucket in OpenTSDB

OpenTSDB introduces the notion of a time bucket, which is basically a way of grouping all data points that fall within a specific time span. OpenTSDB partitions data into hourly buckets, with each cell value being either an integer or a float. One could easily extend the same design to other domains that store more complex data types, for example a complex Avro type.

Even better, different time buckets could be designed, such as daily, weekly, and monthly buckets. The sky is the limit here.

In OpenTSDB, the timestamp is broken into two parts, one encoded in the row key and the other in the qualifier.

The first part is the hourly base time, encoded in the row key; the second is the delta in seconds from the start of the hourly bucket, encoded in the qualifier.

The following code snippet from OpenTSDB shows how base_time (the hour bucket) is computed.
final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);

private long updateBaseTime(final long timestamp) {
  // We force the starting timestamp to be on a MAX_TIMESPAN boundary
  // so that all TSDs create rows with the same base time.  Otherwise
  // we'd need to coordinate TSDs to avoid creating rows that cover
  // overlapping time periods.
  final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);
  // Clone the row key since we're going to change it.  We must clone it
  // because the HBase client may still hold a reference to it in its
  // internal datastructures.
  row = Arrays.copyOf(row, row.length);
  Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
  tsdb.scheduleForCompaction(row, (int) base_time);
  return base_time;
}
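
As a worked example of the split, the sketch below computes the base time and the delta for one timestamp. TimeBucketSketch is a hypothetical class; MAX_TIMESPAN is assumed to be 3600 seconds to match the hourly bucket described above, and the way OpenTSDB packs the delta into the qualifier bytes is not modelled.

```java
class TimeBucketSketch {
    // Seconds per bucket; 3600 is assumed here to match the hourly bucket.
    static final int MAX_TIMESPAN = 3600;

    /** Start of the bucket containing the timestamp; this part goes into the row key. */
    static long baseTime(long timestamp) {
        return timestamp - (timestamp % MAX_TIMESPAN);
    }

    /** Seconds elapsed since the start of the bucket; this part goes into the qualifier. */
    static int delta(long timestamp) {
        return (int) (timestamp - baseTime(timestamp));
    }

    public static void main(String[] args) {
        long timestamp = 1356998500L;  // 100 seconds after 2013-01-01 00:00:00 UTC
        System.out.println("base_time = " + baseTime(timestamp));  // 1356998400
        System.out.println("delta     = " + delta(timestamp));     // 100
    }
}
```

All data points of the same hour therefore share one row key and differ only in their qualifier.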


HBase Custom Filter

There are many different scan filters one can use in HBase. Sometimes, though, none of them satisfies your needs and you simply have to build your own. As of HBase 0.94.7, a custom filter can be loaded dynamically by HBase without restarting the cluster.

The custom filter is packaged as a jar and dropped into a pre-configured folder (hbase.dynamic.jars.dir), which can be in HDFS. Region servers then pick it up dynamically, so the new filter takes effect without a cluster restart.

If you want to understand the mechanics of how HBase accomplishes this, look at the following classes in the org.apache.hadoop.hbase.util package:

Classes.java, ClassLoaderBase.java, DynamicClassLoader.java

Look at createWritableForName(String className) in Classes.java. It is a very common way of instantiating a class via reflection.

public static Filter createWritableForName(String className) {
  try {
    Class<? extends Filter> clazz =
        (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
    return (Filter) WritableFactories.newInstance(clazz, new Configuration());
  } catch (ClassNotFoundException e) {
    throw new RuntimeException("Can't find class " + className);
  }
}
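
The same reflection pattern can be tried without HBase on the classpath. The sketch below resolves a class through an explicitly supplied class loader and instantiates it with its no-argument constructor. ReflectiveLoadSketch is a hypothetical class, and the empty URLClassLoader merely stands in for a loader that would also search dynamically added jars.

```java
import java.net.URL;
import java.net.URLClassLoader;

class ReflectiveLoadSketch {
    /** Resolves the class through the given loader and calls its no-arg constructor. */
    static Object createForName(String className, ClassLoader loader) {
        try {
            // 'true' asks for static initialization of the class when it is loaded.
            Class<?> clazz = Class.forName(className, true, loader);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Can't find class " + className, e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // A loader with no URLs of its own simply delegates to its parent; a loader
        // for dynamic jars would add jar URLs here instead (assumption of this sketch).
        ClassLoader loader =
            new URLClassLoader(new URL[0], ReflectiveLoadSketch.class.getClassLoader());
        Object instance = createForName("java.util.ArrayList", loader);
        System.out.println(instance.getClass().getName());
    }
}
```

Passing the loader explicitly is what lets classes that were not on the original classpath be found at all.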


package org.apache.hadoop.hbase.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.io.WritableFactories;

* Utilities for class manipulation.
public class Classes {

* Dynamic class loader to load filter/comparators
private final static ClassLoader CLASS_LOADER;

static {
ClassLoader parent = Classes.class.getClassLoader();
Configuration conf = HBaseConfiguration.create();
CLASS_LOADER = new DynamicClassLoader(conf, parent);

* Equivalent of {@link Class#forName(String)} which also returns classes for
* primitives like <code>boolean</code>, etc.
* @param className
*          The name of the class to retrieve. Can be either a normal class or
*          a primitive class.
* @return The class specified by <code>className</code>
* @throws ClassNotFoundException
*           If the requested class can not be found.
public static Class<?> extendedForName(String className)
throws ClassNotFoundException {
Class<?> valueType;
if (className.equals("boolean")) {
valueType = boolean.class;
} else if (className.equals("byte")) {
valueType = byte.class;
} else if (className.equals("short")) {
valueType = short.class;
} else if (className.equals("int")) {
valueType = int.class;
} else if (className.equals("long")) {
valueType = long.class;
} else if (className.equals("float")) {
valueType = float.class;
} else if (className.equals("double")) {
valueType = double.class;
} else if (className.equals("char")) {
valueType = char.class;
} else {
valueType = Class.forName(className);
return valueType;

public static String stringify(Class[] classes) {
StringBuilder buf = new StringBuilder();
if (classes != null) {
for (Class c : classes) {
if (buf.length() > 0) {
} else {
return buf.toString();

* Used to dynamically load a filter class, and create a Writable filter.
* This filter class most likely extends Configurable.
* @param className the filter class name.
* @return a filter
public static Filter createWritableForName(String className) {
try {
Class<? extends Filter> clazz =
(Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
return (Filter)WritableFactories.newInstance(clazz, new Configuration());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Can't find class " + className);

* This method is almost the same as #createWritableForName, except
* that this one doesn't expect the filter class to extends Configurable.
* @param className the filter class name.
* @return a filter
public static Filter createForName(String className) {
try {
Class<? extends Filter> clazz =
(Class<? extends Filter>)Class.forName(className, true, CLASS_LOADER);
return (Filter)clazz.newInstance();
} catch (ClassNotFoundException e) {
throw new RuntimeException("Can't find class " + className);
} catch (InstantiationException e) {
throw new RuntimeException("Couldn't instantiate " + className, e);
} catch (IllegalAccessException e) {
throw new RuntimeException("No access to " + className, e);

package org.apache.hadoop.hbase.util;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;

import com.google.common.base.Preconditions;

/**
 * Base class loader that defines a couple of shared constants used
 * by sub-classes. It also defines the method getClassLoadingLock for parallel
 * class loading and JDK 1.6 support. This method (getClassLoadingLock)
 * is similar to the method of the same name in the Java ClassLoader base
 * class, which was introduced in JDK 1.7 and is not available in JDK 1.6.
 */
public class ClassLoaderBase extends URLClassLoader {

  // Maps class name to the corresponding lock object
  private final ConcurrentHashMap<String, Object> parallelLockMap
    = new ConcurrentHashMap<String, Object>();

  protected static final String DEFAULT_LOCAL_DIR = "/tmp/hbase-local-dir";
  protected static final String LOCAL_DIR_KEY = "hbase.local.dir";

  /**
   * Parent class loader.
   */
  protected final ClassLoader parent;

  /**
   * Creates a ClassLoaderBase with the given parent class loader.
   *
   * @param parent the parent ClassLoader to set.
   */
  public ClassLoaderBase(final ClassLoader parent) {
    super(new URL[]{}, parent);
    Preconditions.checkNotNull(parent, "No parent classloader!");
    this.parent = parent;
  }

  /**
   * Returns the lock object for class loading operations.
   */
  protected Object getClassLoadingLock(String className) {
    Object lock = parallelLockMap.get(className);
    if (lock != null) {
      return lock;
    }

    Object newLock = new Object();
    lock = parallelLockMap.putIfAbsent(className, newLock);
    if (lock == null) {
      lock = newLock;
    }
    return lock;
  }
}
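The per-class-name lock lookup is a small get-then-putIfAbsent idiom, and it can be exercised on its own. The following is a minimal sketch with illustrative names (`PerNameLocks`, `lockFor`); it is not the HBase class, only the same idea reduced to the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the lock bookkeeping; not the HBase class itself.
final class PerNameLocks {
    private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();

    /** Returns the single lock object associated with name, creating it on first use. */
    Object lockFor(String name) {
        Object lock = locks.get(name);
        if (lock != null) {
            return lock; // fast path: the lock already exists
        }
        Object candidate = new Object();
        // putIfAbsent returns the previous value, or null if our candidate was stored.
        Object existing = locks.putIfAbsent(name, candidate);
        return existing != null ? existing : candidate;
    }

    public static void main(String[] args) {
        PerNameLocks locks = new PerNameLocks();
        System.out.println(locks.lockFor("a.B") == locks.lockFor("a.B"));
        System.out.println(locks.lockFor("a.B") == locks.lockFor("a.C"));
    }
}
```

Because `putIfAbsent` is atomic, two threads racing on the same name end up with the same lock object, while different class names can be loaded in parallel.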
package org.apache.hadoop.hbase.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * This is a class loader that can load classes dynamically from new
 * jar files under a configured folder. The paths to the jar files are
 * converted to URLs, and URLClassLoader logic is actually used to load
 * classes. This class loader always uses its parent class loader
 * to load a class first. Only if the parent class loader
 * cannot load a class do we try to load it using the logic here.
 *
 * The configured folder can be an HDFS path. In this case, the jar files
 * under that folder are first copied locally under ${hbase.local.dir}/jars/.
 * The local copy is updated if the remote copy is updated, according to its
 * last modified timestamp.
 *
 * We can't unload a class that is already loaded. So we use the existing
 * jar files we already know to load any class which can't be loaded
 * using the parent class loader. If we still can't load the class from
 * the existing jar files, we check whether any new jar file has been added;
 * if so, we load the new jar file and try to load the class again.
 * If that still fails, a class not found exception is thrown.
 *
 * Be careful when uploading new jar files and make sure all classes
 * are consistent; otherwise, we may not be able to load your
 * classes properly.
 */
public class DynamicClassLoader extends ClassLoaderBase {
  private static final Log LOG =
      LogFactory.getLog(DynamicClassLoader.class);

  // Dynamic jars are put under ${hbase.local.dir}/jars/
  private static final String DYNAMIC_JARS_DIR = File.separator
    + "jars" + File.separator;

  private static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";

  private File localDir;

  // FileSystem of the remote path, set only if remoteDir != null
  private FileSystem remoteDirFs;
  private Path remoteDir;

  // Last modified time of local jars
  private HashMap<String, Long> jarModifiedTime;

  /**
   * Creates a DynamicClassLoader that can load classes dynamically
   * from jar files under a specific folder.
   *
   * @param conf the configuration for the cluster.
   * @param parent the parent ClassLoader to set.
   */
  public DynamicClassLoader(
      final Configuration conf, final ClassLoader parent) {
    super(parent);

    jarModifiedTime = new HashMap<String, Long>();
    String localDirPath = conf.get(
      LOCAL_DIR_KEY, DEFAULT_LOCAL_DIR) + DYNAMIC_JARS_DIR;
    localDir = new File(localDirPath);
    if (!localDir.mkdirs() && !localDir.isDirectory()) {
      throw new RuntimeException("Failed to create local dir " + localDir.getPath()
        + ", DynamicClassLoader failed to init");
    }

    String remotePath = conf.get(DYNAMIC_JARS_DIR_KEY);
    if (remotePath == null || remotePath.equals(localDirPath)) {
      remoteDir = null;  // ignore if it is the same as the local path
    } else {
      remoteDir = new Path(remotePath);
      try {
        remoteDirFs = remoteDir.getFileSystem(conf);
      } catch (IOException ioe) {
        LOG.warn("Failed to identify the fs of dir "
          + remoteDir + ", ignored", ioe);
        remoteDir = null;
      }
    }
  }

  public Class<?> loadClass(String name)
      throws ClassNotFoundException {
    try {
      return parent.loadClass(name);
    } catch (ClassNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Class " + name + " not found - using dynamical class loader");
      }

      synchronized (getClassLoadingLock(name)) {
        // Check whether the class has already been loaded:
        Class<?> clasz = findLoadedClass(name);
        if (clasz != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Class " + name + " already loaded");
          }
        }
        else {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Finding class: " + name);
            }
            clasz = findClass(name);
          } catch (ClassNotFoundException cnfe) {
            // Load new jar files if any
            if (LOG.isDebugEnabled()) {
              LOG.debug("Loading new jar files, if any");
            }
            loadNewJars();

            if (LOG.isDebugEnabled()) {
              LOG.debug("Finding class again: " + name);
            }
            clasz = findClass(name);
          }
        }
        return clasz;
      }
    }
  }

  private synchronized void loadNewJars() {
    // Refresh local jar file lists
    for (File file: localDir.listFiles()) {
      String fileName = file.getName();
      if (jarModifiedTime.containsKey(fileName)) {
        continue; // already known, nothing to do
      }
      if (file.isFile() && fileName.endsWith(".jar")) {
        jarModifiedTime.put(fileName, Long.valueOf(file.lastModified()));
        try {
          URL url = file.toURI().toURL();
          addURL(url);
        } catch (MalformedURLException mue) {
          // This should not happen, just log it
          LOG.warn("Failed to load new jar " + fileName, mue);
        }
      }
    }

    // Check remote files
    FileStatus[] statuses = null;
    if (remoteDir != null) {
      try {
        statuses = remoteDirFs.listStatus(remoteDir);
      } catch (IOException ioe) {
        LOG.warn("Failed to check remote dir status " + remoteDir, ioe);
      }
    }
    if (statuses == null || statuses.length == 0) {
      return; // no remote files at all
    }

    for (FileStatus status: statuses) {
      if (status.isDir()) continue; // No recursive lookup
      Path path = status.getPath();
      String fileName = path.getName();
      if (!fileName.endsWith(".jar")) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignored non-jar file " + fileName);
        }
        continue; // Ignore non-jar files
      }
      Long cachedLastModificationTime = jarModifiedTime.get(fileName);
      if (cachedLastModificationTime != null) {
        long lastModified = status.getModificationTime();
        if (lastModified < cachedLastModificationTime.longValue()) {
          // There could be some race, for example, someone uploads
          // a new one right in the middle the old one is copied to
          // local. We can check the size as well. But it is still
          // not guaranteed. This should be rare. Most likely,
          // we already have the latest one.
          // If you are unlucky to hit this race issue, you have
          // to touch the remote jar to update its last modified time
          continue;
        }
      }
      try {
        // Copy it to local
        File dst = new File(localDir, fileName);
        remoteDirFs.copyToLocalFile(path, new Path(dst.getPath()));
        jarModifiedTime.put(fileName, Long.valueOf(dst.lastModified()));
        URL url = dst.toURI().toURL();
        addURL(url);
      } catch (IOException ioe) {
        LOG.warn("Failed to load new jar " + fileName, ioe);
      }
    }
  }
}
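The decision about whether a remote jar needs to be copied comes down to one timestamp comparison. Here is a minimal sketch of that check in isolation; `JarRefreshPolicy`, `recordLocalCopy` and `shouldCopy` are hypothetical names introduced for illustration, and the sketch assumes timestamps are plain epoch milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that isolates the timestamp check; names are illustrative.
final class JarRefreshPolicy {
    // Last modified time of each local jar copy, keyed by file name.
    private final Map<String, Long> localModifiedTime = new HashMap<>();

    /** Remembers the modification time of the local copy of a jar. */
    void recordLocalCopy(String fileName, long localLastModified) {
        localModifiedTime.put(fileName, localLastModified);
    }

    /** Returns true if the remote jar should be copied (or copied again). */
    boolean shouldCopy(String fileName, long remoteLastModified) {
        Long cached = localModifiedTime.get(fileName);
        if (cached == null) {
            return true; // never seen locally
        }
        // Skip only when the remote file is strictly older than the local copy.
        return remoteLastModified >= cached;
    }

    public static void main(String[] args) {
        JarRefreshPolicy policy = new JarRefreshPolicy();
        System.out.println(policy.shouldCopy("filters.jar", 50L));
        policy.recordLocalCopy("filters.jar", 100L);
        System.out.println(policy.shouldCopy("filters.jar", 50L));
        System.out.println(policy.shouldCopy("filters.jar", 150L));
    }
}
```

Note that the comparison is against the local copy's own modification time, which is why touching the remote jar is enough to force a refresh.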

Nifty utility class from HBase to determine the size of class in memory

I came across this nifty utility class in HBase, ClassSize, which estimates how much memory an instance of a class occupies.

In summary, on a 64-bit system:

Reference size = 8 bytes

Object: 2 x reference = 16 bytes

Array: 3 x reference = 24 bytes

Integer: 16 bytes + SIZEOF_INT = 16 bytes + 4 bytes = 20 bytes => aligned up to the next multiple of eight, 24 bytes

Atomic Integer: 16 bytes + SIZEOF_INT = 20 bytes => aligned up to 24 bytes
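The arithmetic in this summary can be checked with a few lines of Java. This is a sketch under stated assumptions, not the HBase code: `SizeArithmetic` is an illustrative name, the reference size is hard-coded to 8 bytes (a 64-bit JVM), and the rounding follows the align-to-8 rule.

```java
// Illustrative arithmetic only; the reference size is assumed, not detected.
final class SizeArithmetic {
    static final int REFERENCE = 8;           // assumed: 64-bit JVM
    static final int OBJECT = 2 * REFERENCE;  // object overhead, 16 bytes
    static final int SIZEOF_INT = 4;

    /** Rounds num up to the next multiple of 8. */
    static int align(int num) {
        return ((num + 7) >> 3) << 3;
    }

    static final int ARRAY = align(3 * REFERENCE);          // 24 bytes
    static final int INTEGER = align(OBJECT + SIZEOF_INT);  // 20 rounded up to 24 bytes

    public static void main(String[] args) {
        System.out.println("OBJECT=" + OBJECT + " ARRAY=" + ARRAY + " INTEGER=" + INTEGER);
    }
}
```

Adding 7 and then clearing the low three bits is what rounds 20 up to 24 while leaving exact multiples such as 16 unchanged.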

* Copyright 2010 The Apache Software Foundation

package org.apache.hadoop.hbase.util;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/**
 * Class for determining the "size" of a class, an attempt to calculate the
 * actual bytes that an object of this class will occupy in memory.
 *
 * The core of this class is taken from the Derby project
 */
public class ClassSize {
static final Log LOG = LogFactory.getLog(ClassSize.class);

private static int nrOfRefsPerObj = 2;

/** Array overhead */
public static final int ARRAY;

/** Overhead for ArrayList(0) */
public static final int ARRAYLIST;

/** Overhead for ByteBuffer */
public static final int BYTE_BUFFER;

/** Overhead for an Integer */
public static final int INTEGER;

/** Overhead for entry in map */
public static final int MAP_ENTRY;

/** Object overhead is minimum 2 * reference size (8 bytes on 64-bit) */
public static final int OBJECT;

/** Reference size is 8 bytes on 64-bit, 4 bytes on 32-bit */
public static final int REFERENCE;

/** String overhead */
public static final int STRING;

/** Overhead for TreeMap */
public static final int TREEMAP;

/** Overhead for ConcurrentHashMap */
public static final int CONCURRENT_HASHMAP;

/** Overhead for ConcurrentHashMap.Entry */
public static final int CONCURRENT_HASHMAP_ENTRY;

/** Overhead for ConcurrentHashMap.Segment */
public static final int CONCURRENT_HASHMAP_SEGMENT;

/** Overhead for ConcurrentSkipListMap */
public static final int CONCURRENT_SKIPLISTMAP;

/** Overhead for ConcurrentSkipListMap Entry */
public static final int CONCURRENT_SKIPLISTMAP_ENTRY;

/** Overhead for ReentrantReadWriteLock */
public static final int REENTRANT_LOCK;

/** Overhead for AtomicLong */
public static final int ATOMIC_LONG;

/** Overhead for AtomicInteger */
public static final int ATOMIC_INTEGER;

/** Overhead for AtomicBoolean */
public static final int ATOMIC_BOOLEAN;

/** Overhead for CopyOnWriteArraySet */
public static final int COPYONWRITE_ARRAYSET;

/** Overhead for CopyOnWriteArrayList */
public static final int COPYONWRITE_ARRAYLIST;

  /* Are we running on jdk7? */
  private static final boolean JDK7;
  static {
    final String version = System.getProperty("java.version");
    // Verify String looks like this: 1.6.0_29
    if (!version.matches("\\d\\.\\d\\..*")) {
      throw new RuntimeException("Unexpected version format: " + version);
    }
    // Convert char to int
    int major = (int) (version.charAt(0) - '0');
    int minor = (int) (version.charAt(2) - '0');
    JDK7 = major == 1 && minor == 7;
  }

  /**
   * Method for reading the arc settings and setting overheads according
   * to 32-bit or 64-bit architecture.
   */
  static {
    //Default value is set to 8, covering the case when arcModel is unknown
    if (is32BitJVM()) {
      REFERENCE = 4;
    } else {
      REFERENCE = 8;
    }

    OBJECT = 2 * REFERENCE;

    ARRAY = align(3 * REFERENCE);

ARRAYLIST = align(OBJECT + align(REFERENCE) + align(ARRAY) +
(2 * Bytes.SIZEOF_INT));

//noinspection PointlessArithmeticExpression
BYTE_BUFFER = align(OBJECT + align(REFERENCE) + align(ARRAY) +
(5 * Bytes.SIZEOF_INT) +



TREEMAP = align(OBJECT + (2 * Bytes.SIZEOF_INT) + align(7 * REFERENCE));

// STRING is different size in jdk6 and jdk7. Just use what we estimate as size rather than
// have a conditional on whether jdk7.
STRING = (int) estimateBase(String.class, false);

// CONCURRENT_HASHMAP is different size in jdk6 and jdk7; it looks like its different between
// 23.6-b03 and 23.0-b21. Just use what we estimate as size rather than have a conditional on
// whether jdk7.
CONCURRENT_HASHMAP = (int) estimateBase(ConcurrentHashMap.class, false);

(2 * Bytes.SIZEOF_INT));



align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
align((OBJECT + (3 * REFERENCE))/2)); /* one index per two entries */







  /**
   * The estimate of the size of a class instance depends on whether the JVM
   * uses 32 or 64 bit addresses, that is it depends on the size of an object
   * reference. It is a linear function of the size of a reference, e.g.
   * 24 + 5*r where r is the size of a reference (usually 4 or 8 bytes).
   *
   * This method returns the coefficients of the linear function, e.g. {24, 5}
   * in the above example.
   *
   * @param cl A class whose instance size is to be estimated
   * @param debug debug flag
   * @return an array of 3 integers. The first integer is the size of the
   * primitives, the second the number of arrays and the third the number of
   * references.
   */
  private static int [] getSizeCoefficients(Class cl, boolean debug) {
    int primitives = 0;
    int arrays = 0;
    //The number of references that a new object takes
    int references = nrOfRefsPerObj;
    int index = 0;

    // Walk the class and all of its superclasses.
    for ( ; null != cl; cl = cl.getSuperclass()) {
      Field[] field = cl.getDeclaredFields();
      if (null != field) {
        for (Field aField : field) {
          if (Modifier.isStatic(aField.getModifiers())) continue;
          Class fieldClass = aField.getType();
          if (fieldClass.isArray()) {
            arrays++;
            references++;
          } else if (!fieldClass.isPrimitive()) {
            references++;
          } else {// Is simple primitive
            String name = fieldClass.getName();

            if (name.equals("int") || name.equals("I"))
              primitives += Bytes.SIZEOF_INT;
            else if (name.equals("long") || name.equals("J"))
              primitives += Bytes.SIZEOF_LONG;
            else if (name.equals("boolean") || name.equals("Z"))
              primitives += Bytes.SIZEOF_BOOLEAN;
            else if (name.equals("short") || name.equals("S"))
              primitives += Bytes.SIZEOF_SHORT;
            else if (name.equals("byte") || name.equals("B"))
              primitives += Bytes.SIZEOF_BYTE;
            else if (name.equals("char") || name.equals("C"))
              primitives += Bytes.SIZEOF_CHAR;
            else if (name.equals("float") || name.equals("F"))
              primitives += Bytes.SIZEOF_FLOAT;
            else if (name.equals("double") || name.equals("D"))
              primitives += Bytes.SIZEOF_DOUBLE;
          }
          if (debug) {
            if (LOG.isDebugEnabled()) {
              // Log the field's position, name and declared type.
              LOG.debug("" + index + " " + aField.getName() + " " + aField.getType());
            }
          }
          index++;
        }
      }
    }
    return new int [] {primitives, arrays, references};
  }

  /**
   * Estimate the static space taken up by a class instance given the
   * coefficients returned by getSizeCoefficients.
   *
   * @param coeff the coefficients
   * @param debug debug flag
   * @return the size estimate, in bytes
   */
  private static long estimateBaseFromCoefficients(int [] coeff, boolean debug) {
    long prealign_size = coeff[0] + align(coeff[1] * ARRAY) + coeff[2] * REFERENCE;

    // Round up to a multiple of 8
    long size = align(prealign_size);
    if(debug) {
      if (LOG.isDebugEnabled()) {
        // Log the inputs and the result of the estimate.
        LOG.debug("Primitives=" + coeff[0] + ", arrays=" + coeff[1] +
            ", references(includes " + nrOfRefsPerObj +
            " for object overhead)=" + coeff[2] + ", refSize " + REFERENCE +
            ", size=" + size + ", prealign_size=" + prealign_size);
      }
    }
    return size;
  }

  /**
   * Estimate the static space taken up by the fields of a class. This includes
   * the space taken up by references (the pointer) but not by the referenced
   * object. So the estimated size of an array field does not depend on the size
   * of the array. Similarly the size of an object (reference) field does not
   * depend on the object.
   *
   * @param cl class
   * @param debug debug flag
   * @return the size estimate in bytes.
   */
  public static long estimateBase(Class cl, boolean debug) {
    return estimateBaseFromCoefficients( getSizeCoefficients(cl, debug), debug);
  }

  /**
   * Aligns a number to 8.
   * @param num number to align to 8
   * @return smallest number >= input that is a multiple of 8
   */
  public static int align(int num) {
    return (int)(align((long)num));
  }

  /**
   * Aligns a number to 8.
   * @param num number to align to 8
   * @return smallest number >= input that is a multiple of 8
   */
  public static long align(long num) {
    //The 7 comes from that the alignSize is 8 which is the number of bytes
    //stored and sent together
    return  ((num + 7) >> 3) << 3;
  }

  /**
   * Determines if we are running in a 32-bit JVM. Some unit tests need to
   * know this too.
   */
  public static boolean is32BitJVM() {
    // Compare from the constant side so that a missing property (for example
    // on a JVM that does not set sun.arch.data.model) yields false, not an NPE.
    return "32".equals(System.getProperty("sun.arch.data.model"));
  }
}
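To see how the reflection-based estimate plays out on a concrete class, here is a small self-contained sketch. It is not the HBase implementation: `FieldSizeSketch` and `Sample` are made-up names, the reference size is fixed at 8 bytes and the array overhead at 24 bytes (64-bit assumptions), and an array field is assumed to count as one reference plus one array header.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Illustrative sketch; constants are assumed values for a 64-bit JVM.
final class FieldSizeSketch {
    static final int REFERENCE = 8;
    static final int ARRAY = 24; // align(3 * REFERENCE)

    // Hypothetical class to measure: 12 primitive bytes, one reference, one array.
    static class Sample {
        int count;
        long stamp;
        String name;
        byte[] payload;
        static int ignored; // static fields are skipped
    }

    static long align(long n) {
        return ((n + 7) >> 3) << 3;
    }

    static int primitiveSize(Class<?> t) {
        if (t == long.class || t == double.class) return 8;
        if (t == int.class || t == float.class) return 4;
        if (t == short.class || t == char.class) return 2;
        return 1; // byte and boolean
    }

    /** Returns {primitive bytes, array fields, references including 2 for the object overhead}. */
    static int[] coefficients(Class<?> cl) {
        int primitives = 0;
        int arrays = 0;
        int references = 2;
        for (Class<?> c = cl; c != null; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (Modifier.isStatic(f.getModifiers())) continue;
                Class<?> type = f.getType();
                if (type.isArray()) {
                    arrays++;
                    references++;
                } else if (!type.isPrimitive()) {
                    references++;
                } else {
                    primitives += primitiveSize(type);
                }
            }
        }
        return new int[] {primitives, arrays, references};
    }

    /** Combines the coefficients into a byte estimate rounded up to a multiple of 8. */
    static long estimate(Class<?> cl) {
        int[] k = coefficients(cl);
        return align(k[0] + align((long) k[1] * ARRAY) + (long) k[2] * REFERENCE);
    }

    public static void main(String[] args) {
        System.out.println(estimate(Sample.class));
    }
}
```

For `Sample` the coefficients are 12 primitive bytes, 1 array and 4 references, so the estimate is 12 + 24 + 32 = 68 bytes, rounded up to 72.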
