HBase 0.96 API change

I was coding DAO for HBase 0.96 the other day and found out some API changes from 0.94 to 0.96.

The introduction of new Cell interface and also KeyValue is no longer implementing Writable interface. Instead it implements Cell interface. With protobuf as the wire format, Scan, Get, Put are also no longer required to implement Writable. They extends abstract class OperationWithAttributes which extends another abstract class Operation and implements Attributes interface. The abstract class Operation mainly provides JSON conversion for logging/debugging purpose. The Attributes interface defines the methods for setting and maintaining Map of attributes of the Operation.

 

public interface Cell {

  //1) Row

  /**
   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Short.MAX_VALUE which is 32,767 bytes.
   * @return The array containing the row bytes.
   */
  byte[] getRowArray();

  /**
   * @return Array index of first row byte
   */
  int getRowOffset();

  /**
   * @return Number of row bytes. Must be < rowArray.length - offset.
   */
  short getRowLength();


  //2) Family

  /**
   * Contiguous bytes composed of legal HDFS filename characters which may start at any index in the
   * containing array. Max length is Byte.MAX_VALUE, which is 127 bytes.
   * @return the array containing the family bytes.
   */
  byte[] getFamilyArray();

  /**
   * @return Array index of first family byte
   */
  int getFamilyOffset();

  /**
   * @return Number of family bytes.  Must be < familyArray.length - offset.
   */
  byte getFamilyLength();


  //3) Qualifier

  /**
   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Short.MAX_VALUE which is 32,767 bytes.
   * @return The array containing the qualifier bytes.
   */
  byte[] getQualifierArray();

  /**
   * @return Array index of first qualifier byte
   */
  int getQualifierOffset();

  /**
   * @return Number of qualifier bytes.  Must be < qualifierArray.length - offset.
   */
  int getQualifierLength();


  //4) Timestamp

  /**
   * @return Long value representing time at which this cell was "Put" into the row.  Typically
   * represents the time of insertion, but can be any value from 0 to Long.MAX_VALUE.
   */
  long getTimestamp();


  //5) Type

  /**
   * @return The byte representation of the KeyValue.TYPE of this cell: one of Put, Delete, etc
   */
  byte getTypeByte();


  //6) MvccVersion

  /**
   * Internal use only. A region-specific sequence ID given to each operation. It always exists for
   * cells in the memstore but is not retained forever. It may survive several flushes, but
   * generally becomes irrelevant after the cell's row is no longer involved in any operations that
   * require strict consistency.
   * @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
   */
  long getMvccVersion();


  //7) Value

  /**
   * Contiguous raw bytes that may start at any index in the containing array. Max length is
   * Integer.MAX_VALUE which is 2,147,483,648 bytes.
   * @return The array containing the value bytes.
   */
  byte[] getValueArray();

  /**
   * @return Array index of first value byte
   */
  int getValueOffset();

  /**
   * @return Number of value bytes.  Must be < valueArray.length - offset.
   */
  int getValueLength();
  
  /**
   * @return the tags byte array
   */
  byte[] getTagsArray();

  /**
   * @return the first offset where the tags start in the Cell
   */
  int getTagsOffset();
  
  /**
   * @return the total length of the tags in the Cell.
   */
  short getTagsLength();
  
  /**
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's value.
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#cloneValue(Cell)}
   */
  @Deprecated
  byte[] getValue();
  
  /**
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's family. 
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#cloneFamily(Cell)}
   */
  @Deprecated
  byte[] getFamily();

  /**
   * WARNING do not use, expensive.  This gets an arraycopy of the cell's qualifier.
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#cloneQualifier(Cell)}
   */
  @Deprecated
  byte[] getQualifier();

  /**
   * WARNING do not use, expensive.  this gets an arraycopy of the cell's row.
   *
   * Added to ease transition from  0.94 -> 0.96.
   * 
   * @deprecated as of 0.96, use {@link CellUtil#getRowByte(Cell, int)}
   */
  @Deprecated
  byte[] getRow();
}

Example: 0.96

   for(Cell cell: result.rawCells()) {

}

vs  0.94

for(KeyValue kv: result.raw()) {

}

0.96

ArrayList<Cell> list = new ArrayList<Cell>();

Result result = Result.create(list);

vs

0.94

ArrayList<KeyValue> list = new ArrayList<KeyValue>();

list.add(keyValue);

Result result = new Result(list);

 

 

HBase 0.96.0 release: Protobufs as wire format

HBase 0.96.0 was released recently on October 18th, 2013. Some notable changes are 1) Adopted protocol buffer as the wire format 2) Reduced mean time to recovery 3) Removed ROOT table Let’s take a look at all these new changes in depth to understand the new design and explore the differences/changes introduced in the codebase. In this first blog, we will take a look at protocol buffer adoption in HBase and examine the differences it introduced in the code base The main impact of the change is all the classes which previously implemented Writable interface to be marshaled over the RPC wire do not need to implement the interface anymore. In another word, both manual marshaling/demarshaling write(final DataOutput out) and readFields(final DataInput in) are not required anymore. Instead, it relies on protobufs to send the Get request over the RPC wire. Take a look at the following https://issues.apache.org/jira/browse/HBASE-5305 Get class after 0.96.0, does not need to implement Writable interface

public class Get extends OperationWithAttributes
  implements Row, Comparable

Old Get class definition implements Writable interface with readFields(final DataInput in) and write(final DataOutput out) implemented

public class Get extends OperationWithAttributes
  implements Writable, Row, Comparable {

 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > GET_VERSION) {
      throw new IOException("unsupported version");
    }
    this.row = Bytes.readByteArray(in);
    this.lockId = in.readLong();
    this.maxVersions = in.readInt();
    boolean hasFilter = in.readBoolean();
    if (hasFilter) {
      this.filter = Classes.createWritableForName(
        Bytes.toString(Bytes.readByteArray(in)));
      this.filter.readFields(in);
    }
    this.cacheBlocks = in.readBoolean();
    this.tr = new TimeRange();
    tr.readFields(in);
    int numFamilies = in.readInt();
    this.familyMap =
      new TreeMap<byte [],NavigableSet>(Bytes.BYTES_COMPARATOR);
    for(int i=0; i<numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      boolean hasColumns = in.readBoolean();
      NavigableSet set = null;
      if(hasColumns) {
        int numColumns = in.readInt();
        set = new TreeSet(Bytes.BYTES_COMPARATOR);
        for(int j=0; j<numColumns; j++) {
          byte [] qualifier = Bytes.readByteArray(in);
          set.add(qualifier);
        }
      }
      this.familyMap.put(family, set);
    }
    readAttributes(in);
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(GET_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.lockId);
    out.writeInt(this.maxVersions);
    if(this.filter == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
      filter.write(out);
    }
    out.writeBoolean(this.cacheBlocks);
    tr.write(out);
    out.writeInt(familyMap.size());
    for(Map.Entry<byte [], NavigableSet> entry :
      familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet columnSet = entry.getValue();
      if(columnSet == null) {
        out.writeBoolean(false);
      } else {
        out.writeBoolean(true);
        out.writeInt(columnSet.size());
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);
        }
      }
    }
    writeAttributes(out);
  }
}

New Put class definition after 0.96.0

public class Put extends Mutation implements HeapSize, Comparable

Old Put class definition

public class Put extends Mutation
  implements HeapSize, Writable, Comparable {
 public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > PUT_VERSION) {
      throw new IOException("version not supported");
    }
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for(int i=0;i<numFamilies;i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
      List keys = new ArrayList(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      for (int j = 0; j  1) {
      readAttributes(in);
    }
  }

  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(PUT_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.ts);
    out.writeLong(this.lockId);
    out.writeBoolean(this.writeToWAL);
    out.writeInt(familyMap.size());
    for (Map.Entry<byte [], List> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List keys = entry.getValue();
      out.writeInt(keys.size());
      int totalLen = 0;
      for(KeyValue kv : keys) {
        totalLen += kv.getLength();
      }
      out.writeInt(totalLen);
      for(KeyValue kv : keys) {
        out.writeInt(kv.getLength());
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
      }
    }
    writeAttributes(out);
  }

New Scan class definition after 0.96.0

public class Scan extends OperationWithAttributes 

Old Scan class definition

public class Scan extends OperationWithAttributes implements Writable {

You can find protobuf definition files in the src/main/protobuf folder of the hbase-protocol project In HTable, we can see how the protobuf is being used in action in the get(final Get get) method, RegionServerCallable is built which uses ProtobufUtil to make the actual protobuf GetRequest to the server HTable

@Override
  public Result get(final Get get) throws IOException {
    RegionServerCallable callable = new RegionServerCallable(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
  }

ProtobufUtil is in package org.apache.hadoop.hbase.protobuf is the protobuf util class to help build protobuf calls using the auto generated protobufs classes.

package org.apache.hadoop.hbase.protobuf;

 /**
   * A helper to invoke a Get using client protocol.
   *
   * @param client
   * @param regionName
   * @param get
   * @return the result of the Get
   * @throws IOException
   */
  public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
  }

RequestConverter

 /**
   * Create a protocol buffer GetRequest for a client Get
   *
   * @param regionName the name of the region to get
   * @param get the client Get
   * @return a protocol buffer GetRequest
   */
  public static GetRequest buildGetRequest(final byte[] regionName,
      final Get get) throws IOException {
    GetRequest.Builder builder = GetRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    builder.setRegion(region);
    builder.setGet(ProtobufUtil.toGet(get));
    return builder.build();
  }