Spark Logistic Regression

Let's dive into the implementation of Logistic Regression in Spark. You can find the implementation in the LogisticRegression.scala file. I am looking at the low-level mllib library instead of the newer ml API.

/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

It defines LogisticRegressionModel, which extends the abstract class GeneralizedLinearModel (/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala) and mixes in the ClassificationModel, Serializable, Saveable, and PMMLExportable traits.

class LogisticRegressionModel @Since("1.3.0") (
    @Since("1.0.0") override val weights: Vector,
    @Since("1.0.0") override val intercept: Double,
    @Since("1.3.0") val numFeatures: Int,
    @Since("1.3.0") val numClasses: Int)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable
  with Saveable with PMMLExportable {

ClassificationModel declares the predict methods: the RDD and single-Vector variants are abstract, while the JavaRDD variant simply delegates to the RDD one.

trait ClassificationModel extends Serializable {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: RDD[Vector]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted category from the trained model
   */
  @Since("1.0.0")
  def predict(testData: Vector): Double

  /**
   * Predict values for examples stored in a JavaRDD.
   * @param testData JavaRDD representing data points to be predicted
   * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
    predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}

In LogisticRegressionModel's predictPoint method, if the number of classes is 2 we have binary classification; otherwise it performs multinomial logistic regression.

 override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double) = {
    require(dataMatrix.size == numFeatures)

    // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
    if (numClasses == 2) {
      val margin = dot(weightMatrix, dataMatrix) + intercept
      val score = 1.0 / (1.0 + math.exp(-margin))
      threshold match {
        case Some(t) => if (score > t) 1.0 else 0.0
        case None => score
      }
    } else {
      /**
       * Compute and find the one with maximum margins. If the maxMargin is negative, then the
       * prediction result will be the first class.
       *
       * PS, if you want to compute the probabilities for each outcome instead of the outcome
       * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
       * is positive to prevent overflow.
       */
      var bestClass = 0
      var maxMargin = 0.0
      val withBias = dataMatrix.size + 1 == dataWithBiasSize
      (0 until numClasses - 1).foreach { i =>
        var margin = 0.0
        dataMatrix.foreachActive { (index, value) =>
          if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
        }
        // Intercept is required to be added into margin.
        if (withBias) {
          margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
        }
        if (margin > maxMargin) {
          maxMargin = margin
          bestClass = i + 1
        }
      }
      bestClass.toDouble
    }
  }

In binary classification, it uses the math below to calculate the score:

val margin = dot(weightMatrix, dataMatrix) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))

If a threshold is specified, the calculated score is compared against it and class 1.0 is returned when the score exceeds the threshold; otherwise the raw score is returned. Both dataMatrix and weightMatrix are of type Vector.
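
A minimal standalone sketch of this decision rule (plain Scala with made-up toy numbers, not Spark code) looks like this:

object BinaryDecisionRuleSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical weights, intercept and feature values, for illustration only.
    val weights = Array(0.8, -1.2, 0.5)
    val intercept = 0.1
    val features = Array(1.0, 0.3, 2.0)

    // margin = w . x + intercept
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    // The sigmoid squashes the margin into a score in (0, 1).
    val score = 1.0 / (1.0 + math.exp(-margin))

    // With a threshold set (mllib's default is 0.5) the score becomes a hard 0/1 label;
    // with the threshold cleared, the raw score is returned instead.
    val threshold: Option[Double] = Some(0.5)
    val prediction = threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None => score
    }
    println(s"margin=$margin score=$score prediction=$prediction")
  }
}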

For binary logistic regression, it uses stochastic gradient descent with L2 regularization (SquaredL2Updater) to train the model. See the implementation of LogisticRegressionWithSGD below. It extends the abstract class GeneralizedLinearAlgorithm, which declares createModel as an abstract method:

protected def createModel(weights: Vector, intercept: Double): M

The abstract class provides the base implementation of the run(input: RDD[LabeledPoint], initialWeights: Vector): M method, which calls createModel at the end.

Any extending class, e.g. LogisticRegressionWithSGD, provides the createModel implementation, which returns a trained classification model.
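
As a rough mental model (a simplified sketch, not the actual Spark source), the interplay between run and createModel is the classic template-method pattern: the base class owns the training flow and delegates model construction to the concrete subclass.

// Simplified sketch of the GeneralizedLinearAlgorithm / createModel relationship.
// Names mirror the real classes, but the bodies are illustrative only.
abstract class GLMSketch[M] {
  protected def createModel(weights: Array[Double], intercept: Double): M

  def run(data: Seq[(Double, Array[Double])], initialWeights: Array[Double]): M = {
    // The real run() validates the input, optionally appends a bias term,
    // and hands the data to the configured optimizer (e.g. GradientDescent).
    val weights = optimize(data, initialWeights)
    val intercept = 0.0 // the real code extracts the intercept when addIntercept is enabled
    // Finally it delegates model construction to the subclass.
    createModel(weights, intercept)
  }

  // Placeholder standing in for optimizer.optimize(...).
  protected def optimize(data: Seq[(Double, Array[Double])],
                         initial: Array[Double]): Array[Double] = initial
}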

LogisticRegressionWithSGD exposes the following train methods to build the model. Each train method takes an RDD of LabeledPoint; the LabeledPoint case class holds a double class label and a Vector of features.

case class LabeledPoint @Since("1.0.0") (
    @Since("0.8.0") label: Double,
    @Since("1.0.0") features: Vector) {
  override def toString: String = {
    s"($label,$features)"
  }
}
@Since("0.8.0")
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  @Since("0.8.0")
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}

/**
 * Top-level methods for calling Logistic Regression using Stochastic Gradient Descent.
 * NOTE: Labels used in Logistic Regression should be {0, 1}
 */
@Since("0.8.0")
object LogisticRegressionWithSGD {
  // NOTE(shivaram): We use multiple train methods instead of default arguments to support
  // Java programs.

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input, initialWeights)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. We use the entire data
   * set to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param stepSize Step size to be used for each iteration of Gradient Descent.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double): LogisticRegressionModel = {
    train(input, numIterations, stepSize, 1.0)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
   * to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int): LogisticRegressionModel = {
    train(input, numIterations, 1.0, 1.0)
  }
}
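
Putting the pieces together, a minimal training-and-prediction sketch against this API could look like the following (a local SparkContext and made-up feature values are assumed for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object LogisticRegressionWithSGDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lr-sgd-example").setMaster("local[*]"))

    // Tiny made-up training set: label 1.0 when the first feature dominates.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(2.0, 0.1)),
      LabeledPoint(1.0, Vectors.dense(1.8, 0.3)),
      LabeledPoint(0.0, Vectors.dense(0.2, 1.5)),
      LabeledPoint(0.0, Vectors.dense(0.1, 1.9))
    ))

    // Train with 100 iterations of gradient descent (step size 1.0, full batch).
    val model = LogisticRegressionWithSGD.train(training, 100)

    // With the default threshold (0.5), predict returns a hard 0/1 label ...
    val label = model.predict(Vectors.dense(1.5, 0.2))
    // ... after clearThreshold() it returns the raw sigmoid score instead.
    val score = model.clearThreshold().predict(Vectors.dense(1.5, 0.2))

    println(s"label=$label score=$score")
    sc.stop()
  }
}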

Deep Dive into YARN NodeManager

YARN is an event-driven framework. It uses event handlers to listen for and handle the various events that components send to the event queue. Some event handlers also spawn threads to run long-running logic after receiving an event; for example, during event-loop handling, ResourceLocalizationService starts a LocalizerRunner thread to download remote resources. NodeManager is a core component of YARN. Internally it consists of the following services:

NodeStatusUpdater
NodeLabelsProvider
NodeResourceMonitor
ContainerManagerImpl
NMStateStoreService
NodeHealthCheckerService
LocalDirsHandlerService

Yes, it has a WebServer internally too.

These services are initialized within the serviceInit method


 @Override
  protected void serviceInit(Configuration conf) throws Exception 

One of these internal services, ContainerManagerImpl, extends CompositeService and implements the ServiceStateChangeListener, ContainerManagementProtocol, and EventHandler interfaces. It contains two event handlers, ResourceLocalizationService and ContainersLauncher.

public class ContainerManagerImpl extends CompositeService implements
    ServiceStateChangeListener, ContainerManagementProtocol,
    EventHandler<ContainerManagerEvent>

Both ResourceLocalizationService and ContainersLauncher are defined in ContainerManagerImpl as follows:

 private final ResourceLocalizationService rsrcLocalizationSrvc;
 private final ContainersLauncher containersLauncher;

As you can see in the class definition of ResourceLocalizationService, it implements the EventHandler interface to handle events of type LocalizationEvent.

public class ResourceLocalizationService extends CompositeService
    implements EventHandler<LocalizationEvent>, LocalizationProtocol {

ContainersLauncher is the event handler responsible for initializing, starting, launching, and terminating containers. It implements the EventHandler interface to handle events of type ContainersLauncherEvent, e.g. LAUNCH_CONTAINER, RECOVER_CONTAINER, and CLEANUP_CONTAINER.

public class ContainersLauncher extends AbstractService
    implements EventHandler<ContainersLauncherEvent> {

ContainerManagerImpl registers the above event handlers as shown below.


dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);

In addition, it registers other event handlers to listen for and handle ContainersMonitorEvent, ContainerEvent, ApplicationEvent, and AuxServicesEvent.

dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class, new ApplicationEventDispatcher());
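
To make this dispatcher/handler wiring easier to picture, here is a toy Scala sketch of the pattern (an illustration only, not YARN's actual AsyncDispatcher): handlers register interest in an event type, events are queued, and a dispatch loop routes each event to its registered handler.

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Toy illustration of the event-dispatcher pattern used inside NodeManager.
trait Event { def eventType: String }
trait Handler { def handle(event: Event): Unit }

final case class LocalizationEvent(eventType: String, resource: String) extends Event

class ToyDispatcher {
  private val handlers = mutable.Map[String, Handler]()
  private val queue = new LinkedBlockingQueue[Event]()

  // Conceptually mirrors dispatcher.register(EventType.class, handler).
  def register(eventType: String, handler: Handler): Unit = {
    handlers(eventType) = handler
  }

  def dispatch(event: Event): Unit = queue.put(event)

  // In YARN this loop runs on the dispatcher's own thread.
  def drain(): Unit =
    while (!queue.isEmpty) {
      val event = queue.take()
      handlers.get(event.eventType).foreach(_.handle(event))
    }
}

object ToyDispatcherDemo extends App {
  val dispatcher = new ToyDispatcher
  dispatcher.register("LOCALIZATION", new Handler {
    // A real ResourceLocalizationService would kick off a LocalizerRunner thread here.
    def handle(event: Event): Unit = println(s"localizing $event")
  })
  dispatcher.dispatch(LocalizationEvent("LOCALIZATION", "hdfs://some/remote/resource"))
  dispatcher.drain()
}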

NodeManager has an important class called DefaultContainerExecutor, which extends ContainerExecutor. It is the default executor instantiated from configuration:

ContainerExecutor exec = ReflectionUtils.newInstance(
    conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
        DefaultContainerExecutor.class, ContainerExecutor.class), conf);
try {
  exec.init();
} catch (IOException e) {
  throw new YarnRuntimeException("Failed to initialize container executor", e);
}

DefaultContainerExecutor provides the methods to activate and launch containers. It is used by both ResourceLocalizationService and ContainersLauncher, which are themselves instantiated in ContainerManagerImpl.

If you look into DefaultContainerExecutor, you will find methods like activateContainer and launchContainer, which are used by ContainersLauncher, whereas startLocalizer is called by ResourceLocalizationService.

After a container is created, the artifacts required by the job have to be downloaded first; this is what YARN refers to as localization, i.e. downloading remote resources onto the local file system. You can check out this excellent blog post by Hortonworks about resource localization:
http://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/

I will blog about resource localization next time. Stay tuned.

Logistic Regression maximum likelihood estimation

Here are some great materials on Logistic Regression, with a focus on maximum likelihood estimation of the model:

Slides from Learning From Data

An Introduction to Logistic Regression: From Basic Concepts to Interpretation with Particular Attention to Nursing Domain

Algorithms for maximum likelihood logistic regression
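
As a quick reminder of what these references derive: for binary labels, maximum likelihood picks the weights and intercept that maximize the log-likelihood (equivalently, minimize the log loss that LogisticGradient computes). A tiny Scala sketch of that objective, with made-up data:

object LogLikelihoodSketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Binary log-likelihood: sum over i of [ y_i * log(p_i) + (1 - y_i) * log(1 - p_i) ]
  // where p_i = sigmoid(w . x_i + intercept). MLE chooses w and intercept to maximize this.
  def logLikelihood(data: Seq[(Double, Array[Double])],
                    weights: Array[Double],
                    intercept: Double): Double =
    data.map { case (y, x) =>
      val p = sigmoid(x.zip(weights).map { case (xi, wi) => xi * wi }.sum + intercept)
      y * math.log(p) + (1.0 - y) * math.log(1.0 - p)
    }.sum

  def main(args: Array[String]): Unit = {
    // Made-up toy data and parameters, just to show the computation.
    val data = Seq((1.0, Array(2.0, 0.1)), (0.0, Array(0.2, 1.5)))
    println(logLikelihood(data, weights = Array(1.0, -1.0), intercept = 0.0))
  }
}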

Log Structured Merge Tree (LSM Tree) in HBase

If you have ever worked with HBase, Cassandra, Google Bigtable, or LevelDB, you have probably heard about the Log Structured Merge (LSM) tree. The LSM tree differentiates these NoSQL stores from the majority of RDBMSs, which use a B+ tree.

In HBase, the LSM tree concept is materialized by the HLog (write-ahead log), the memstores, and the store files. The main idea of LSM is that data is first kept in an in-memory cache, e.g. the memstores in HBase. Each region server hosts multiple regions (HRegion), and each HRegion contains a section of a table's data. An HRegion has as many memstores as the table has column families, and it keeps track of the total memstore size contributed by all of them. As you can see in the following methods in HRegion, after applying any mutation operations it checks whether the total memstore size has exceeded the configured max flush size. If so, it calls requestFlush(), which basically delegates the call to the region server's MemStoreFlusher.

  /*
   * @param size
   * @return True if size is over the flush threshold
   */
  private boolean isFlushSize(final long size) {
    return size > this.memstoreFlushSize;
  }

 /**
   * Perform a batch of mutations.
   * It supports only Put and Delete mutations and will ignore other types passed.
   * @param batchOp contains the list of mutations
   * @return an array of OperationStatus which internally contains the
   *         OperationStatusCode and the exceptionMessage if any.
   * @throws IOException
   */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadOnly();
        }
        checkResources();

        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        long addedSize = doMiniBatchMutation(batchOp);
        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      closeRegionOperation(op);
    }
    return batchOp.retCodeDetails;
  }


  private void requestFlush() {
    if (this.rsServices == null) {
      return;
    }
    synchronized (writestate) {
      if (this.writestate.isFlushRequested()) {
        return;
      }
      writestate.flushRequested = true;
    }
    // Make request outside of synchronize block; HBASE-818.
    this.rsServices.getFlushRequester().requestFlush(this, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
    }
  }

HRegion loads the configured max memstore flush size; this in-memory cache has a pre-configured maximum size. See the HRegion method below.

 void setHTableSpecificConf() {
    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();

    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
                HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
  }

As soon as the memstore exceeds this max size, the data is flushed to disk as store files in HBase.

In addition, there is a configurable limit on the number of store files permitted in HBase. When the number of store files exceeds this limit, a compaction is triggered to merge them into a bigger one. The main purpose of compaction is to speed up the read path by reducing the number of store files that have to be read.
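
For intuition, here is a toy Scala sketch of the LSM write path described above (an illustration only, nothing like HBase's actual implementation): writes land in an in-memory sorted map, the map is flushed to an immutable sorted "store file" once it crosses a threshold, and small files are periodically compacted by merging them so that the newest value for each key wins.

import scala.collection.immutable.SortedMap

// Toy LSM-tree illustration: an in-memory "memstore" that flushes to sorted,
// immutable "store files" and compacts them by merging. Not HBase code.
class ToyLsmStore(flushThreshold: Int = 4, maxStoreFiles: Int = 3) {
  private var memstore = SortedMap.empty[String, String]
  private var storeFiles = List.empty[SortedMap[String, String]] // newest first

  def put(key: String, value: String): Unit = {
    memstore += (key -> value)
    if (memstore.size >= flushThreshold) flush() // analogous to isFlushSize -> requestFlush
  }

  def get(key: String): Option[String] =
    memstore.get(key).orElse(storeFiles.view.flatMap(_.get(key)).headOption)

  private def flush(): Unit = {
    storeFiles = memstore :: storeFiles // write out a new immutable "store file"
    memstore = SortedMap.empty
    if (storeFiles.size > maxStoreFiles) compact() // too many files -> compaction
  }

  private def compact(): Unit =
    // Merge oldest-to-newest so that the newest value for each key wins.
    storeFiles = List(storeFiles.reverse.reduce(_ ++ _))
}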

Please check out this great paper on the LSM tree:

http://db.cs.berkeley.edu/cs286/papers/lsm-acta1996.pdf