Instacart: Product Recommender

Instacart recently open sourced anonymized data on customer orders for its Kaggle competition. You can find more info at the link below.

https://www.kaggle.com/c/instacart-market-basket-analysis

I previously did some data exploration to uncover interesting insights; see my earlier blog post.

Next, I would like to use their anonymized data to build a product recommender system. There are many approaches/strategies for product recommendations, e.g. most popular items, also bought/viewed, featured items, etc. I am going to explore the most-popular-items approach and the collaborative filtering (also bought) approach.

We can identify the top 5 most popular items in each department and use them for most-popular-items recommendations. For example, the top 5 items in the frozen department are Blueberries, Organic Broccoli Florets, Organic Whole Strawberries, Pineapple Chunks, and Frozen Organic Wild Blueberries.

[Screenshot: top 5 products per department]
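
A minimal sketch of how these top-5 lists can be computed with a window function. I assume the order_products, products, and departments data have already been joined into a table named order_products_joined with department and product_name columns (the table and column names here are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, desc, row_number}

val spark = SparkSession.builder().appName("TopItemsPerDepartment").getOrCreate()
import spark.implicits._

// Assumes order_products has already been joined with products and departments.
val orderProductsDF = spark.table("order_products_joined")

// Count how many times each product was ordered within its department.
val productCounts = orderProductsDF
  .groupBy("department", "product_name")
  .agg(count("*").as("num_orders"))

// Rank products within each department by order count and keep the top 5.
val byDepartment = Window.partitionBy("department").orderBy(desc("num_orders"))

val top5PerDepartment = productCounts
  .withColumn("rank", row_number().over(byDepartment))
  .filter($"rank" <= 5)

top5PerDepartment.show(50, truncate = false)

Using row_number keeps exactly five rows per department even when order counts tie.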

Next, I used Spark's ALS algorithm to build a collaborative-filtering-based recommender system.

Some stats about the data:

  • Total user-product pairs (ratings): 1,384,617
  • Total users: 131,209
  • Total products: 39,123

I randomly split the data into a train set (80%) and a test set (20%), which resulted in:

  • Number of train elements: 1,107,353
  • Number of test elements: 277,264

Here are the parameters I used in ALS: rank = 10, lambda = 1, and number of iterations = 10, 50, 60, and 80.

  • Number of iterations = 10,    RMSE = 0.9999895
  • Number of iterations = 50,    RMSE = 0.9999828
  • Number of iterations = 60,    RMSE = 0.9999875
  • Number of iterations = 80,    RMSE = 0.9999933

We could also try out different ranks via grid search.
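
For reference, here is a minimal sketch of the training and evaluation setup. I assume userProductCounts is an RDD of (userId, productId, count) triples built from the order data; that variable name and the use of the purchase count as an implicit rating are my assumptions:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Each (user, product) pair becomes a Rating whose value is the purchase count.
val ratings = userProductCounts.map { case (userId, productId, cnt) =>
  Rating(userId, productId, cnt.toDouble)
}

// 80/20 random split into train and test sets.
val Array(train, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42L)

val rank = 10
val numIterations = 80
val lambda = 1.0
val model = ALS.train(train, rank, numIterations, lambda)

// Evaluate with RMSE on the held-out pairs.
val testPairs = test.map(r => (r.user, r.product))
val predictions = model.predict(testPairs).map(r => ((r.user, r.product), r.rating))
val ratesAndPreds = test.map(r => ((r.user, r.product), r.rating)).join(predictions)
val rmse = math.sqrt(ratesAndPreds.map { case (_, (actual, predicted)) =>
  val err = actual - predicted
  err * err
}.mean())
println(s"RMSE = $rmse")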

Here are some recommendations suggested by the ALS collaborative filtering algorithm (number of iterations = 80, rank = 10).

Given user 124383's transaction history:

+-------+----------+-----+---------------------------+--------+-------------+
|user_id|product_id|count|product_name               |aisle_id|department_id|
+-------+----------+-----+---------------------------+--------+-------------+
|124383 |49478     |1    |Frozen Organic Strawberries|24      |4            |
|124383 |21903     |1    |Organic Baby Spinach       |123     |4            |
|124383 |19508     |1    |Corn Tortillas             |128     |3            |
|124383 |20114     |1    |Jalapeno Peppers           |83      |4            |
|124383 |44142     |1    |Red Onion                  |83      |4            |
|124383 |20345     |1    |Thin Crust Pepperoni Pizza |79      |1            |
|124383 |27966     |1    |Organic Raspberries        |123     |4            |
+-------+----------+-----+---------------------------+--------+-------------+

Here are the recommendations
+----------+--------------------------------------------------------------+--------+-------------+
|product_id|product_name                                                  |aisle_id|department_id|
+----------+--------------------------------------------------------------+--------+-------------+
|28717     |Sport Deluxe Adjustable Black Ankle Stabilizer                |133     |11           |
|15372     |Meditating Cedarwood Mineral Bath                             |25      |11           |
|18962     |Arroz Calasparra Paella Rice                                  |63      |9            |
|2528      |Cluckin’ Good Stew                                            |40      |8            |
|21156     |Dreamy Cold Brew Concentrate                                  |90      |7            |
|12841     |King Crab Legs                                                |39      |12           |
|24862     |Old Indian Wild Cherry Bark Syrup                             |47      |11           |
|37535     |Voluminous Extra-Volume Collagen Mascara – Blackest Black 680 |132     |11           |
|30847     |Wild Oregano Oil                                              |47      |11           |
+----------+--------------------------------------------------------------+--------+-------------+
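
These per-user recommendations come straight from the trained matrix factorization model. A minimal sketch, reusing the model from the training sketch above (the join back to product names, aisles, and departments is omitted):

// Top 10 product recommendations for a single user.
val userId = 124383
val recommendations = model.recommendProducts(userId, 10)

recommendations.foreach { r =>
  println(s"product_id=${r.product}, predicted score=${r.rating}")
}
// To display product_name, aisle_id and department_id, join the recommended
// product ids back against the products data.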

Instacart Data Insight

Instacart recently organized a Kaggle competition, publishing anonymized data on customer orders over time with the goal of predicting which previously purchased products will be in a user's next order. You can check out the link below.

https://www.kaggle.com/c/instacart-market-basket-analysis

First, I used Spark to analyze the data to uncover hidden insights.

We were given the departments data

[Screenshot: departments data]

Products data

[Screenshot: products data]

Orders data

[Screenshot: orders data]

Let's find out which is the busiest day of the week. As it turns out, day 0 is the busiest day with 600,905 orders, followed by day 1 with 587,478 orders. I would assume day 0 is Sunday and day 1 is Monday.

[Screenshot: orders by day of week]
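
A minimal sketch of this kind of aggregation, assuming the orders CSV has been loaded into a DataFrame named ordersDF (order_dow holds the day of week in the dataset; the same pattern applies to order_hour_of_day for the hourly breakdown):

import org.apache.spark.sql.functions.{count, desc}

// Orders per day of week (0-6), busiest first.
ordersDF
  .groupBy("order_dow")
  .agg(count("*").as("num_orders"))
  .orderBy(desc("num_orders"))
  .show()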

Next, let's figure out the busiest hour of the day. It turns out 10 am is the busiest hour with 288,418 orders, followed by 11 am with 284,728 orders.

As you can see, Instacart customers like to shop from 11 am to 5 pm. It would be interesting to see the day of week + hour of day breakdown too.

[Screenshot: orders by hour of day]

Here is the breakdown of popular hours for each day. 10 am dominates the top spot.

[Screenshot: most popular hour for each day of week]

Next, let's figure out the top ten most popular items among Instacart customers. Surprisingly, banana is the most popular item.

[Screenshot: top ten most popular products]

Let's find the top item for each department. We can see that Blueberries is the most popular item in the frozen department and Extra Virgin Olive Oil is the most popular item in the pantry department. Some unexpected surprises are dried mango, cotton swabs, and Honey Nut Cheerios.

[Screenshot: top product per department]

We are also interested in the reorder interval, i.e. how many days pass between a prior order and the next one.

[Screenshot: orders by days since prior order]

As we discovered, 30 days is the most frequent reorder interval; it looks like many customers reorder once a month. On the other hand, 320,608 orders were placed 7 days after the prior order. We can conclude that the majority of customers reorder either one month or one week after their prior order.

Stay tuned for the next blog post on my findings at the individual user level.

Running Spark Job in AWS Data Pipeline

If you want to run a Spark job in AWS Data Pipeline, add an EmrActivity and use command-runner.jar to submit the Spark job.

In the Step field of the EmrActivity node, enter the command as follows:

command-runner.jar,spark-submit,--master,yarn-cluster,--deploy-mode,cluster,--class,com.yourcompany.yourpackage.YourClass,s3://PATH_TO_YOUR_JAR,YOUR_PROGRAM_ARGUMENT_1,YOUR_PROGRAM_ARGUMENT_2,YOUR_PROGRAM_ARGUMENT_3

Some useful resources
http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-submit-step.html
http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html

BLAS in MLlib

I was looking into the BLAS.scala routines for MLlib's vectors and matrices. It looks like Spark uses F2jBLAS (the pure-Java netlib implementation) for level-1 routines.

https://github.com/apache/spark/blob/master/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala

// For level-1 routines, we use Java implementation.
private def f2jBLAS: NetlibBLAS = {
  if (_f2jBLAS == null) {
    _f2jBLAS = new F2jBLAS
  }
  _f2jBLAS
}

Here are some great resources about different BLAS implementations
http://www.spark.tc/blas-libraries-in-mllib/

https://blog.cloudera.com/blog/2017/02/accelerating-apache-spark-mllib-with-intel-math-kernel-library-intel-mkl/

BLAS routines (Level 1,2,3)
http://www.netlib.org/blas/#_blas_routines

StackOverflowError when running ALS in Spark's MLlib

If you ever encounter a StackOverflowError when running ALS in Spark's MLlib, the solution is to turn on checkpointing as follows:

sc.setCheckpointDir("your_checkpointing_dir/")

Check out the JIRA ticket and pull request regarding this issue below:

https://issues.apache.org/jira/browse/SPARK-1006

https://github.com/apache/spark/pull/5076
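
For completeness, a minimal sketch of what this looks like, assuming an existing SparkContext sc (the directory is illustrative and should live on HDFS or S3 in a cluster). If you use the newer DataFrame-based ALS, you can also tune how often it checkpoints via setCheckpointInterval:

import org.apache.spark.ml.recommendation.ALS

// Set a checkpoint directory before training ALS.
sc.setCheckpointDir("hdfs:///tmp/als-checkpoints/")

// With the DataFrame-based API, checkpointing frequency is configurable
// (intermediate RDDs are checkpointed every 10 iterations by default).
val als = new ALS()
  .setUserCol("user_id")      // column names are illustrative
  .setItemCol("product_id")
  .setRatingCol("count")
  .setCheckpointInterval(5)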

Spark Dedup before Join

In Spark, as with any SQL left outer join, the result can contain more rows than the left table if the right table has duplicate join keys.

You could first drop the duplicates on the right table before performing the join, as follows:

myDF.dropDuplicates("myJoinkey")

Alternatively, you could do a groupBy and aggregate, as sketched below.
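
Here is a minimal sketch of the groupBy approach, with hypothetical leftDF/rightDF DataFrames and illustrative column names:

import org.apache.spark.sql.functions.first

// Keep exactly one row per join key on the right-hand side before joining.
val dedupedRight = rightDF
  .groupBy("myJoinkey")
  .agg(first("someValue").as("someValue"))

val joined = leftDF.join(dedupedRight, Seq("myJoinkey"), "left_outer")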

Take a look at this dedup example

https://github.com/spirom/LearningSpark/blob/master/src/main/scala/dataframe/DropDuplicates.scala

Tuning Spark Jobs

Here are some useful resources on how to tune a Spark job in terms of the number of executors, executor memory, and the number of cores; a small configuration example follows the links.

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

http://etlcode.com/index.php/blog/info/Bigdata/Apache-Spark-tuning-spark-jobs-Optimal-setting-for-executor-core-and-memory

https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory

https://aranair.github.io/posts/2017/03/10/tuning-my-apache-spark-cluster-on-aws-emr/
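
As a concrete starting point, the knobs those posts discuss map to a handful of Spark settings (or the equivalent spark-submit flags --num-executors, --executor-cores and --executor-memory). The values below are purely illustrative, not a recommendation for any particular cluster:

import org.apache.spark.sql.SparkSession

// Illustrative resource settings; the right values depend on your node sizes,
// YARN memory overhead, and workload, as the posts above explain.
val spark = SparkSession.builder()
  .appName("TunedJob")
  .config("spark.executor.instances", "10")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "18g")
  .config("spark.driver.memory", "4g")
  .getOrCreate()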

Spark Logistic Regression

Let's dive into the implementation of logistic regression in Spark. You can find it in LogisticRegression.scala. I am looking at the lower-level mllib library instead of the newer ml API.

/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

It defines LogisticRegressionModel, which extends the abstract class GeneralizedLinearModel (/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala) and mixes in the ClassificationModel, Serializable, Saveable, and PMMLExportable traits.

class LogisticRegressionModel @Since("1.3.0") (
    @Since("1.0.0") override val weights: Vector,
    @Since("1.0.0") override val intercept: Double,
    @Since("1.3.0") val numFeatures: Int,
    @Since("1.3.0") val numClasses: Int)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable
  with Saveable with PMMLExportable {

ClassificationModel declares several predict methods; the RDD and single-Vector variants are left abstract, while the JavaRDD variant delegates to the RDD one:

trait ClassificationModel extends Serializable {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: RDD[Vector]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted category from the trained model
   */
  @Since("1.0.0")
  def predict(testData: Vector): Double

  /**
   * Predict values for examples stored in a JavaRDD.
   * @param testData JavaRDD representing data points to be predicted
   * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
    predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}

In LogisticRegressionModel's predictPoint method, if the number of classes is 2, it performs binary classification; otherwise it performs multinomial logistic regression.

 override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double) = {
    require(dataMatrix.size == numFeatures)

    // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
    if (numClasses == 2) {
      val margin = dot(weightMatrix, dataMatrix) + intercept
      val score = 1.0 / (1.0 + math.exp(-margin))
      threshold match {
        case Some(t) => if (score > t) 1.0 else 0.0
        case None => score
      }
    } else {
      /**
       * Compute and find the one with maximum margins. If the maxMargin is negative, then the
       * prediction result will be the first class.
       *
       * PS, if you want to compute the probabilities for each outcome instead of the outcome
       * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
       * is positive to prevent overflow.
       */
      var bestClass = 0
      var maxMargin = 0.0
      val withBias = dataMatrix.size + 1 == dataWithBiasSize
      (0 until numClasses - 1).foreach { i =>
        var margin = 0.0
        dataMatrix.foreachActive { (index, value) =>
          if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
        }
        // Intercept is required to be added into margin.
        if (withBias) {
          margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
        }
        if (margin > maxMargin) {
          maxMargin = margin
          bestClass = i + 1
        }
      }
      bestClass.toDouble
    }
  }

In binary classification, it uses the following formula to calculate the score:

val margin = dot(weightMatrix, dataMatrix) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))

If a threshold is specified, it compares the calculated score against the threshold and returns class 1.0 if the score is greater than the threshold (0.0 otherwise); if no threshold is set, it returns the raw score. Both dataMatrix and weightMatrix are of type Vector.

For binary logistic regression, it uses stochastic gradient descent with L2 regularization by default to train the model. See the implementation of LogisticRegressionWithSGD below. It extends the abstract class GeneralizedLinearAlgorithm, which declares an abstract createModel method as shown below.

protected def createModel(weights: Vector, intercept: Double): M

The abstract class provides the base implementation of the run(input: RDD[LabeledPoint], initialWeights: Vector): M method, which calls createModel at the end.

Any extending class, e.g. LogisticRegressionWithSGD, provides the createModel implementation, which returns a trained classification model.

LogisticRegressionWithSGD contains the following train methods to build the model.
Each train method takes an RDD of LabeledPoint; the LabeledPoint case class contains a Double (the class label) and a Vector of features.

case class LabeledPoint @Since("1.0.0") (
    @Since("0.8.0") label: Double,
    @Since("1.0.0") features: Vector) {
  override def toString: String = {
    s"($label,$features)"
  }
}
@Since("0.8.0")
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  @Since("0.8.0")
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}

/**
 * Top-level methods for calling Logistic Regression using Stochastic Gradient Descent.
 * NOTE: Labels used in Logistic Regression should be {0, 1}
 */
@Since("0.8.0")
object LogisticRegressionWithSGD {
  // NOTE(shivaram): We use multiple train methods instead of default arguments to support
  // Java programs.

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input, initialWeights)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. We use the entire data
   * set to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param stepSize Step size to be used for each iteration of Gradient Descent.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double): LogisticRegressionModel = {
    train(input, numIterations, stepSize, 1.0)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
   * to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int): LogisticRegressionModel = {
    train(input, numIterations, 1.0, 1.0)
  }
}
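
Putting it together, here is a minimal usage sketch of this RDD-based API with made-up feature vectors, assuming an existing SparkContext sc (note that LogisticRegressionWithSGD has since been deprecated in newer Spark versions in favor of the ml package):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Build an RDD[LabeledPoint]; labels must be in {0, 1}.
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(2.0, 1.5, 0.3)),
  LabeledPoint(0.0, Vectors.dense(0.1, 0.4, 2.2)),
  LabeledPoint(1.0, Vectors.dense(1.8, 1.1, 0.5)),
  LabeledPoint(0.0, Vectors.dense(0.2, 0.3, 1.9))
))

// Train with 100 iterations of SGD (default step size 1.0, full batch).
val model = LogisticRegressionWithSGD.train(trainingData, 100)

// Predict the class of a new point.
val prediction = model.predict(Vectors.dense(1.5, 1.0, 0.4))
println(s"predicted class = $prediction")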

Zero copy technique in Hadoop and Spark

What exactly are zero-copy techniques, and how can they be employed to achieve better performance in distributed systems?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero-copy techniques, such as memory-mapped files (mmap) and sendfile(), to improve performance.

Zero-copy techniques eliminate unnecessary data copies and context switches between application (user) space and kernel space. Please refer to the following excellent article for an in-depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data over the network to a client, it reads the data from disk into a kernel buffer, copies it into user-space memory, and then copies it again from user space into the kernel buffer associated with the network stack before handing it to the network interface card. See Figure 1 (taken from the article above).

[Figure 1: traditional data path, with copies between kernel and user space]

A popular zero-copy technique is sendfile(), exposed in Java as FileChannel.transferTo(). Please see the following figure (taken from the same article).

[Figure 2: zero-copy data path using sendfile()/transferTo]
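
To make the idea concrete, here is a minimal Scala sketch of serving a file with FileChannel.transferTo, which typically maps to sendfile() on Linux so the bytes never pass through user-space buffers (error handling kept to a minimum):

import java.io.FileInputStream
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

def sendFileZeroCopy(path: String, host: String, port: Int): Unit = {
  val socket = SocketChannel.open(new InetSocketAddress(host, port))
  val fileChannel = new FileInputStream(path).getChannel
  try {
    var position = 0L
    val size = fileChannel.size()
    while (position < size) {
      // transferTo may send fewer bytes than requested, so loop until done.
      position += fileChannel.transferTo(position, size - position, socket)
    }
  } finally {
    fileChannel.close()
    socket.close()
  }
}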

As you can see, Hadoop adopted the zero-copy transferTo approach as far back as version 0.18.

https://issues.apache.org/jira/browse/HADOOP-3164

It also uses the same technique when serving shuffle files; see FadvisedFileRegion:

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also a plan to use the same technique for sending shuffle files, targeted for the upcoming version 1.2.0.

SPARK-2468: introduce the same transferTo technique for sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468

RDD in Spark

I have recently dug into Spark, trying to understand its internals. I found some excellent papers, especially on the RDD (Resilient Distributed Dataset) abstraction. For example:

https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing)

http://www.cs.berkeley.edu/~matei/talks/2012/nsdi_rdds.pdf (Presentation)

Those of us who are more familiar with Hadoop's disk-based distributed system might also want to read up on distributed shared memory (DSM) to gain a basic understanding of the related concepts.

http://www.cdf.toronto.edu/~csc469h/fall/handouts/nitzberg91.pdf (Distributed Shared Memory: A Survey of Issues and Algorithms). This is a good overview of DSM; it covers memory coherence and coherence protocols.

http://hal.archives-ouvertes.fr/docs/00/07/41/93/PDF/RR-2481.pdf (A Recoverable Distributed Shared Memory Integrating Coherence and Recoverability)

As pointed out in the first paper above, an RDD differs from DSM in several aspects; see the table below. An RDD is a restricted form of distributed shared memory: an immutable, partitioned collection of records. It maintains lineage information (the series of transformations that produced it) for efficient fault recovery.
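
To make the lineage idea concrete, here is a small sketch (the input path is illustrative). Each transformation returns a new immutable RDD and records how it was derived, and toDebugString prints that recorded lineage:

// Classic log-mining example: transformations are lazy and only build lineage.
val lines = sc.textFile("hdfs:///logs/app.log")
val errors = lines.filter(_.contains("ERROR"))
val counts = errors.map(line => (line.split(" ")(0), 1)).reduceByKey(_ + _)

// Print the chain of transformations (the lineage graph).
println(counts.toDebugString)

// An action triggers evaluation; if an executor is lost, only the affected
// partitions are recomputed from the lineage instead of restoring a checkpoint.
counts.take(5).foreach(println)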

[Table 1 from the first paper above: comparison of RDDs and distributed shared memory]

To be continued…..