HBase 0.96.0 release: Protobufs as wire format

HBase 0.96.0 was released on October 18th, 2013. Some notable changes are:

1) Adopted protocol buffers as the wire format
2) Reduced mean time to recovery
3) Removed the ROOT table

Let's take a look at these changes in depth to understand the new design and explore the differences they introduced in the codebase. In this first post, we will look at the adoption of protocol buffers in HBase (https://issues.apache.org/jira/browse/HBASE-5305) and examine the differences it introduced in the code base.

The main impact of the change is that classes which previously implemented the Writable interface in order to be marshaled over the RPC wire no longer need to implement it. In other words, the manual marshaling/demarshaling methods write(final DataOutput out) and readFields(final DataInput in) are no longer required. Instead, HBase relies on protobufs to send, for example, a Get request over the RPC wire.

The Get class after 0.96.0 no longer implements the Writable interface:

public class Get extends OperationWithAttributes
  implements Row, Comparable<Row>

The old Get class definition implemented the Writable interface, with readFields(final DataInput in) and write(final DataOutput out) written by hand:

public class Get extends OperationWithAttributes
  implements Writable, Row, Comparable<Row> {

 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > GET_VERSION) {
      throw new IOException("unsupported version");
    }
    this.row = Bytes.readByteArray(in);
    this.lockId = in.readLong();
    this.maxVersions = in.readInt();
    boolean hasFilter = in.readBoolean();
    if (hasFilter) {
      this.filter = Classes.createWritableForName(
        Bytes.toString(Bytes.readByteArray(in)));
      this.filter.readFields(in);
    }
    this.cacheBlocks = in.readBoolean();
    this.tr = new TimeRange();
    tr.readFields(in);
    int numFamilies = in.readInt();
    this.familyMap =
      new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
    for(int i=0; i<numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      boolean hasColumns = in.readBoolean();
      NavigableSet<byte []> set = null;
      if(hasColumns) {
        int numColumns = in.readInt();
        set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
        for(int j=0; j<numColumns; j++) {
          byte [] qualifier = Bytes.readByteArray(in);
          set.add(qualifier);
        }
      }
      this.familyMap.put(family, set);
    }
    readAttributes(in);
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(GET_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.lockId);
    out.writeInt(this.maxVersions);
    if(this.filter == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
      filter.write(out);
    }
    out.writeBoolean(this.cacheBlocks);
    tr.write(out);
    out.writeInt(familyMap.size());
    for(Map.Entry<byte [], NavigableSet<byte []>> entry :
      familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet<byte []> columnSet = entry.getValue();
      if(columnSet == null) {
        out.writeBoolean(false);
      } else {
        out.writeBoolean(true);
        out.writeInt(columnSet.size());
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);
        }
      }
    }
    writeAttributes(out);
  }
}
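For comparison, here is a minimal sketch of what replaces the hand-written marshaling (the class and values here are my own for illustration; ProtobufUtil and the generated ClientProtos classes are covered later in this post). The client Get is converted to its auto-generated protobuf counterpart, and the generated message handles its own serialization:

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;

public class GetProtobufSketch {
  public static void main(String[] args) throws Exception {
    Get get = new Get(Bytes.toBytes("row1"));
    get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    // Convert the client-side Get into the generated protobuf message;
    // the field-by-field encoding now lives in generated code.
    ClientProtos.Get protoGet = ProtobufUtil.toGet(get);

    // The generated message marshals itself -- no hand-written write().
    byte[] wireBytes = protoGet.toByteArray();

    // And demarshals itself -- no hand-written readFields().
    ClientProtos.Get decoded = ClientProtos.Get.parseFrom(wireBytes);
    System.out.println(decoded.getRow().size() + " byte row key");
  }
}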

New Put class definition after 0.96.0

public class Put extends Mutation implements HeapSize, Comparable<Row>

Old Put class definition

public class Put extends Mutation
  implements HeapSize, Writable, Comparable<Row> {
 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > PUT_VERSION) {
      throw new IOException("version not supported");
    }
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for(int i=0;i<numFamilies;i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      for (int j = 0; j < numKeys; j++) {
        int keyLength = in.readInt();
        in.readFully(buf, offset, keyLength);
        keys.add(new KeyValue(buf, offset, keyLength));
        offset += keyLength;
      }
      this.familyMap.put(family, keys);
    }
    if (version > 1) {
      readAttributes(in);
    }
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(PUT_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.ts);
    out.writeLong(this.lockId);
    out.writeBoolean(this.writeToWAL);
    out.writeInt(familyMap.size());
    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List<KeyValue> keys = entry.getValue();
      out.writeInt(keys.size());
      int totalLen = 0;
      for(KeyValue kv : keys) {
        totalLen += kv.getLength();
      }
      out.writeInt(totalLen);
      for(KeyValue kv : keys) {
        out.writeInt(kv.getLength());
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
      }
    }
    writeAttributes(out);
  }

New Scan class definition after 0.96.0

public class Scan extends OperationWithAttributes 

Old Scan class definition

public class Scan extends OperationWithAttributes implements Writable {

You can find the protobuf definition files in the src/main/protobuf folder of the hbase-protocol project.

In HTable, we can see protobuf in action in the get(final Get get) method: a RegionServerCallable is built, which uses ProtobufUtil to make the actual protobuf GetRequest to the server.

HTable:

@Override
  public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
  }

ProtobufUtil, in the package org.apache.hadoop.hbase.protobuf, is the utility class that helps build protobuf calls using the auto-generated protobuf classes.

package org.apache.hadoop.hbase.protobuf;

 /**
   * A helper to invoke a Get using client protocol.
   *
   * @param client
   * @param regionName
   * @param get
   * @return the result of the Get
   * @throws IOException
   */
  public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
  }

RequestConverter

 /**
   * Create a protocol buffer GetRequest for a client Get
   *
   * @param regionName the name of the region to get
   * @param get the client Get
   * @return a protocol buffer GetRequest
   */
  public static GetRequest buildGetRequest(final byte[] regionName,
      final Get get) throws IOException {
    GetRequest.Builder builder = GetRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    builder.setRegion(region);
    builder.setGet(ProtobufUtil.toGet(get));
    return builder.build();
  }
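Since GetRequest is itself a generated protobuf message, the whole request envelope round-trips the same way. A small usage sketch, assuming regionName and get are in scope as in the HTable code above:

// Build the request envelope: a region specifier plus the protobuf Get.
GetRequest request = RequestConverter.buildGetRequest(regionName, get);

// The generated builder produced an immutable message that serializes itself.
byte[] onTheWire = request.toByteArray();

// On the receiving side, the same generated class parses it back.
GetRequest parsed = GetRequest.parseFrom(onTheWire);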

A look inside HRegion’s Stores initialization

In HBase, a table is split into multiple regions, and each region server hosts a certain set of regions.

Each region (org.apache.hadoop.hbase.regionserver.HRegion) maintains one or more Stores, where each Store is responsible for one column family. If you take a look at the org.apache.hadoop.hbase.regionserver.Store class, you will notice its member variables: a MemStore and a list of StoreFiles.

During the initialization of an HRegion, it has to load these multiple Stores. During the process, each Store also has to initialize and load up its MemStore and its list of StoreFiles. HRegion performs these initializations in multiple background threads instead of loading them sequentially. Basically, HRegion constructs a Callable per Store to be executed via a thread pool to speed up the process. After the Callable submission, it makes use of Future and CompletionService to collect the loading results.

 if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();

          this.stores.put(store.getColumnFamilyName().getBytes(), store);
          long storeSeqId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeSeqId);
          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
            maxSeqId = storeSeqId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
      } catch (InterruptedException e) {
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
      }
    }
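
The same pattern is easy to reuse outside HBase. Below is a minimal standalone sketch (the class and task names are mine, not HBase's) of submitting Callables to an ExecutorCompletionService and consuming results in completion order:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelOpenSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<String> completionService =
        new ExecutorCompletionService<String>(pool);

    final String[] families = {"cf1", "cf2", "cf3"};
    // Submit one Callable per unit of work (one per Store in HRegion's case).
    for (final String family : families) {
      completionService.submit(new Callable<String>() {
        public String call() {
          return "opened store for " + family;  // stand-in for instantiateHStore(...)
        }
      });
    }

    try {
      // take() hands back futures in completion order, not submission order,
      // so a fast store never waits behind a slow one.
      for (int i = 0; i < families.length; i++) {
        System.out.println(completionService.take().get());
      }
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }
}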

There is another interesting class, org.apache.hadoop.hbase.regionserver.MemStoreLAB. The main purpose of this class is to prevent heap fragmentation and long GC pauses. Take a look at the following excellent blog about the design of MSLAB (MemStore-Local Allocation Buffers) and how it solves the fragmentation issue.

http://blog.cloudera.com/blog/2011/03/avoiding-full-gcs-in-hbase-with-memstore-local-allocation-buffers-part-3/
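
To illustrate the core idea, here is a heavily simplified sketch of the technique (not HBase's actual MemStoreLAB code): instead of letting every cell allocate its own small byte[] on the heap, data is copied into a large shared chunk whose slices are handed out with a compare-and-swap, so a MemStore's data lives and dies together in a few big allocations instead of millions of scattered small ones.

import java.util.concurrent.atomic.AtomicInteger;

// Simplified allocation-buffer sketch in the spirit of MSLAB.
public class SimpleAllocationBuffer {
  private final byte[] chunk;
  private final AtomicInteger nextFree = new AtomicInteger(0);

  public SimpleAllocationBuffer(int chunkSize) {
    this.chunk = new byte[chunkSize];
  }

  // Reserve 'size' bytes and return the starting offset, or -1 if the
  // chunk is full and the caller must roll over to a fresh chunk.
  public int allocate(int size) {
    while (true) {
      int oldOffset = nextFree.get();
      if (oldOffset + size > chunk.length) {
        return -1;
      }
      // CAS so multiple writer threads can carve out slices safely.
      if (nextFree.compareAndSet(oldOffset, oldOffset + size)) {
        return oldOffset;
      }
    }
  }

  // Copy cell data into the chunk at a freshly reserved offset.
  public int copyInto(byte[] data) {
    int offset = allocate(data.length);
    if (offset >= 0) {
      System.arraycopy(data, 0, chunk, offset, data.length);
    }
    return offset;
  }
}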

Mesos vs YARN

I will continue to add more info here as I learn and discover more about their differences.

Mesos paper

http://www.cs.berkeley.edu/~matei/papers/2011/nsdi_mesos.pdf

YARN intro article

http://developer.yahoo.com/blogs/hadoop/next-generation-apache-hadoop-mapreduce-scheduler-4141.html

YARN design doc

https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf

YARN Jira ticket

https://issues.apache.org/jira/browse/MAPREDUCE-279


Bucket in OpenTSDB

OpenTSDB introduces the notion of a time bucket: basically a way of grouping all data points that fall within a specific time window. OpenTSDB uses an hourly time bucket to partition its data, where the cell value is either an integer or a float. One could easily extend the same design to other domains storing more complex data types, for example a complex Avro type.

Even better, different time buckets could be designed such as daily, weekly, and monthly buckets. The sky is the limit here.

In OpenTSDB, the timestamp is broken into two parts, encoded in the row key and the qualifier respectively.

The first part is the hourly base time, encoded in the row key; the other is the delta in seconds from the start of the hourly bucket, encoded in the qualifier.
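
To make the arithmetic concrete, here is a small worked sketch (in OpenTSDB, Const.MAX_TIMESPAN is the hourly bucket width of 3600 seconds; the timestamp value below is an arbitrary example):

public class BucketSketch {
  public static void main(String[] args) {
    final long MAX_TIMESPAN = 3600;  // one hour, in seconds

    long timestamp = 1382086823L;                            // some epoch-seconds timestamp
    long baseTime = timestamp - (timestamp % MAX_TIMESPAN);  // 1382086800, the top of the hour
    long delta = timestamp - baseTime;                       // 23 seconds into the hour

    // baseTime is encoded in the row key, delta in the column qualifier.
    System.out.println(baseTime + " + " + delta);

    // The same trick yields a daily bucket (86400 seconds per day).
    long dailyBase = timestamp - (timestamp % 86400);
    System.out.println(dailyBase);
  }
}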

The following line from OpenTSDB shows how the base_time (hour bucket) is computed:

final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);

Here it is in context, in the full updateBaseTime() method:

private long updateBaseTime(final long timestamp) {
  // We force the starting timestamp to be on a MAX_TIMESPAN boundary
  // so that all TSDs create rows with the same base time.  Otherwise
  // we'd need to coordinate TSDs to avoid creating rows that cover
  // overlapping time periods.
  final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);
  // Clone the row key since we're going to change it.  We must clone it
  // because the HBase client may still hold a reference to it in its
  // internal datastructures.
  row = Arrays.copyOf(row, row.length);
  Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
  tsdb.scheduleForCompaction(row, (int) base_time);
  return base_time;
}