Spark Dedup before Join

In Spark, as with any SQL left outer join, it will produce more rows than the total number of rows in the left table if the right table has duplicates.

You could first drop the duplicates on the right table before performing join as follows.

myDF.dropDuplicates(“myJoinkey”)

Or you could also do a groupBy and aggregate

Take a look at this dedup example

https://github.com/spirom/LearningSpark/blob/master/src/main/scala/dataframe/DropDuplicates.scala

Tuning Spark Jobs

Here are some useful resources on how to tune Spark job in terms of number of executors, executor memory and number of cores.

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

http://etlcode.com/index.php/blog/info/Bigdata/Apache-Spark-tuning-spark-jobs-Optimal-setting-for-executor-core-and-memory

https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory

https://aranair.github.io/posts/2017/03/10/tuning-my-apache-spark-cluster-on-aws-emr/

Analyzing Bike Share Data

In this series, I am going to use Spark to analyze the Bay Area’s Bike Share Data. You can download the dataset from http://www.bayareabikeshare.com/open-data

First let’s find out the top popular start terminals

trips.groupBy("Start Terminal", "Start Station").count().sort(desc("count")).show(false)

screen-shot-2016-12-04-at-9-47-09-am

San Francisco Caltrain (Townsend at 4th) and San Francisco Caltrain 2 (330 Townsend) are the two most popular bike stations. It shows that many Caltrain commuters are using these bikes to travel to their workplaces.

Lets figure out the day of week distribution of trips. As seen below, Thursday, Tuesday are the top two busiest days. It looks like people are most likely to show up at work on Thursday and Tuesday 🙂

On the other hand, Monday has the lowest number of trips among all weekdays. So if you want to have a good meeting attendance, you would probably schedule it on Tuesday or Thursday and try to avoid Monday 🙂

sqlContext.sql("select getDayAsString(day_of_week) as day, count(1) as count from (select `Start Terminal`, `Start Station`, getDayOfWeek(`Start Date`) as day_of_week from trips) as A group by day_of_week order by count desc")

screen-shot-2016-12-04-at-10-04-48-am

Next, lets take a look at the time of day distribution of the trips.

5 PM, 8 AM, 4 PM, 9 AM, 6 PM, 12 AM are the top 6 busiest hours. As expected, the bike usage peaks during morning and evening rush hours as people get to/off work. One interesting observation is the number of bike trips is also high during midnight, ranked sixth in the list.

screen-shot-2016-12-04-at-10-36-47-am

Hmm…I wonder which are the popular stations during morning rush hours from 8 am to 9 am.  As it turns out, San Francisco Caltrain, Temporary Transbay Terminal,  and San Francisco Caltrain 2 are the busiest bike stations during morning rush hours.

screen-shot-2016-12-04-at-11-07-46-am

Since midnight has the fifth highest number of bike trips, lets find out where the top originating bike stations are.

Harry Bridges Plaza (Ferry Building), Embarcadero at Sansome, Market at Sansome, Market at 4th, 2nd at Townsend are among the popular bike stations during midnight hour. See the below list. They are in close proximity to the city popular nightlife hangouts/hotspots.

screen-shot-2016-12-04-at-3-27-11-pm

Lets plot the hourly average bike availability for the top three start stations, San Francisco Caltrain (Townsend at 4th) terminal id 70, San Francisco Caltrain 2 (330 Townsend) terminal id 69, and Harry Bridges Plaza (Ferry Building) terminal id 50

They all share the same pattern, decreasing number of available bikes around morning rush hour, 8 am to 10 am.

screen-shot-2016-12-06-at-9-08-53-pm

Next, lets build a model to predict the bike availability.

To be continued…

 

Spark: Analyzing Stock Price

Simple moving average is an indicator many people use in analyzing stock price. Here I want to show how to use Spark’s window function to compute the moving average easily.

First, lets load the stock data

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

case class Stock(exc: String, symbol: String, stockDate: String, open: Double, high: Double, low: Double, close: Double,
                   volume: Double, adjClose: Double)

  val data = sc.textFile("s3://giantify-stocks/APC_2016_08_03.csv")
  val stocksData = data.map { d =>
    val tokens = d.split(",")
    Stock(tokens(0), tokens(1), tokens(2), tokens(3).toDouble, tokens(4).toDouble, tokens(5).toDouble, tokens(6).toDouble, tokens(7).toDouble,
      tokens(8).toDouble)
  }.toDF.cache()

  val stocks = stocksData.withColumn("stockDate", to_date(col("stockDate")))

Next we will compute the 20 days, 50 days, 100 days simple moving averages

val movingAverageWindow20 = Window.orderBy("stockDate").rowsBetween(-20, 0)
val movingAverageWindow50 = Window.orderBy("stockDate").rowsBetween(-50, 0)
val movingAverageWindow100 = Window.orderBy("stockDate").rowsBetween(-100, 0)

// Calculate the moving average
val stocksMA = stocks.withColumn( "MA20", avg(stocks("close")).over(movingAverageWindow20)).withColumn( "MA50", avg(stocks("close")).over(movingAverageWindow50)).withColumn("MA100", avg(stocks("close")).over(movingAverageWindow100))

stocksMA.show()

stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()

With the moving average calculated, let’s find when closing price exceeds the 50 days moving average

stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()
Screen Shot 2016-08-21 at 11.29.13 AM

Stay tuned for the next blog on how to use Zeppelin to visualize the price data

Useful Spark Code Snippets for Data Analytics

Here are some Spark code snippets you will find particularly useful when performing basic big data analytics

Read CSV

import com.databricks.spark.csv._
val data = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load(YOUR_INPUT_PATH)

Read AVRO

import com.databricks.spark.avro._
val data = sqlContext.read.avro(YOUR_INPUT_PATH)

Read JSON

val data = sqlContext.read.json(YOUR_INPUT_PATH)

Most often or not, you will probably perform some aggregations

val result = data.groupBy("company_branch").count().sort(desc("count"))
val result = data.groupBy("company_branch", "department").count().sort(asc("company_branch"),desc("count"))

You would want to save your results back to CSV file again

result.write.format("com.databricks.spark.csv").save(YOUR_OUTPUT_PATH)

If you want to consolidate all the result part files into one single file, you can use the coalesce(1) method

result.coalesce(1).write.format("com.databricks.spark.csv").save(YOUR_OUTPUT_PATH)

To perform projection/selection,

data.select(col("name"), col("age"), col("department_name").alias("dept"))

To perform filtering

data.filter("age > 18")

To use SQL, call the registerTempTable method on the dataframe

data.registerTempTable("data")
sqlContext.sql("select name, age from data")

Analyzing Uber Data

Thanks to FiveThirtyEight, we can now play with some New York’s Uber trip datasets (apr 2014 to sept 2014)

https://github.com/fivethirtyeight/uber-tlc-foil-response

I wrote a simple Spark job to analyze the data and create some visualizations using Zeppelin to tell the story from the numbers.

First, lets find out the day of week distribution of New York’s Uber trips. From the table below, we can see that Saturday has the most number of trips, totaled 537,091 followed by Sunday, with 534,856 trips whereas Wednesday has the least number of trips.

Note: dayOfWeek [Saturday: 7, Sunday: 1, Monday: 2, Tuesday:3, Wednesday:4, Thursday:5, Friday:6]

 

Screen Shot 2016-07-12 at 11.02.50 PM

Next, lets study the temporal pattern of Uber trips. With the data, I computed the hour distribution of Uber trips as shown by the bar chart below.

hourDistributionChart

Without no surprise, the number of Uber trips peaked around 5 pm during the evening rush hour period. As we see from the chart, the traffic began to increase from 2pm till 9 pm. A smaller peak was found around morning rush hour from 6 am to 8 am. Another interesting observation is that midnight 12 am also has a significant high number of Uber trips.

Most number of trips at peak hour 5 pm

hour17

Least number of trips at 2 am

hour2

 

HourDistribution-2

I also created the number of trips vs hour + day of week bar charts shown above to get a better understanding of hour distribution of trips for each of the day of week. Once again I used the following integers mapping to represent different day of week. Each color bar represents the number of Uber trips for the corresponding hour for the specific day of week.

Note: dayOfWeek [Saturday: 7, Sunday: 1, Monday: 2, Tuesday:3, Wednesday:4, Thursday:5, Friday:6]

As we can see from above, for group 2 which represents Monday, we have the least number of midnight trips, (dark blue bar, 0). Most people were going home earlier on Sunday night, resulted in the least number of mid night trips.

Next question we would like to answer is where are the hot spots with many Uber pickups. Using the NYC Open Data neighborhood shapefile for New York, we can group these pickups location by neighborhood and figure out these hotspots.

Identified popular hotspots

Manhanttan,3451299
Brooklyn, 595293
Queens,376066
Bronx,35522
Staten Island,1982

groupedByBoroOnly

These are the hot neighborhoods with the most Uber’s pickups

1)Manhattan, Midtown-Midtown South
2)Manhattan, Hudson Yards-Chelsea-Flatiron-Union Square
3)Manhattan, SoHo-TriBeCa-Civic Center-Little Italy
4)Manhattan, West Village
5)Manhattan, Turtle Bay-East Midtown
6)Queens, Airport
7)Manhattan, Upper East Side-Carnegie Hill
8)Manhattan, Battery Park City-Lower Manhattan

With the above insights, Uber drivers in New York can know the best days, hours, and locations to get customers.

If we analyze the trend of number of Uber’s pickups over time, from April 2014 to Sept 2014, it is obvious that Uber is getting more and more popular over time.

+——————–+—-+—–+——+
| boro|year|month| count|
+——————–+—-+—–+——+
|Bronx …|2014| 4| 3314|
|Bronx …|2014| 5| 3922|
|Bronx …|2014| 6| 4411|
|Bronx …|2014| 7| 6195|
|Bronx …|2014| 8| 8010|
|Bronx …|2014| 9| 9670|
|Brooklyn …|2014| 4| 61840|
|Brooklyn …|2014| 5| 73608|
|Brooklyn …|2014| 6| 77839|
|Brooklyn …|2014| 7|105489|
|Brooklyn …|2014| 8|129725|
|Brooklyn …|2014| 9|146792|
|Manhattan …|2014| 4|454311|
|Manhattan …|2014| 5|517599|
|Manhattan …|2014| 6|517848|
|Manhattan …|2014| 7|603413|
|Manhattan …|2014| 8|596033|
|Manhattan …|2014| 9|762095|
|Queens …|2014| 4| 37134|
|Queens …|2014| 5| 48948|
|Queens …|2014| 6| 53136|
|Queens …|2014| 7| 67360|
|Queens …|2014| 8| 78675|
|Queens …|2014| 9| 90813|
|Staten Island …|2014| 4| 234|
|Staten Island …|2014| 5| 288|
|Staten Island …|2014| 6| 246|
|Staten Island …|2014| 7| 340|
|Staten Island …|2014| 8| 413|
|Staten Island …|2014| 9| 461|
+——————–+—-+—–+——+

 

 

Spark Logistic Regression

Lets dive into the implementation of Logistic Regression in Spark. You can find the implementation in LogisticRegression.scala class. I am looking at the low level mllib library instead of the newer ml API.

/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

It defines the LogisticsRegressionModel which extends abstract class GeneralizedLinearModel (/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala) and ClassificationModel, Serializable, Saveable and PMMLExportable traits.

class LogisticRegressionModel @Since("1.3.0") (
    @Since("1.0.0") override val weights: Vector,
    @Since("1.0.0") override val intercept: Double,
    @Since("1.3.0") val numFeatures: Int,
    @Since("1.3.0") val numClasses: Int)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable
  with Saveable with PMMLExportable {

ClassificationModel defines various predict methods without concrete implementation

trait ClassificationModel extends Serializable {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: RDD[Vector]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted category from the trained model
   */
  @Since("1.0.0")
  def predict(testData: Vector): Double

  /**
   * Predict values for examples stored in a JavaRDD.
   * @param testData JavaRDD representing data points to be predicted
   * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
    predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}

In LogisticRegressionModel’s predictPoint method, if the number of classes is 2 then we have binary classification, otherwise it will perform multinomial logistic regression classification.

 override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double) = {
    require(dataMatrix.size == numFeatures)

    // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
    if (numClasses == 2) {
      val margin = dot(weightMatrix, dataMatrix) + intercept
      val score = 1.0 / (1.0 + math.exp(-margin))
      threshold match {
        case Some(t) => if (score > t) 1.0 else 0.0
        case None => score
      }
    } else {
      /**
       * Compute and find the one with maximum margins. If the maxMargin is negative, then the
       * prediction result will be the first class.
       *
       * PS, if you want to compute the probabilities for each outcome instead of the outcome
       * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
       * is positive to prevent overflow.
       */
      var bestClass = 0
      var maxMargin = 0.0
      val withBias = dataMatrix.size + 1 == dataWithBiasSize
      (0 until numClasses - 1).foreach { i =>
        var margin = 0.0
        dataMatrix.foreachActive { (index, value) =>
          if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
        }
        // Intercept is required to be added into margin.
        if (withBias) {
          margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
        }
        if (margin > maxMargin) {
          maxMargin = margin
          bestClass = i + 1
        }
      }
      bestClass.toDouble
    }
  }

In binary classification, it uses the math below to calculate score value,

val margin = dot(weightMatrix, dataMatrix) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))

if threshold is specified, then it checks calculated score against threshold and return class 1 if score is greater than threshold. Both dataMatrix and weightMatrix are of Vector type.

For binary logistic regression, it uses stochastic gradient descent with default L2 regularization to train the model. See the implementation of LogisticRegressionWithSGD below. It extends the abstract class GeneralizedLinearAlgorithm. GeneralizedLinearAlgorithm has an un-implemented empty method createModel as shown below.

protected def createModel(weights: Vector, intercept: Double): M

The abstract class has the base implementation of run(input: RDD[LabeledPoint], initialWeights: Vector): M method

which will call createModel method at the end.

Any extending class, eg. LogisticRegressionWithSGD provides the createModel implementation. It returns a trained classification model.

LogisticRegressionWithSGD contains the following train methods to build the model.
The train method takes RDD of LabeledPoint. LabeledPoint case class contains a double value (class label) and a Vector of features.

case class LabeledPoint @Since("1.0.0") (
    @Since("0.8.0") label: Double,
    @Since("1.0.0") features: Vector) {
  override def toString: String = {
    s"($label,$features)"
  }
}
@Since("0.8.0")
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  @Since("0.8.0")
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}

/**
 * Top-level methods for calling Logistic Regression using Stochastic Gradient Descent.
 * NOTE: Labels used in Logistic Regression should be {0, 1}
 */
@Since("0.8.0")
object LogisticRegressionWithSGD {
  // NOTE(shivaram): We use multiple train methods instead of default arguments to support
  // Java programs.

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input, initialWeights)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. We use the entire data
   * set to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param stepSize Step size to be used for each iteration of Gradient Descent.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double): LogisticRegressionModel = {
    train(input, numIterations, stepSize, 1.0)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
   * to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int): LogisticRegressionModel = {
    train(input, numIterations, 1.0, 1.0)
  }
}

Zero copy technique in Hadoop and Spark

What exactly are zero copy techniques and how these techniques can be employed to achieve better performance in distributed system ?

If you browse the Hadoop MapReduce and Spark JIRA tickets, you will find a number of tickets related to the use of zero copy techniques such as MMap memory mapped files and sendFile() to improve the system.

Zero copy techniques are these techniques used to eliminate unnecessary data copy and context switches across application and kernel space. Please refer to the following excellent post for an in depth explanation of these techniques.

http://www.ibm.com/developerworks/library/j-zerocopy/

Traditionally, if a server wants to send data over the network to a client, it needs to read the data from the disk into kernel memory before storing it in the user memory. Then it transfers the data again from the user memory space to kernel buffer associated with the network stack before sending to the network interface card. See Figure 1 (taken from the above paper)

 Screen shot 2014-08-17 at 4.16.04 PM

A popular zero copy technique is called sendFile() or transferTo. Please see the following figure. (taken from the same paper).

Screen shot 2014-08-17 at 4.19.25 PM

As you see in Hadoop, it has already reverted to use zero copy transferTo way back in version 0.18.

https://issues.apache.org/jira/browse/HADOOP-3164

Also, it uses the same technique in sending shuffle files. Please see

https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

In Spark, there is also plan to use the same technique in sending shuffle files, targeted for upcoming version 1.2.0.

Spark-2468 : introduce the same transferTo technique in sending shuffle files.

https://issues.apache.org/jira/browse/SPARK-2468

Shuffling in Spark vs Hadoop MapReduce

In the last few posts about Hadoop MapTask spill mechanism, we learn that Hadoop uses an in memory buffer during the map task intermediate output writing phase. As the memory soft limit exceeded, it starts spilling the data to disk. This results in multiple spill files that are eventually merged together at the end of the map task into a big file. During the spilling process, the data in the memory buffer are first sorted by the partitions (each partition corresponds to one Reduce task)  and then by the keys.

As of Spark  1.01 version, Spark Map Tasks write the output directly to disk on completion. There is no use of an in memory buffer. Each Map Task writes as many shuffle files as the number of Reduce Task. One shuffle file per Reduce Task. Eg. one could have 1000 Map Tasks  (M) and 5000 Reduce Tasks (R), this results in 5 millions shuffle files. Spark does not however merge them into a single partitioned shuffle file as in Hadoop MapReduce. This number of file IO somehow affects the performance.

You can learn more about Spark shuffling from the following report.

http://www.cs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf

In the above report, it also points out the potential memory issue when using compression on the Map output files.

eg. In a machine with C number of cores allocated to Spark, we have C concurrent Map Tasks, each Map Task is writing out R shuffle files, one per Reduce Task. The total number of memory usage would be C*R*400KB.

Here comes the good news. A new shuffling mechanism, called sort-based shuffling is implemented for upcoming Spark version 1.1.0. You can read the design document below and learn more about this issue in SPARK-2045 JIRA ticket.

https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf

This sort-based shuffling is quite similar but not the same as Hadoop MapReduce shuffling.

The following is the description of SPARK-2045:

“…a sort-based shuffle implementation that takes advantage of an Ordering for keys (or just sorts by hashcode for keys that don’t have it) would likely improve performance and memory usage in very large shuffles. Our current hash-based shuffle needs an open file for each reduce task, which can fill up a lot of memory for compression buffers and cause inefficient IO. This would avoid both of those issues.”

RDD in Spark

I have recently digged into Spark trying to understand its internals. I found some excellent interesting papers especially on RDD (Resilient Distributed Dataset). For example:

https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing)

http://www.cs.berkeley.edu/~matei/talks/2012/nsdi_rdds.pdf (Presentation)

For some of us who are more familiar with Hadoop disk based distributed system might also want to read up on distributed shared memory (DSM) to gain some basic understandings of various related concepts.

http://www.cdf.toronto.edu/~csc469h/fall/handouts/nitzberg91.pdf (Distributed Shared Memory: A survey of issues and algorithms) This is a good overview of DSM. It talks about memory coherence and coherence protocol.

http://hal.archives-ouvertes.fr/docs/00/07/41/93/PDF/RR-2481.pdf (A Recoverable Distributed Shared Memory Integrating Coherence and Recoverability)

RDD is different from DSM in the following aspects as pointed out in the above paper. See the table below. RDD is a restricted form of distributed shared memory. It is immutable, partitioned collections of records. It maintains lineage information (a series of transformations) for efficient fault recovery.

Table 1 taken from the 1st paper above

 

Screen Shot 2014-07-31 at 12.47.03 AM

 

 

To be continued…..