HBase previous releases timeline

I was searching for a timeline of previous HBase releases and found this great picture, which I would like to share here. The original is from the ASF’s blog: https://blogs.apache.org/hbase/entry/start_of_a_new_era


Log Structured Merge Tree (LSM Tree) in HBase

If you have ever worked with HBase, Cassandra, Google Bigtable, or LevelDB, you have probably heard of the log-structured merge tree (LSM tree). The LSM tree sets these NoSQL stores apart from the majority of RDBMSs, which use B+ trees.

In HBase, the LSM tree concept is realized through the HLog, MemStores, and StoreFiles. The main idea of an LSM tree is that data is first kept in an in-memory cache, which in HBase is the MemStore. Each region server hosts multiple regions (HRegion), and each HRegion holds a section of a table's data. An HRegion has as many MemStores as the table has column families, and it keeps track of the total MemStore size contributed by all of them.

As the HRegion methods below show, after applying a batch of mutations the region checks whether the total MemStore size has exceeded the configured flush size. If so, it calls requestFlush(), which delegates the request to the region server’s MemStoreFlusher.

  /*
   * @param size
   * @return True if size is over the flush threshold
   */
  private boolean isFlushSize(final long size) {
    return size > this.memstoreFlushSize;
  }

 /**
   * Perform a batch of mutations.
   * It supports only Put and Delete mutations and will ignore other types passed.
   * @param batchOp contains the list of mutations
   * @return an array of OperationStatus which internally contains the
   *         OperationStatusCode and the exceptionMessage if any.
   * @throws IOException
   */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadOnly();
        }
        checkResources();

        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        long addedSize = doMiniBatchMutation(batchOp);
        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      closeRegionOperation(op);
    }
    return batchOp.retCodeDetails;
  }


  private void requestFlush() {
    if (this.rsServices == null) {
      return;
    }
    synchronized (writestate) {
      if (this.writestate.isFlushRequested()) {
        return;
      }
      writestate.flushRequested = true;
    }
    // Make request outside of synchronize block; HBASE-818.
    this.rsServices.getFlushRequester().requestFlush(this, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
    }
  }


 

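HRegion loads the configured maximum MemStore flush size.

The in-memory cache has a pre-configured maximum size. The flush size can be set per table; if it is not, the region falls back to the value in the cluster configuration. See the HRegion method below.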

 void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
                HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
  }

As soon as the MemStore size exceeds this limit, the data is flushed to disk as StoreFiles.

In addition, there is a configurable limit on the number of StoreFiles. When the number of StoreFiles exceeds that limit, a compaction is triggered to merge them into a bigger one. The main purpose of compaction is to speed up the read path by reducing the number of StoreFiles that have to be read.
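The flush-then-compact cycle can be sketched in a few lines of plain Java. This is a toy model, not HBase code: the class name ToyLsmStore and both thresholds are made up for illustration, the "files" are just in-memory sorted maps, and the limits count entries and files rather than bytes. Real HBase also flushes and compacts in background threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy LSM store (hypothetical): a sorted in-memory buffer plus immutable sorted "files". */
class ToyLsmStore {
  private final int flushThreshold;  // entries allowed in the memstore before a flush
  private final int maxStoreFiles;   // files allowed before a compaction
  private TreeMap<String, String> memstore = new TreeMap<>();
  // Oldest file first, newest file last.
  private final List<TreeMap<String, String>> storeFiles = new ArrayList<>();

  ToyLsmStore(int flushThreshold, int maxStoreFiles) {
    this.flushThreshold = flushThreshold;
    this.maxStoreFiles = maxStoreFiles;
  }

  void put(String key, String value) {
    memstore.put(key, value);
    if (memstore.size() >= flushThreshold) {
      flush();
    }
  }

  /** Reads check the memstore first, then the files from newest to oldest. */
  String get(String key) {
    String value = memstore.get(key);
    for (int i = storeFiles.size() - 1; value == null && i >= 0; i--) {
      value = storeFiles.get(i).get(key);
    }
    return value;
  }

  int storeFileCount() {
    return storeFiles.size();
  }

  /** Turns the current memstore into a new file and starts an empty memstore. */
  private void flush() {
    storeFiles.add(memstore);
    memstore = new TreeMap<>();
    if (storeFiles.size() > maxStoreFiles) {
      compact();
    }
  }

  /** Merges all files into one; entries from newer files overwrite older ones. */
  private void compact() {
    TreeMap<String, String> merged = new TreeMap<>();
    for (TreeMap<String, String> file : storeFiles) {
      merged.putAll(file);
    }
    storeFiles.clear();
    storeFiles.add(merged);
  }

  public static void main(String[] args) {
    ToyLsmStore store = new ToyLsmStore(2, 2);
    for (int i = 0; i < 6; i++) {
      store.put("row" + i, "v" + i);
    }
    store.put("row0", "updated");
    System.out.println(store.get("row0") + " files=" + store.storeFileCount());
  }
}
```

Note how a read may have to consult every file until it finds the key, which is why keeping the number of files small matters for the read path.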

Please check out this great paper on the LSM tree:

http://db.cs.berkeley.edu/cs286/papers/lsm-acta1996.pdf

HBase Pluggable Store, Memstore, StoreEngine

I was browsing the HBase codebase and noticed that many components in the regionserver module (org.apache.hadoop.hbase.regionserver) have been refactored to become pluggable. This is a cleaner design because different implementations of these components can be plugged into HBase easily. Among the new interfaces are

Store, MemStore

The corresponding implementation classes are

HStore implements Store

DefaultMemStore implements MemStore

Furthermore, StoreEngine is now an abstract class and acts as a factory to create StoreFileManager, CompactionPolicy and Compactor.
DefaultStoreEngine extends StoreEngine to create the default compactor, policy, and store file manager. This makes storefile management and compaction pluggable.
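
To make the factory idea concrete, here is a minimal sketch of the pattern in plain Java. Every type in it (SketchStoreEngine, SketchCompactionPolicy, SketchCompactor, SketchDefaultStoreEngine) is a hypothetical stand-in invented for this example; the real HBase classes have different and richer signatures. The sketch only assumes that an engine class can be chosen by name, for example from configuration.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in: decides whether a set of files should be compacted. */
interface SketchCompactionPolicy {
  boolean needsCompaction(List<String> storeFiles);
}

/** Hypothetical stand-in: merges files and returns the name of the result. */
interface SketchCompactor {
  String compact(List<String> storeFiles);
}

/** Abstract factory: each subclass decides which components are created. */
abstract class SketchStoreEngine {
  abstract SketchCompactionPolicy createCompactionPolicy();
  abstract SketchCompactor createCompactor();

  /** Loads an engine by class name, as a configuration-driven setup might. */
  static SketchStoreEngine create(String className) {
    try {
      return (SketchStoreEngine) Class.forName(className)
          .getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot load store engine " + className, e);
    }
  }
}

/** Default engine for the sketch: compact once there are more than three files. */
class SketchDefaultStoreEngine extends SketchStoreEngine {
  @Override
  SketchCompactionPolicy createCompactionPolicy() {
    return files -> files.size() > 3;
  }

  @Override
  SketchCompactor createCompactor() {
    return files -> String.join("+", files);
  }

  public static void main(String[] args) {
    SketchStoreEngine engine =
        SketchStoreEngine.create(SketchDefaultStoreEngine.class.getName());
    List<String> files = Arrays.asList("f1", "f2", "f3", "f4");
    if (engine.createCompactionPolicy().needsCompaction(files)) {
      System.out.println(engine.createCompactor().compact(files));
    }
  }
}
```

Swapping in a different compaction strategy then means writing another subclass and changing the class name, without touching the code that uses the engine.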

You can refer to the corresponding Jira and design doc

https://issues.apache.org/jira/browse/HBASE-7678

https://issues.apache.org/jira/secure/attachment/12568488/Pluggable%20compactions%20doc.pdf

HBase 0.96 API change

I was coding a DAO for HBase 0.96 the other day and ran into some API changes from 0.94 to 0.96.

0.96 introduces a new Cell interface, and KeyValue no longer implements the Writable interface; it implements Cell instead. With protobuf as the wire format, Scan, Get, and Put are no longer required to implement Writable either. They extend (Put indirectly, through Mutation) the abstract class OperationWithAttributes, which extends another abstract class, Operation, and implements the Attributes interface. Operation mainly provides JSON conversion for logging and debugging. Attributes defines the methods for setting and maintaining the operation's map of attributes.

 

public interface Cell {

  //1) Row

  /**
   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Short.MAX_VALUE which is 32,767 bytes.
   * @return The array containing the row bytes.
   */
  byte[] getRowArray();

  /**
   * @return Array index of first row byte
   */
  int getRowOffset();

  /**
   * @return Number of row bytes. Must be < rowArray.length - offset.
   */
  short getRowLength();


  //2) Family

  /**
   * Contiguous bytes composed of legal HDFS filename characters which may start at any index in the
   * containing array. Max length is Byte.MAX_VALUE, which is 127 bytes.
   * @return the array containing the family bytes.
   */
  byte[] getFamilyArray();

  /**
   * @return Array index of first family byte
   */
  int getFamilyOffset();

  /**
   * @return Number of family bytes.  Must be < familyArray.length - offset.
   */
  byte getFamilyLength();


  //3) Qualifier

  /**
   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Short.MAX_VALUE which is 32,767 bytes.
   * @return The array containing the qualifier bytes.
   */
  byte[] getQualifierArray();

  /**
   * @return Array index of first qualifier byte
   */
  int getQualifierOffset();

  /**
   * @return Number of qualifier bytes.  Must be < qualifierArray.length - offset.
   */
  int getQualifierLength();


  //4) Timestamp

  /**
   * @return Long value representing time at which this cell was "Put" into the row.  Typically
   * represents the time of insertion, but can be any value from 0 to Long.MAX_VALUE.
   */
  long getTimestamp();


  //5) Type

  /**
   * @return The byte representation of the KeyValue.TYPE of this cell: one of Put, Delete, etc
   */
  byte getTypeByte();


  //6) MvccVersion

  /**
   * Internal use only. A region-specific sequence ID given to each operation. It always exists for
   * cells in the memstore but is not retained forever. It may survive several flushes, but
   * generally becomes irrelevant after the cell's row is no longer involved in any operations that
   * require strict consistency.
   * @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
   */
  long getMvccVersion();


  //7) Value

  /**
   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Integer.MAX_VALUE which is 2,147,483,647 bytes.
   * @return The array containing the value bytes.
   */
  byte[] getValueArray();

  /**
   * @return Array index of first value byte
   */
  int getValueOffset();

  /**
   * @return Number of value bytes.  Must be < valueArray.length - offset.
   */
  int getValueLength();
  
  /**
   * @return the tags byte array
   */
  byte[] getTagsArray();

  /**
   * @return the first offset where the tags start in the Cell
   */
  int getTagsOffset();
  
  /**
   * @return the total length of the tags in the Cell.
   */
  short getTagsLength();
  
  /**
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's value.
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#cloneValue(Cell)}
   */
  @Deprecated
  byte[] getValue();
  
  /**
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's family. 
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#cloneFamily(Cell)}
   */
  @Deprecated
  byte[] getFamily();

  /**
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's qualifier.
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#cloneQualifier(Cell)}
   */
  @Deprecated
  byte[] getQualifier();

  /**
   * WARNING do not use, expensive.  this gets an arraycopy of the cell's row.
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#getRowByte(Cell, int)}
   */
  @Deprecated
  byte[] getRow();
}

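Example: iterating over a Result in 0.96

for (Cell cell : result.rawCells()) {
  byte[] value = CellUtil.cloneValue(cell);  // copy of the value bytes
}

vs. 0.94

for (KeyValue kv : result.raw()) {
  byte[] value = kv.getValue();
}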

Building a Result in 0.96

ArrayList<Cell> list = new ArrayList<Cell>();

list.add(cell);

Result result = Result.create(list);

vs. 0.94

ArrayList<KeyValue> list = new ArrayList<KeyValue>();

list.add(keyValue);

Result result = new Result(list);
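
The Cell accessors come in array/offset/length triples because a cell's bytes may sit in the middle of a larger backing array. The sketch below shows that idea without any HBase dependency. SliceCell is a hypothetical class made up for this example, and its cloneValue helper only mirrors the spirit of CellUtil.cloneValue(Cell); it is not the HBase implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-in for a cell whose value is a slice of a shared array. */
class SliceCell {
  private final byte[] backing;
  private final int valueOffset;
  private final int valueLength;

  SliceCell(byte[] backing, int valueOffset, int valueLength) {
    this.backing = backing;
    this.valueOffset = valueOffset;
    this.valueLength = valueLength;
  }

  byte[] getValueArray() { return backing; }
  int getValueOffset() { return valueOffset; }
  int getValueLength() { return valueLength; }

  /** Copying accessor: allocates a new array holding only the value bytes. */
  static byte[] cloneValue(SliceCell cell) {
    return Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
        cell.getValueOffset() + cell.getValueLength());
  }

  /** Decodes the value straight from the backing array, with no intermediate byte copy. */
  static String valueAsString(SliceCell cell) {
    return new String(cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Row, family, qualifier, and value packed into one array; the value starts at index 10.
    byte[] shared = "row1cfqualhello".getBytes(StandardCharsets.UTF_8);
    SliceCell cell = new SliceCell(shared, 10, 5);
    System.out.println(valueAsString(cell));
    System.out.println(cloneValue(cell).length);
  }
}
```

This is also why the deprecated getValue(), getFamily(), getQualifier(), and getRow() methods carry an "expensive" warning: each call makes a fresh array copy.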

 

 

HBase Multiversion Concurrency Control

For those who are interested in multiversion concurrency control (MVCC) in HBase, the post below is a very good introduction.

http://blogs.apache.org/hbase/entry/apache_hbase_internals_locking_and

Also, there is a good paper on the Theory and Algorithms of Multiversion Concurrency Control.
http://www.cse.ust.hk/~yjrobin/reading_list/[Transaction]Multiversion%20Concurrency%20Control-Theory%20and%20Algorithms.pdf

The implementation of multiversion concurrency control in the HBase code base is the class org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.
It is used by org.apache.hadoop.hbase.regionserver.HRegion.


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.LinkedList;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Manages the read/write consistency within memstore. This provides
 * an interface for readers to determine what entries to ignore, and
 * a mechanism for writers to obtain new write numbers, then "commit"
 * the new writes for readers to read (thus forming atomic transactions).
 */
@InterfaceAudience.Private
public class MultiVersionConsistencyControl {
  private volatile long memstoreRead = 0;
  private volatile long memstoreWrite = 0;

  private final Object readWaiters = new Object();

  // This is the pending queue of writes.
  private final LinkedList<WriteEntry> writeQueue =
      new LinkedList<WriteEntry>();

  private static final ThreadLocal<Long> perThreadReadPoint =
      new ThreadLocal<Long>() {
       @Override
      protected
       Long initialValue() {
         return Long.MAX_VALUE;
       }
  };

  /**
   * Default constructor. Initializes the memstoreRead/Write points to 0.
   */
  public MultiVersionConsistencyControl() {
    this.memstoreRead = this.memstoreWrite = 0;
  }

  /**
   * Initializes the memstoreRead/Write points appropriately.
   * @param startPoint
   */
  public void initialize(long startPoint) {
    synchronized (writeQueue) {
      if (this.memstoreWrite != this.memstoreRead) {
        throw new RuntimeException("Already used this mvcc. Too late to initialize");
      }

      this.memstoreRead = this.memstoreWrite = startPoint;
    }
  }

  /**
   * Get this thread's read point. Used primarily by the memstore scanner to
   * know which values to skip (ie: have not been completed/committed to
   * memstore).
   */
  public static long getThreadReadPoint() {
      return perThreadReadPoint.get();
  }

  /**
   * Set the thread read point to the given value. The thread MVCC
   * is used by the Memstore scanner so it knows which values to skip.
   * Give it a value of 0 if you want everything.
   */
  public static void setThreadReadPoint(long readPoint) {
    perThreadReadPoint.set(readPoint);
  }

  /**
   * Set the thread MVCC read point to whatever the current read point is in
   * this particular instance of MVCC.  Returns the new thread read point value.
   */
  public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
    perThreadReadPoint.set(mvcc.memstoreReadPoint());
    return getThreadReadPoint();
  }

  /**
   * Set the thread MVCC read point to 0 (include everything).
   */
  public static void resetThreadReadPoint() {
    perThreadReadPoint.set(0L);
  }

  /**
   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
   */
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite;
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }

  /**
   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   *
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
   */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }

  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   *
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   *
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) {
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }

  /**
   * Wait for the global readPoint to advance upto
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) {
        try {
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }

  public long memstoreReadPoint() {
    return memstoreRead;
  }


  public static class WriteEntry {
    private long writeNumber;
    private boolean completed = false;
    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }
    void markCompleted() {
      this.completed = true;
    }
    boolean isCompleted() {
      return this.completed;
    }
    long getWriteNumber() {
      return this.writeNumber;
    }
  }

  public static final long FIXED_SIZE = ClassSize.align(
      ClassSize.OBJECT +
      2 * Bytes.SIZEOF_LONG +
      2 * ClassSize.REFERENCE);

}
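
The key rule in advanceMemstore is that the read point only moves past a write once every earlier write has completed too. The stripped-down sketch below isolates that rule so it can be run on its own. MiniMvcc is a hypothetical class written for this post, not the HBase one: it drops the thread-local read point and the blocking waitForRead step, and keeps only the queue logic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified sketch of the read-point logic; not the HBase class itself. */
class MiniMvcc {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;

    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }
  }

  private long readPoint = 0;
  private long writePoint = 0;
  private final Deque<WriteEntry> writeQueue = new ArrayDeque<>();

  /** Hands out the next write number and queues the pending write. */
  synchronized WriteEntry begin() {
    WriteEntry e = new WriteEntry(++writePoint);
    writeQueue.addLast(e);
    return e;
  }

  /**
   * Marks e complete, then advances the read point over the completed
   * prefix of the queue. Returns true if e is now visible to readers.
   */
  synchronized boolean complete(WriteEntry e) {
    e.completed = true;
    while (!writeQueue.isEmpty() && writeQueue.peekFirst().completed) {
      readPoint = writeQueue.removeFirst().writeNumber;
    }
    return readPoint >= e.writeNumber;
  }

  synchronized long readPoint() {
    return readPoint;
  }

  public static void main(String[] args) {
    MiniMvcc mvcc = new MiniMvcc();
    WriteEntry w1 = mvcc.begin();
    WriteEntry w2 = mvcc.begin();
    // w2 finishes first, but w1 is still pending, so nothing becomes visible.
    System.out.println(mvcc.complete(w2) + " " + mvcc.readPoint());
    // Once w1 finishes, the read point jumps past both writes.
    System.out.println(mvcc.complete(w1) + " " + mvcc.readPoint());
  }
}
```

In the real class, completeMemstoreInsert would not return for w2 at the first step; it waits in waitForRead until the read point catches up.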


HBase 0.96.0 release: Protobufs as wire format

HBase 0.96.0 was released recently, on October 18th, 2013. Some notable changes are:

1) Adopted protocol buffers as the wire format
2) Reduced mean time to recovery
3) Removed the ROOT table

Let’s take a look at these changes in depth to understand the new design and explore the differences they introduced in the codebase. In this first post, we look at the adoption of protocol buffers in HBase.

The main impact of the change is that classes which previously implemented the Writable interface in order to be marshaled over the RPC wire no longer need to implement it. In other words, the manual marshaling and unmarshaling methods write(final DataOutput out) and readFields(final DataInput in) are not required anymore. Instead, HBase relies on protobufs to send a request such as a Get over the RPC wire. See https://issues.apache.org/jira/browse/HBASE-5305

The Get class after 0.96.0 does not need to implement the Writable interface:

public class Get extends OperationWithAttributes
  implements Row, Comparable<Row>

The old Get class definition implements the Writable interface, with readFields(final DataInput in) and write(final DataOutput out) written by hand:

public class Get extends OperationWithAttributes
  implements Writable, Row, Comparable<Row> {

 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > GET_VERSION) {
      throw new IOException("unsupported version");
    }
    this.row = Bytes.readByteArray(in);
    this.lockId = in.readLong();
    this.maxVersions = in.readInt();
    boolean hasFilter = in.readBoolean();
    if (hasFilter) {
      this.filter = Classes.createWritableForName(
        Bytes.toString(Bytes.readByteArray(in)));
      this.filter.readFields(in);
    }
    this.cacheBlocks = in.readBoolean();
    this.tr = new TimeRange();
    tr.readFields(in);
    int numFamilies = in.readInt();
    this.familyMap =
      new TreeMap<byte [],NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
    for(int i=0; i<numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      boolean hasColumns = in.readBoolean();
      NavigableSet<byte []> set = null;
      if(hasColumns) {
        int numColumns = in.readInt();
        set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
        for(int j=0; j<numColumns; j++) {
          byte [] qualifier = Bytes.readByteArray(in);
          set.add(qualifier);
        }
      }
      this.familyMap.put(family, set);
    }
    readAttributes(in);
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(GET_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.lockId);
    out.writeInt(this.maxVersions);
    if(this.filter == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
      filter.write(out);
    }
    out.writeBoolean(this.cacheBlocks);
    tr.write(out);
    out.writeInt(familyMap.size());
    for(Map.Entry<byte [], NavigableSet<byte []>> entry :
      familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet<byte []> columnSet = entry.getValue();
      if(columnSet == null) {
        out.writeBoolean(false);
      } else {
        out.writeBoolean(true);
        out.writeInt(columnSet.size());
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);
        }
      }
    }
    writeAttributes(out);
  }
}
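
To get a feel for what this hand-written marshaling involves, here is a small self-contained sketch of the same pattern using only java.io. HandMarshaledGet is a hypothetical class with three made-up fields; it is not the HBase Get and skips filters, time ranges, and column families. The point is that write and readFields must agree on field order and version handling by hand, which is the burden the protobuf-generated classes take over.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical request type that marshals itself by hand, Writable-style. */
class HandMarshaledGet {
  private static final byte VERSION = 1;

  byte[] row = new byte[0];
  int maxVersions = 1;
  boolean cacheBlocks = true;

  /** Writes a version byte followed by each field in a fixed order. */
  void write(DataOutput out) throws IOException {
    out.writeByte(VERSION);
    out.writeInt(row.length);
    out.write(row);
    out.writeInt(maxVersions);
    out.writeBoolean(cacheBlocks);
  }

  /** Reads the fields back in exactly the order write() emitted them. */
  void readFields(DataInput in) throws IOException {
    int version = in.readByte();
    if (version > VERSION) {
      throw new IOException("unsupported version");
    }
    row = new byte[in.readInt()];
    in.readFully(row);
    maxVersions = in.readInt();
    cacheBlocks = in.readBoolean();
  }

  /** Serializes to bytes and reads them back into a fresh instance. */
  static HandMarshaledGet roundTrip(HandMarshaledGet original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      original.write(new DataOutputStream(bytes));
      HandMarshaledGet copy = new HandMarshaledGet();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    HandMarshaledGet get = new HandMarshaledGet();
    get.row = new byte[] {1, 2, 3};
    get.maxVersions = 5;
    HandMarshaledGet copy = roundTrip(get);
    System.out.println(copy.row.length + " " + copy.maxVersions + " " + copy.cacheBlocks);
  }
}
```

Adding a field to such a class means touching both methods and bumping the version check, as the `version > 1` guard in the old Put.readFields illustrates.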

New Put class definition after 0.96.0

public class Put extends Mutation implements HeapSize, Comparable<Row>

Old Put class definition

public class Put extends Mutation
  implements HeapSize, Writable, Comparable<Row> {
 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > PUT_VERSION) {
      throw new IOException("version not supported");
    }
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for(int i=0;i<numFamilies;i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
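      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      // Mirror of write(): each KeyValue is stored as a length followed by its bytes.
      for (int j = 0; j < numKeys; j++) {
        int keyLength = in.readInt();
        in.readFully(buf, offset, keyLength);
        keys.add(new KeyValue(buf, offset, keyLength));
        offset += keyLength;
      }
      this.familyMap.put(family, keys);
    }
    if (version > 1) {
      readAttributes(in);
    }
  }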

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(PUT_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.ts);
    out.writeLong(this.lockId);
    out.writeBoolean(this.writeToWAL);
    out.writeInt(familyMap.size());
    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List<KeyValue> keys = entry.getValue();
      out.writeInt(keys.size());
      int totalLen = 0;
      for(KeyValue kv : keys) {
        totalLen += kv.getLength();
      }
      out.writeInt(totalLen);
      for(KeyValue kv : keys) {
        out.writeInt(kv.getLength());
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
      }
    }
    writeAttributes(out);
  }
}

New Scan class definition after 0.96.0

public class Scan extends OperationWithAttributes 

Old Scan class definition

public class Scan extends OperationWithAttributes implements Writable {

You can find the protobuf definition files in the src/main/protobuf folder of the hbase-protocol project.

In HTable, we can see protobuf in action in the get(final Get get) method. It builds a RegionServerCallable, which uses ProtobufUtil to make the actual protobuf GetRequest to the server.

HTable

  @Override
  public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
  }

ProtobufUtil, in package org.apache.hadoop.hbase.protobuf, is the utility class that helps build protobuf calls using the auto-generated protobuf classes.

package org.apache.hadoop.hbase.protobuf;

 /**
   * A helper to invoke a Get using client protocol.
   *
   * @param client
   * @param regionName
   * @param get
   * @return the result of the Get
   * @throws IOException
   */
  public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
  }

RequestConverter

 /**
   * Create a protocol buffer GetRequest for a client Get
   *
   * @param regionName the name of the region to get
   * @param get the client Get
   * @return a protocol buffer GetRequest
   */
  public static GetRequest buildGetRequest(final byte[] regionName,
      final Get get) throws IOException {
    GetRequest.Builder builder = GetRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    builder.setRegion(region);
    builder.setGet(ProtobufUtil.toGet(get));
    return builder.build();
  }

A look inside HRegion’s Stores initialization

In HBase, a table is split into multiple regions, and each region server hosts a certain set of regions.

Each region (org.apache.hadoop.hbase.regionserver.HRegion) maintains one or more Stores, where each Store is responsible for one column family. If you take a look at the org.apache.hadoop.hbase.regionserver.Store class, you will notice its member variables: a MemStore and a list of StoreFiles.

During initialization, an HRegion has to load these Stores, and each Store in turn has to initialize its MemStore and load its list of StoreFiles. HRegion performs these initializations in multiple background threads instead of loading the Stores sequentially. It wraps the work for each Store in a Callable and submits it to a thread pool to speed up the process. After submission, it uses a CompletionService and the Futures it returns to collect each Store as it finishes loading.

 if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();

          this.stores.put(store.getColumnFamilyName().getBytes(), store);
          long storeSeqId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeSeqId);
          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
            maxSeqId = storeSeqId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
      } catch (InterruptedException e) {
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
      }
    }
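
The same submit-then-take pattern can be tried outside HBase with nothing but java.util.concurrent. In the sketch below, ParallelOpenSketch, openStore, and the pool size of at most four threads are all made up for illustration; openStore simply stands in for the expensive per-store initialization.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of opening several stores in parallel. */
class ParallelOpenSketch {
  /** Stands in for the expensive per-store initialization. */
  static String openStore(String family) {
    return "store:" + family;
  }

  static List<String> openAll(List<String> families) {
    int threads = Math.max(1, Math.min(4, families.size()));
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
    try {
      // Submit one task per column family.
      for (final String family : families) {
        completionService.submit(() -> openStore(family));
      }
      List<String> opened = new ArrayList<>();
      for (int i = 0; i < families.size(); i++) {
        // take() blocks until some task has finished; results arrive in completion order.
        opened.add(completionService.take().get());
      }
      return opened;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(openAll(Arrays.asList("cf1", "cf2", "cf3")).size());
  }
}
```

Because results come back in completion order rather than submission order, the caller has to identify each result itself, which is what the HRegion code does by keying each Store on its column family name.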

There is another interesting class, org.apache.hadoop.hbase.regionserver.MemStoreLAB. Its main purpose is to prevent heap fragmentation and the long GC pauses that come with it. Take a look at the following excellent blog post about the design of MSLAB (MemStore-Local Allocation Buffers) and how it solves the fragmentation issue.

http://blog.cloudera.com/blog/2011/03/avoiding-full-gcs-in-hbase-with-memstore-local-allocation-buffers-part-3/