RDD in Spark

I have recently digged into Spark trying to understand its internals. I found some excellent interesting papers especially on RDD (Resilient Distributed Dataset). For example:

https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing)

http://www.cs.berkeley.edu/~matei/talks/2012/nsdi_rdds.pdf (Presentation)

For some of us who are more familiar with Hadoop disk based distributed system might also want to read up on distributed shared memory (DSM) to gain some basic understandings of various related concepts.

http://www.cdf.toronto.edu/~csc469h/fall/handouts/nitzberg91.pdf (Distributed Shared Memory: A survey of issues and algorithms) This is a good overview of DSM. It talks about memory coherence and coherence protocol.

http://hal.archives-ouvertes.fr/docs/00/07/41/93/PDF/RR-2481.pdf (A Recoverable Distributed Shared Memory Integrating Coherence and Recoverability)

RDD is different from DSM in the following aspects as pointed out in the above paper. See the table below. RDD is a restricted form of distributed shared memory. It is immutable, partitioned collections of records. It maintains lineage information (a series of transformations) for efficient fault recovery.

Table 1 taken from the 1st paper above


Screen Shot 2014-07-31 at 12.47.03 AM



To be continued…..

HyperLogLog vs LogLog

To understand HyperLogLog (HLL), one should read the LogLog and HyperLogLog papers.

http://algo.inria.fr/flajolet/Publications/DuFl03-LNCS.pdf  (LogLog)

http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf  (HyperLogLog)

The major difference between the two is one uses regular mean and the other (HLL) uses harmonic mean to average the estimate cardinality calculated by different m buckets. I would view these buckets as different experiments.

The accuracy (standard error) is controlled by the number of buckets, m. The more buckets the better is the estimation. But, HLL has better accuracy compared to LogLog given the same number of buckets.

The standard error for HLL is



The standard error for LogLog is

CodeCogsEqn (1)

The following is the LogLog algorithm taken from the paper

Screen shot 2014-07-29 at 8.07.49 PM

Basically, there are m buckets, each one is keeping track of the run of consecutive zeros in the bit string. We then figure out the max run of consecutive zeros (position of leftmost 1) in all the buckets.

eg.  1  => 1,   001 =>3

Then using the cardinality estimate formula shown in the last line, calculate the estimate unique counts. As you see from the formula, it involves taking the average of the run of zeros observed in these buckets. (Note: the alpha is the bias correction applied to the cardinality estimate)

For HyperLogLog, instead of using the average, harmonic mean (stochastic averaging) is used. Please refer to the HLL paper for more detailed description of the formula.

Taken from the HLL paper,

Screen shot 2014-07-29 at 8.24.19 PM

Spark on YARN

I tried out Spark on YARN recently. It is very straight forward. If you are planning to use MLlib, make sure you have gcc-gfortran installed on every cluster node.

1) First, you have to download and install  spark-1.0.0-bin-hadoop2


2) Set the YARN_CONF_DIR to point to the location of your hadoop configuration files.


export YARN_CONF_DIR=/etc/hadoop/conf

You can now start spark shell as follows

cd spark-1.0.0-bin-hadoop2

./bin/spark-shell —master yarn-client –driver-java-options “-Dspark.executor.memory=2g”

HyperLogLog Aproximate Distinct Counting

In MapReduce, distinct/unique count of large data set is very common but unfortunately it is not scalable because it requires one reducer. It gets worse when you have to perform unique count across different aggregation/segment groups. Again, unique counting across different aggregation granularities whether in terms of time dimension or in combination with other demographic attributes is a common practice in big data analytics. Eg. In your massive data sets, let’s find unique counts of users for every hour, every day, every week, every month and etc.

HyperLogLog approximate unique counting can be used to provide a scalable solution. Keep in mind, the unique count here is a probabilistic approximate counts.You can implement either a map side hyperloglog counting or reduce side hyperloglog counting. However, map side counting requires more memory and beware of out of memory issue if you have many segments to perform unique count on.

For more information, you can check out the following pages