TensorFlow Data Flow Graph Optimization

TensorFlow represents a user program as a computation graph (data flow graph), where each node represents a mathematical operation, e.g. add, subtract, matrix multiply, ReLU, etc., and each edge carries input/output tensor data. A node has zero or more input edges and zero or more output edges. The data flow graph is an important design choice because TensorFlow can perform the following code optimizations using its knowledge of the computation graph.

  • Remove dead nodes, i.e. nodes whose results can never reach the graph's outputs; source, sink, control-flow, and stateful nodes are treated as live roots and are never removed.
  • Remove identity nodes. (https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/common_runtime/function.cc)
  • Perform constant folding: check whether a node can be evaluated to a constant, making it eligible for subsequent constant propagation.
  • Perform function inlining.
  • Perform common subexpression elimination (CSE): find common subexpressions within the graph and replace them with a single node to avoid redundant computation (see the illustrative sketch after this list).
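
To make the CSE idea concrete, here is a tiny, language-agnostic sketch in Scala; it is purely illustrative and is not TensorFlow's actual C++ pass in optimizer_cse.cc. Structurally identical subtrees are deduplicated so each one is computed only once.

// A minimal expression tree; case classes give structural equality for free.
sealed trait Expr
case class Const(v: Double) extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class Mul(l: Expr, r: Expr) extends Expr

// Rebuild the tree bottom-up, reusing an existing node whenever an identical one has been seen.
def cse(root: Expr): Expr = {
  val seen = scala.collection.mutable.Map[Expr, Expr]()
  def dedup(e: Expr): Expr = {
    val rebuilt = e match {
      case Add(l, r) => Add(dedup(l), dedup(r))
      case Mul(l, r) => Mul(dedup(l), dedup(r))
      case c: Const  => c
    }
    seen.getOrElseUpdate(rebuilt, rebuilt)
  }
  dedup(root)
}

// (a * b) + (a * b): after CSE, both operands of the Add refer to the same Mul node.
val a = Const(2.0); val b = Const(3.0)
val optimized = cse(Add(Mul(a, b), Mul(a, b)))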

Check out the full implementation in graph_optimizer.cc

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/common_runtime/graph_optimizer.cc

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/common_runtime/constant_folding.cc

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/graph/optimizer_cse.cc

Deep Learning on AWS GPU Instance

Amazon has released a Deep Learning AMI that makes the process of running deep learning on a GPU much easier than before. Before the availability of this AMI, I had to go through the painstaking process of installing all the required CUDA and cuDNN libraries and then spend lots of time debugging just to get everything running. This AMI makes the on-boarding process much, much smoother. Great work indeed!

Follow the steps in the following blog to launch the instance.
https://aws.amazon.com/blogs/machine-learning/get-started-with-deep-learning-using-the-aws-deep-learning-ami/

Go to the AWS Marketplace, search for "Deep Learning AMI (Ubuntu)", and create an instance from the image.


Select the p3.2xlarge instance type. Make sure you check the hourly pricing for the selected instance type; it adds up quickly for GPU instances. A p3.2xlarge instance costs ~$3.06/hour. See the EC2 pricing page.

Note: Remember to terminate your instance after use.


Review and Launch your instance


SSH into your new instance (the -L flag forwards port 8888 so you can reach the Jupyter notebook server locally):

ssh -L localhost:8888:localhost:8888 -i ~/.ssh/my-key-pair.pem ubuntu@xx.xxx.xxx.xxx



Activate TensorFlow + Keras 2 on Python 3 with CUDA 8 using the following command

source activate tensorflow_p36


Run some examples
cd ~/tutorials/TensorFlow/board

python mnist_with_summaries.py

Start TensorBoard using the following command. You can then access the TensorBoard UI at http://your-aws-instance-public-ip:6006 (make sure port 6006 is open in the instance's security group).
tensorboard --logdir=/tmp/tensorflow/mnist

Try out other examples

git clone https://github.com/keras-team/keras.git

cd keras/examples

python cifar10_cnn.py


StringIndexer transform fails when column contains nulls

If you run into a NullPointerException when using StringIndexer in Spark versions < 2.2.0, it means that your input column contains null values. You would have to remove or impute these null values before using StringIndexer; see the ticket below. The good news is that this issue was fixed in Spark 2.2.0.

https://issues.apache.org/jira/browse/SPARK-11569

With the fix, we can specify how StringIndexer should handle null values; three different strategies are available, as listed below.

  • handleInvalid=error: throw an exception (the behavior before the fix)
  • handleInvalid=skip: skip rows with null values, as well as unseen labels
  • handleInvalid=keep: give null values, as well as unseen labels, their own additional index

val codeIndexer = new StringIndexer().setInputCol("originalCode").setOutputCol("originalCodeCategory")
codeIndexer.setHandleInvalid("keep")
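
A minimal sketch of the indexer above in action, assuming Spark 2.2.0+ and an active SparkSession named spark:

import spark.implicits._

// Toy DataFrame with a null in the input column.
val df = Seq((0, "A"), (1, "B"), (2, "A"), (3, null)).toDF("id", "originalCode")

// With handleInvalid = "keep", the null row receives its own extra index instead of
// causing a NullPointerException during transform.
codeIndexer.fit(df).transform(df).show()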


Zillow Price Kaggle Competition Part 2

In my last blog post, I calculated the missing value percentage for every column in the data. The next thing to do is to impute missing values in selected columns before model training.

However, I am going to skip in-depth data cleaning and feature selection, e.g. removing outliers and calculating correlations between features. I will do that in the next blog post 😉

Instead, I am going to use the following features to build a model. These columns have low missing value percentages.

"bedroomcnt"
"bathroomcnt"
"roomcnt"
"taxamount"
"taxvaluedollarcnt"
"lotsizesquarefeet"
"finishedsquarefeet12"
"latitude"
"longitude"

I replaced missing values in bedroomcnt with 3 and in bathroomcnt with 2 (the most frequent values) and dropped any row that has missing values in the other columns.
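
A minimal sketch of that step, assuming the joined training data lives in a DataFrame named df:

// Fill the two imputed columns, then drop rows with nulls in the remaining feature columns.
val cleaned = df
  .na.fill(Map("bedroomcnt" -> 3, "bathroomcnt" -> 2))
  .na.drop(Seq("roomcnt", "taxamount", "taxvaluedollarcnt", "lotsizesquarefeet",
               "finishedsquarefeet12", "latitude", "longitude"))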

In this experiment, I used Spark to train a Gradient Boosted Tree (GBT) regression model. I built a Spark ML pipeline to perform hyperparameter tuning of the GBT: it tests a grid of different hyperparameters and chooses the best combination based on the evaluation metric, RMSE.

val paramGrid = new ParamGridBuilder()
      .addGrid(gbt.maxDepth, Array(2,5))
      .addGrid(gbt.maxIter, Array(50,100))
      .build()
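
For context, here is a hedged sketch of how this grid can be plugged into the rest of the tuning pipeline. It assumes gbt was defined before the grid above (e.g. new GBTRegressor().setLabelCol("logerror").setFeaturesCol("features")) and that trainingData is a DataFrame with an assembled "features" vector column and a "logerror" label.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

// RMSE on the label column drives the choice of hyperparameters.
val evaluator = new RegressionEvaluator()
  .setLabelCol("logerror")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

// Cross-validate the GBT over the parameter grid built above.
val cv = new CrossValidator()
  .setEstimator(gbt)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(trainingData)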



Next I will need to go back to data cleaning and feature selection to choose better/more features to improve the model.

Zillow Prize Kaggle Competition: Exploratory Analysis Part 1

You can download the Zillow Price Kaggle datasets from https://www.kaggle.com/c/zillow-prize-1
For this exploratory study of the properties dataset, I used Spark to calculate the missing value percentage for each column. As you can see below, as many as 20 columns have more than 90% null/empty values, for example storytypeid, basementsqft, and yardbuildingsqft26.

(Column Name, Missing Value Percentage)
(storytypeid,99.94559859467502)
(basementsqft,99.94546460106585)
(yardbuildingsqft26,99.91132972912857)
(fireplaceflag,99.82704774895761)
(architecturalstyletypeid,99.79696618369786)
(typeconstructiontypeid,99.7739862797244)
(finishedsquarefeet13,99.74300025760272)
(buildingclasstypeid,99.57694867743282)
(decktypeid,99.42731131438686)
(finishedsquarefeet6,99.26300165113625)
(poolsizesum,99.06338467186806)
(pooltypeid2,98.92553874642948)
(pooltypeid10,98.76260251767292)
(taxdelinquencyflag,98.10861320969296)
(taxdelinquencyyear,98.10854621288838)
(hashottuborspa,97.68814126410241)
(yardbuildingsqft17,97.30823588368953)
(finishedsquarefeet15,93.60857183916613)
(finishedfloor1squarefeet,93.20930438222749)
(finishedsquarefeet50,93.20930438222749)

(threequarterbathnbr,89.56085939481116)
(fireplacecnt,89.5271600021037)
(pooltypeid7,83.73789912090143)
(poolcnt,82.66343786733091)
(numberofstories,77.15177824593657)
(airconditioningtypeid,72.81541006901676)
(garagecarcnt,70.41196670124819)
(garagetotalsqft,70.41196670124819)
(regionidneighborhood,61.26238059075773)
(heatingorsystemtypeid,39.488452598253325)
(buildingqualitytypeid,35.06374913448503)
(unitcnt,33.75724444822604)
(propertyzoningdesc,33.71908976801352)
(lotsizesquarefeet,9.248875374888994)
(finishedsquarefeet12,9.24666448033761)
(calculatedbathnbr,4.31834603648579)
(fullbathcnt,4.31834603648579)
(censustractandblock,2.5166009707167016)
(landtaxvaluedollarcnt,2.268947282559358)
(regionidcity,2.10520709214774)
(yearbuilt,2.0074922526570096)
(calculatedfinishedsquarefeet,1.8613387234495853)
(structuretaxvaluedollarcnt,1.8418091549123563)
(taxvaluedollarcnt,1.4253570175970458)
(taxamount,1.0468250716782062)
(regionidzip,0.46830766406596236)
(propertycountylandusecode,0.4112598849597868)
(roomcnt,0.3843941663202374)
(bathroomcnt,0.38395868709041925)
(bedroomcnt,0.3835567062628948)
(assessmentyear,0.38318822383766404)
(fips,0.38312122703307666)
(latitude,0.38312122703307666)
(longitude,0.38312122703307666)
(propertylandusetypeid,0.38312122703307666)
(rawcensustractandblock,0.38312122703307666)
(regionidcounty,0.38312122703307666)
(parcelid,0.0)
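
The percentages above were computed with roughly the following approach (a sketch, assuming the properties data is loaded into a DataFrame named df):

import org.apache.spark.sql.functions.col

val total = df.count().toDouble

// For each column, count the rows where the value is null and convert to a percentage.
// (This runs one job per column, which is fine for a one-off exploration.)
val missingPct = df.columns.map { c =>
  (c, df.filter(col(c).isNull).count() / total * 100.0)
}.sortBy(-_._2)

missingPct.foreach(println)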


Now I have identified the good columns (less than 10% missing values) and the bad columns. The next step is to determine whether we can apply some sort of missing value imputation to any of the problematic columns.

A few missing value imputation techniques are available, such as using the mean, median, or most frequent value as the replacement.
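
Spark ML ships an Imputer (Spark 2.2+) that covers the mean and median strategies; here is a hedged sketch with hypothetical column choices:

import org.apache.spark.ml.feature.Imputer

// Replace nulls in the selected numeric columns with each column's median.
val imputer = new Imputer()
  .setInputCols(Array("lotsizesquarefeet", "finishedsquarefeet12"))
  .setOutputCols(Array("lotsizesquarefeet_imp", "finishedsquarefeet12_imp"))
  .setStrategy("median")

val imputed = imputer.fit(df).transform(df)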

InstaCart: Product Recommender

InstaCart has recently open-sourced anonymized data on customer orders for its Kaggle competition. You can find more info at the link below.

https://www.kaggle.com/c/instacart-market-basket-analysis

Previously, I did some data exploration to discover some interesting insights. See my previous blog post.

Next, I would like to use their anonymized data to build a product recommender system. There are many approaches/strategies for product recommendations, e.g. most popular items, also bought/viewed, featured items, etc. I am going to explore the most-popular-items approach and the collaborative filtering (also bought) approach.

We can identify the top 5 popular items for each department and use them for the most-popular-items recommendation, as sketched below. For example, the top 5 items for the frozen department are Blueberries, Organic Broccoli Florets, Organic Whole Strawberries, Pineapple Chunks, and Frozen Organic Wild Blueberries.
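
A sketch of how the top 5 items per department can be computed, assuming the order products have been joined with the products and departments tables into a DataFrame named orderedProducts with department and product_name columns:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Count how often each product was ordered within its department.
val counts = orderedProducts.groupBy("department", "product_name").count()

// Rank products inside each department by order count and keep the top 5.
val byDept = Window.partitionBy("department").orderBy(desc("count"))
val top5 = counts
  .withColumn("rank", row_number().over(byDept))
  .filter(col("rank") <= 5)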


Next, I used Spark's ALS algorithm to build a collaborative-filtering-based recommender system.

Some stats about the data

Total user-product pair (rating): 1,384,617

Total users: 131,209

Total products: 39,123

I randomly split the data into a train set (0.8) and a test set (0.2). This resulted in:
Number of train elements: 1,107,353
Number of test elements: 277,264

Here are the parameters I used in ALS: rank = 10, lambda = 1, number of iterations = 10, 50, 60, 80. A training sketch follows the results below.

  • Number of iterations = 10,    RMSE = 0.9999895
  • Number of iterations = 50,    RMSE = 0.9999828
  • Number of iterations = 60,    RMSE = 0.9999875
  • Number of iterations = 80,    RMSE = 0.9999933
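
Here is a sketch of the training and evaluation loop, assuming the user-product counts have been mapped into an RDD[Rating] named ratings:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// 80/20 random split, matching the numbers above.
val Array(trainSet, testSet) = ratings.randomSplit(Array(0.8, 0.2), seed = 42L)

// rank = 10, iterations = 80, lambda = 1.0
val model = ALS.train(trainSet, 10, 80, 1.0)

// RMSE on the held-out user-product pairs.
val predictions = model
  .predict(testSet.map(r => (r.user, r.product)))
  .map(p => ((p.user, p.product), p.rating))
val actuals = testSet.map(r => ((r.user, r.product), r.rating))
val rmse = math.sqrt(
  actuals.join(predictions).values.map { case (a, p) => (a - p) * (a - p) }.mean())

// Top 10 product recommendations for a given user.
val recs = model.recommendProducts(124383, 10)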

We can also try out different ranks in grid search.

Here are some recommendations suggested by the ALS collaborative filtering algorithm (number of iterations = 80, rank = 10).

Given user 124383's transaction history:

+-------+----------+-----+----------+---------------------------+--------+-------------+
|user_id|product_id|count|product_id|product_name               |aisle_id|department_id|
+-------+----------+-----+----------+---------------------------+--------+-------------+
|124383 |49478     |1    |49478     |Frozen Organic Strawberries|24      |4            |
|124383 |21903     |1    |21903     |Organic Baby Spinach       |123     |4            |
|124383 |19508     |1    |19508     |Corn Tortillas             |128     |3            |
|124383 |20114     |1    |20114     |Jalapeno Peppers           |83      |4            |
|124383 |44142     |1    |44142     |Red Onion                  |83      |4            |
|124383 |20345     |1    |20345     |Thin Crust Pepperoni Pizza |79      |1            |
|124383 |27966     |1    |27966     |Organic Raspberries        |123     |4            |
+-------+----------+-----+----------+---------------------------+--------+-------------+

Here are the recommendations
+----------+--------------------------------------------------------------+--------+-------------+
|product_id|product_name                                                  |aisle_id|department_id|
+----------+--------------------------------------------------------------+--------+-------------+
|28717     |Sport Deluxe Adjustable Black Ankle Stabilizer                |133     |11           |
|15372     |Meditating Cedarwood Mineral Bath                             |25      |11           |
|18962     |Arroz Calasparra Paella Rice                                  |63      |9            |
|2528      |Cluckin' Good Stew                                            |40      |8            |
|21156     |Dreamy Cold Brew Concentrate                                  |90      |7            |
|12841     |King Crab Legs                                                |39      |12           |
|24862     |Old Indian Wild Cherry Bark Syrup                             |47      |11           |
|37535     |Voluminous Extra-Volume Collagen Mascara - Blackest Black 680 |132     |11           |
|30847     |Wild Oregano Oil                                              |47      |11           |
+----------+--------------------------------------------------------------+--------+-------------+

StackOverflowError when running ALS in Spark's MLlib

If you ever encounter a StackOverflowError when running ALS in Spark's MLlib, the solution is to turn on checkpointing as follows:

sc.setCheckpointDir("your_checkpointing_dir/")
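
The root cause is that each ALS iteration extends the RDD lineage, and a very deep lineage eventually blows the stack during (de)serialization; checkpointing truncates it periodically. A minimal sketch (the ratings RDD and the parameter values are assumptions):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

sc.setCheckpointDir("your_checkpointing_dir/")

// Checkpoint every 10 iterations to keep the lineage short; ratings: RDD[Rating] is assumed.
val model = new ALS()
  .setRank(10)
  .setIterations(50)
  .setLambda(0.01)
  .setCheckpointInterval(10)
  .run(ratings)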

Check out the Jira ticket regarding the issue and pull request below

https://issues.apache.org/jira/browse/SPARK-1006

https://github.com/apache/spark/pull/5076

Spark Logistic Regression

Let's dive into the implementation of logistic regression in Spark. You can find the implementation in the LogisticRegression.scala class. I am looking at the low-level mllib library instead of the newer ml API.

/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

It defines LogisticRegressionModel, which extends the abstract class GeneralizedLinearModel (/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala) and mixes in the ClassificationModel, Serializable, Saveable, and PMMLExportable traits.

class LogisticRegressionModel @Since("1.3.0") (
    @Since("1.0.0") override val weights: Vector,
    @Since("1.0.0") override val intercept: Double,
    @Since("1.3.0") val numFeatures: Int,
    @Since("1.3.0") val numClasses: Int)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable
  with Saveable with PMMLExportable {

ClassificationModel declares several predict methods, most of them without a concrete implementation:

trait ClassificationModel extends Serializable {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: RDD[Vector]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted category from the trained model
   */
  @Since("1.0.0")
  def predict(testData: Vector): Double

  /**
   * Predict values for examples stored in a JavaRDD.
   * @param testData JavaRDD representing data points to be predicted
   * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
    predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}

In LogisticRegressionModel's predictPoint method, if the number of classes is 2 we have binary classification; otherwise it performs multinomial logistic regression.

 override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double) = {
    require(dataMatrix.size == numFeatures)

    // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
    if (numClasses == 2) {
      val margin = dot(weightMatrix, dataMatrix) + intercept
      val score = 1.0 / (1.0 + math.exp(-margin))
      threshold match {
        case Some(t) => if (score > t) 1.0 else 0.0
        case None => score
      }
    } else {
      /**
       * Compute and find the one with maximum margins. If the maxMargin is negative, then the
       * prediction result will be the first class.
       *
       * PS, if you want to compute the probabilities for each outcome instead of the outcome
       * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
       * is positive to prevent overflow.
       */
      var bestClass = 0
      var maxMargin = 0.0
      val withBias = dataMatrix.size + 1 == dataWithBiasSize
      (0 until numClasses - 1).foreach { i =>
        var margin = 0.0
        dataMatrix.foreachActive { (index, value) =>
          if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
        }
        // Intercept is required to be added into margin.
        if (withBias) {
          margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
        }
        if (margin > maxMargin) {
          maxMargin = margin
          bestClass = i + 1
        }
      }
      bestClass.toDouble
    }
  }

In binary classification, it uses the math below to calculate the score value:

val margin = dot(weightMatrix, dataMatrix) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))

If a threshold is specified, it checks the calculated score against the threshold and returns class 1 if the score is greater than the threshold; if no threshold is set, it returns the raw score. Both dataMatrix and weightMatrix are of Vector type.
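
A quick numeric check of the binary path, with hypothetical weights, features, and intercept:

val weights = Array(0.5, -0.25)
val features = Array(2.0, 4.0)
val intercept = 0.1

val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept  // 0.1
val score = 1.0 / (1.0 + math.exp(-margin))                                      // ~0.525
val prediction = if (score > 0.5) 1.0 else 0.0                                   // 1.0 with the default 0.5 threshold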

For binary logistic regression, it uses stochastic gradient descent with L2 regularization by default to train the model. See the implementation of LogisticRegressionWithSGD below; it extends the abstract class GeneralizedLinearAlgorithm. GeneralizedLinearAlgorithm declares an unimplemented createModel method, as shown below.

protected def createModel(weights: Vector, intercept: Double): M

The abstract class provides the base implementation of the run(input: RDD[LabeledPoint], initialWeights: Vector): M method, which calls the createModel method at the end. Any extending class, e.g. LogisticRegressionWithSGD, provides the createModel implementation, which returns a trained classification model.

The LogisticRegressionWithSGD object contains the following train methods to build the model. Each train method takes an RDD of LabeledPoint; the LabeledPoint case class contains a double value (the class label) and a Vector of features.

case class LabeledPoint @Since("1.0.0") (
    @Since("0.8.0") label: Double,
    @Since("1.0.0") features: Vector) {
  override def toString: String = {
    s"($label,$features)"
  }
}
@Since("0.8.0")
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  @Since("0.8.0")
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}

/**
 * Top-level methods for calling Logistic Regression using Stochastic Gradient Descent.
 * NOTE: Labels used in Logistic Regression should be {0, 1}
 */
@Since("0.8.0")
object LogisticRegressionWithSGD {
  // NOTE(shivaram): We use multiple train methods instead of default arguments to support
  // Java programs.

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input, initialWeights)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. We use the entire data
   * set to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param stepSize Step size to be used for each iteration of Gradient Descent.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double): LogisticRegressionModel = {
    train(input, numIterations, stepSize, 1.0)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
   * to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int): LogisticRegressionModel = {
    train(input, numIterations, 1.0, 1.0)
  }
}
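
To close the loop, here is a small usage sketch of the train entry points above (toy data; labels must be in {0, 1}):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Tiny toy dataset of LabeledPoint(label, features).
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(2.0, 1.5)),
  LabeledPoint(1.0, Vectors.dense(1.2, 0.8)),
  LabeledPoint(0.0, Vectors.dense(-1.0, -0.5)),
  LabeledPoint(0.0, Vectors.dense(-0.7, -1.3))
))

// 100 iterations of gradient descent with the default step size (1.0) and full-batch gradients.
val model = LogisticRegressionWithSGD.train(training, 100)

model.predict(Vectors.dense(1.0, 1.0))  // returns 0.0 or 1.0 (threshold defaults to 0.5)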

Logistic Regression maximum likelihood estimation

Here are some great materials on logistic regression, with a focus on the maximum likelihood estimation of the model:

Slides from Learning From Data

An Introduction to Logistic Regression: From Basic Concepts to Interpretation with Particular Attention to Nursing Domain

Algorithms for maximum likelihood logistic regression
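
For quick reference, the objective these materials derive is the log-likelihood of the labels y_i in {0, 1} under the sigmoid model, maximized over the weights w (equivalently, the log loss is minimized):

\ell(w) = \sum_{i=1}^{n} \Big[ y_i \log \sigma(w^\top x_i) + (1 - y_i) \log\big(1 - \sigma(w^\top x_i)\big) \Big],
\qquad \sigma(z) = \frac{1}{1 + e^{-z}}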