Running a Spark Job in AWS Data Pipeline

To run a Spark job in AWS Data Pipeline, add an EmrActivity and use command-runner.jar to call spark-submit.

In the Step field of the EmrActivity node, enter the command as a comma-separated list: the jar first, then each of its arguments.

command-runner.jar,spark-submit,--master,yarn,--deploy-mode,cluster,--class,com.yourcompany.yourpackage.YourClass,s3://PATH_TO_YOUR_JAR,YOUR_PROGRAM_ARGUMENT_1,YOUR_PROGRAM_ARGUMENT_2,YOUR_PROGRAM_ARGUMENT_3

Since Spark 2.0, the yarn-cluster master URL is deprecated; use --master yarn together with --deploy-mode cluster instead.
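If you generate pipeline definitions from code, a small helper can assemble this string for you. The sketch below is a hypothetical helper (sparkSubmitStep is not part of any AWS SDK). It assumes the spark-submit flags shown above, written with --master yarn and --deploy-mode cluster, and it simply rejects any token containing a comma, since commas separate the arguments in the Step field.

```scala
object EmrStep {
  // Hypothetical helper: builds the comma-separated value for an EmrActivity
  // Step field that runs spark-submit through command-runner.jar.
  def sparkSubmitStep(mainClass: String, jarPath: String, programArgs: Seq[String]): String = {
    val tokens = Seq(
      "command-runner.jar", "spark-submit",
      "--master", "yarn", "--deploy-mode", "cluster",
      "--class", mainClass, jarPath
    ) ++ programArgs
    // Commas delimit tokens in the Step field, so a token containing one would be split.
    require(tokens.forall(!_.contains(",")), "step tokens must not contain commas")
    tokens.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val step = sparkSubmitStep(
      "com.yourcompany.yourpackage.YourClass",
      "s3://PATH_TO_YOUR_JAR",
      Seq("YOUR_PROGRAM_ARGUMENT_1", "YOUR_PROGRAM_ARGUMENT_2"))
    println(step)
  }
}
```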

Some useful resources:
http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-submit-step.html
http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html

BLAS in MLlib

I was looking into the BLAS.scala routines for MLlib’s vectors and matrices. It looks like Spark uses F2jBLAS (netlib-java’s pure-Java implementation, translated from Fortran) for level-1 routines:

https://github.com/apache/spark/blob/master/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala

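import com.github.fommil.netlib.{BLAS => NetlibBLAS, F2jBLAS}

// Declared elsewhere in the same BLAS object; null until first use.
@transient private var _f2jBLAS: NetlibBLAS = _

// For level-1 routines, we use Java implementation.
private def f2jBLAS: NetlibBLAS = {
  // Lazily create the pure-Java F2jBLAS instance once, then reuse it.
  if (_f2jBLAS == null) {
    _f2jBLAS = new F2jBLAS
  }
  _f2jBLAS
}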
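Level-1 routines are vector-vector operations that do O(n) work, such as axpy (y := a*x + y) and dot. As an illustration only (this is neither Spark’s nor F2J’s code), here is what those two routines compute, written as plain Scala loops:

```scala
object Level1Blas {
  // axpy (level 1): y := a * x + y, updating y in place. O(n) work.
  def axpy(a: Double, x: Array[Double], y: Array[Double]): Unit = {
    require(x.length == y.length, "vector sizes must match")
    var i = 0
    while (i < x.length) {
      y(i) += a * x(i)
      i += 1
    }
  }

  // dot (level 1): returns the sum of x(i) * y(i). Also O(n) work.
  def dot(x: Array[Double], y: Array[Double]): Double = {
    require(x.length == y.length, "vector sizes must match")
    var sum = 0.0
    var i = 0
    while (i < x.length) {
      sum += x(i) * y(i)
      i += 1
    }
    sum
  }

  def main(args: Array[String]): Unit = {
    val x = Array(1.0, 2.0, 3.0)
    val y = Array(4.0, 5.0, 6.0)
    println(dot(x, y))        // 32.0
    axpy(2.0, x, y)
    println(y.mkString(", ")) // 6.0, 9.0, 12.0
  }
}
```

One plausible reason for using a JVM implementation at this level (an assumption, not something the snippet above states) is that such cheap calls may not amortize the JNI overhead of calling into a native library.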

Here are some good resources on the different BLAS implementations:
http://www.spark.tc/blas-libraries-in-mllib/

https://blog.cloudera.com/blog/2017/02/accelerating-apache-spark-mllib-with-intel-math-kernel-library-intel-mkl/

BLAS routines (levels 1, 2, and 3):
http://www.netlib.org/blas/#_blas_routines
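For quick reference, here is one representative operation per level, with α and β scalars, x and y vectors, and A, B, C matrices; the costs assume length-n vectors and n-by-n matrices:

```latex
\begin{aligned}
&\text{Level 1 (vector-vector, } O(n)\text{):} && y \leftarrow \alpha x + y \ (\texttt{axpy}), \quad s \leftarrow x^{\top} y \ (\texttt{dot}) \\
&\text{Level 2 (matrix-vector, } O(n^2)\text{):} && y \leftarrow \alpha A x + \beta y \ (\texttt{gemv}) \\
&\text{Level 3 (matrix-matrix, } O(n^3)\text{):} && C \leftarrow \alpha A B + \beta C \ (\texttt{gemm})
\end{aligned}
```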

Render JSON using Jackson in Scala

If you use the Jackson JSON library in Scala, remember to register DefaultScalaModule so that ObjectMapper converts Scala types such as case classes and collections like List and Seq to JSON correctly:

val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)

Simple example:

import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.{JsonProperty, PropertyAccessor}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonExample {
  case class Car(@JsonProperty("id") id: Long)
  case class Person(@JsonProperty("name") name: String = null,
                    @JsonProperty("cars") cars: Seq[Car] = null)

  def main(args: Array[String]): Unit = {
    val car1 = Car(12345)
    val car2 = Car(12346)
    val carsOwned = List(car1, car2)
    val person = Person(name = "wei", cars = carsOwned)

    val objectMapper = new ObjectMapper()
    // Teach Jackson about Scala types (case classes, collections, Option).
    objectMapper.registerModule(DefaultScalaModule)
    // Serialize from fields only: ignore getters and other accessors.
    objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.NONE)
    objectMapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY)
    println(s"person: ${objectMapper.writeValueAsString(person)}")
  }
}

Output:
person: {"name":"wei","cars":[{"id":12345},{"id":12346}]}
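The registered module matters for reading JSON back as well. Here is a minimal round-trip sketch, assuming a jackson-module-scala version that matches your jackson-databind. It reuses the Car and Person shapes from the example (without the default values) and parses the output shown above back into a Person with readValue:

```scala
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonRoundTrip {
  case class Car(@JsonProperty("id") id: Long)
  case class Person(@JsonProperty("name") name: String,
                    @JsonProperty("cars") cars: Seq[Car])

  // The Scala module lets Jackson construct case classes and Seq values when reading.
  val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

  def main(args: Array[String]): Unit = {
    val json = """{"name":"wei","cars":[{"id":12345},{"id":12346}]}"""
    val person = objectMapper.readValue(json, classOf[Person])
    println(person)
  }
}
```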

StackOverflowError when running ALS in Spark’s MLlib

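If you encounter a StackOverflowError when running ALS in Spark’s MLlib, turn on checkpointing. Each ALS iteration lengthens the RDD lineage, and after many iterations the lineage gets deep enough to overflow the stack; checkpointing periodically truncates it. Set a checkpoint directory as follows:

sc.setCheckpointDir("your_checkpointing_dir/")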

See the JIRA ticket for this issue and the related pull request:

https://issues.apache.org/jira/browse/SPARK-1006

https://github.com/apache/spark/pull/5076
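With the DataFrame-based API (org.apache.spark.ml.recommendation.ALS), the checkpoint directory must also be set; the checkpointInterval parameter (default 10) controls how often the factors are checkpointed. Below is a minimal sketch, assuming Spark 2.x or later on the classpath, a local master, a temporary checkpoint directory, and a tiny made-up ratings table; the fit helper name is hypothetical:

```scala
import java.nio.file.Files

import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.SparkSession

object AlsCheckpointExample {
  // Hypothetical helper: fits ALS on a tiny in-memory ratings table with checkpointing on.
  def fit(spark: SparkSession): ALSModel = {
    import spark.implicits._

    // ALS only checkpoints when the SparkContext has a checkpoint directory.
    // A temp dir is fine locally; use HDFS or S3 on a real cluster.
    spark.sparkContext.setCheckpointDir(Files.createTempDirectory("als-checkpoints").toString)

    // Made-up (user, item, rating) triples, just enough to fit a model.
    val ratings = Seq((0, 0, 4.0f), (0, 1, 2.0f), (1, 1, 3.0f), (1, 2, 5.0f), (2, 0, 1.0f))
      .toDF("userId", "itemId", "rating")

    new ALS()
      .setRank(5)
      .setMaxIter(30)           // many iterations: long lineage without checkpointing
      .setCheckpointInterval(5) // checkpoint (and truncate lineage) every 5 iterations
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")
      .fit(ratings)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AlsCheckpointExample")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    val model = fit(spark)
    println(s"rank=${model.rank}, users=${model.userFactors.count()}")
    spark.stop()
  }
}
```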

Spark Dedup before Join

In Spark, as with any SQL left outer join, the result has more rows than the left table whenever the right table has duplicate join keys: each left row is repeated once for every matching right row.

To avoid this, drop the duplicates from the right table before performing the join:

myDF.dropDuplicates("myJoinkey")

Alternatively, group the right table by the join key and aggregate the remaining columns, so that there is exactly one row per key.

Take a look at this dedup example:

https://github.com/spirom/LearningSpark/blob/master/src/main/scala/dataframe/DropDuplicates.scala
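Here is a minimal sketch of both options with the DataFrame API, assuming Spark 2.x or later. The helper names and the tiny tables are made up for illustration. Note that dropDuplicates and first both keep an arbitrary row per key, so use a deterministic aggregate such as max or min if it matters which row survives:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.first

object DedupBeforeJoin {
  // Plain left outer join: a left row is repeated for every matching right row.
  def naiveJoin(left: DataFrame, right: DataFrame, key: String): DataFrame =
    left.join(right, Seq(key), "left_outer")

  // Option 1: keep one (arbitrary) right row per key before joining.
  def dedupJoin(left: DataFrame, right: DataFrame, key: String): DataFrame =
    left.join(right.dropDuplicates(key), Seq(key), "left_outer")

  // Option 2: collapse the right table to one row per key with groupBy + an aggregate.
  def groupedJoin(left: DataFrame, right: DataFrame, key: String, valueCol: String): DataFrame = {
    val collapsed = right.groupBy(key).agg(first(valueCol).as(valueCol))
    left.join(collapsed, Seq(key), "left_outer")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DedupBeforeJoin").master("local[*]").getOrCreate()
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b")).toDF("myJoinkey", "leftValue")
    // Key 1 appears twice in the right table.
    val right = Seq((1, "x"), (1, "y"), (2, "z")).toDF("myJoinkey", "rightValue")

    println(naiveJoin(left, right, "myJoinkey").count())                 // 3
    println(dedupJoin(left, right, "myJoinkey").count())                 // 2
    println(groupedJoin(left, right, "myJoinkey", "rightValue").count()) // 2
    spark.stop()
  }
}
```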

Tuning Spark Jobs

Here are some useful resources on tuning Spark jobs: choosing the number of executors, the executor memory, and the number of cores per executor.

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

http://etlcode.com/index.php/blog/info/Bigdata/Apache-Spark-tuning-spark-jobs-Optimal-setting-for-executor-core-and-memory

https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory

https://aranair.github.io/posts/2017/03/10/tuning-my-apache-spark-cluster-on-aws-emr/
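To make the arithmetic concrete, here is a sketch of one commonly cited sizing heuristic, similar to the worked example in the Cloudera post above. Every constant is an assumption rather than a Spark default: 1 core and 1 GB per node reserved for the OS and Hadoop daemons, 5 cores per executor, one executor slot given up for the YARN ApplicationMaster, and about 7% of each executor’s memory share left for overhead (check the memoryOverhead default for your Spark version):

```scala
object ExecutorSizing {
  // Suggested values for --num-executors, --executor-cores and --executor-memory (in GB).
  case class Sizing(numExecutors: Int, coresPerExecutor: Int, executorMemoryGb: Int)

  def size(nodes: Int, coresPerNode: Int, memPerNodeGb: Int,
           coresPerExecutor: Int = 5, overheadFraction: Double = 0.07): Sizing = {
    // Assumption: reserve 1 core and 1 GB per node for the OS and Hadoop daemons.
    val usableCores = coresPerNode - 1
    val usableMemGb = memPerNodeGb - 1
    val executorsPerNode = usableCores / coresPerExecutor
    require(executorsPerNode > 0, "coresPerExecutor exceeds usable cores per node")
    // Assumption: give up one executor slot for the YARN ApplicationMaster.
    val numExecutors = nodes * executorsPerNode - 1
    // Split node memory evenly, then leave a fraction for off-heap overhead.
    val memShareGb = usableMemGb.toDouble / executorsPerNode
    val heapGb = math.floor(memShareGb * (1 - overheadFraction)).toInt
    Sizing(numExecutors, coresPerExecutor, heapGb)
  }

  def main(args: Array[String]): Unit = {
    // 6 nodes, each with 16 cores and 64 GB of RAM.
    println(size(nodes = 6, coresPerNode = 16, memPerNodeGb = 64)) // Sizing(17,5,19)
  }
}
```

For 6 nodes with 16 cores and 64 GB each, this gives --num-executors 17 --executor-cores 5 --executor-memory 19G. Treat the result as a starting point to measure against, not a final setting.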