Deep Learning on AWS GPU Instance

Amazon has released a Deep Learning AMI and makes the process of running deep learning on GPU way easier than before. Before the availability of this AMI, I had to go through the painstaking process of installing all the required CUDA and cuDNN libraries and then spending lots of time in debugging just to get everything running. This AMI makes the on-boarding process much much easier and smoother. Great works indeed !

Follow the steps in the following blog to launch the instance.

Go to AWS Marketplace, search for deep learning AMI (ubuntu) and create an instance from the image

Screen Shot 2018-03-31 at 9.58.13 AM

Select the p3.2xlarge instance type. Make sure you check the hourly pricing for selected instance. It adds up quickly for GPU instance. For p3.2xlarge instance, it costs ~ $3.06/hour. See this EC2 pricing link.

Note: Remember to terminate your instance after use.

Screen Shot 2018-04-08 at 2.08.53 PM

Review and Launch your instance

Screen Shot 2018-04-08 at 2.11.38 PM

Screen Shot 2018-03-31 at 10.02.21 AM

SSH to your new instance

ssh -L localhost:8888:localhost:8888 -i ~/.ssh/my-key-pair.pem

Screen Shot 2018-04-08 at 3.08.48 PM

Activate TensorFlow + Keras 2 on Python 3 with CUDA 8 using the following command

source activate tensorflow_p36

Run some examples
cd ~/tutorials/TensorFlow/board

Screen Shot 2018-03-31 at 10.43.36 AM.png

Start tensorboard using the following command. You can then access the tensorboard UI at http://your-aws-instance-public-ip:6006
tensorboard --logdir=/tmp/tensorflow/mnist
Screen Shot 2018-04-08 at 3.22.42 PM

Screen Shot 2018-03-31 at 12.18.52 PM

Try out other examples

git clone

cd keras/examples


Screen Shot 2018-04-08 at 4.56.48 PM




Spark 2.3.0

Great news ! Spark 2.3.0 was released yesterday. This new release comes with many great features, checkout the release note

One of the many notable features is the Spark on Kubernetes support. It provides a new kubernetes scheduler backend that supports native submission of spark jobs to a kubernetes cluster.

For those who are interested in the details of implementation, you can check out the  Spark on Kubernetes Design Proposal

Here is the associated Jira ticket

Spark History Server

Spark application Web UI is short-lived, only available for the duration of the job. Once the application is done, you won’t be able to access it anymore. To allow continued access even after the application is completed is to utilize the Spark History Server. To do that, we need to specify the following two settings during Spark application submission

spark.eventLog.dir=/path/to/log/dir (default is /tmp/spark-events)

As you can see in the SparkContext class below, the two settings are retrieved from SparkConf

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def eventLogDir: Option[URI] = _eventLogDir
_eventLogDir = if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
      } else {

If the eventLogEnabled setting is true, SparkContext will then create EventLogSparkListener to be added to the LiveListenerBus’s event log queue.

_eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
      } else {

As EventLoggingListener gets notified of various SparkListenerEvents, it writes to a persistent log file.

The log file will have a .inprogress suffix as long as the application is still running. It will get renamed after the job is completed.

val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"

val workingPath = logPath + IN_PROGRESS

Here is the list of different SparkListenerEvents.

case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent

case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    // may be null if the task has failed
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent

Here is the logEvent method used by listener to write the events to the log file.

/** Log the event as JSON. */
  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
    val eventJson = JsonProtocol.sparkEventToJson(event)
    // scalastyle:off println
    // scalastyle:on println
    if (flushLogger) {
      hadoopDataStream.foreach(ds => ds.getWrappedStream match {
        case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
        case _ => ds.hflush()
    if (testing) {
      loggedEvents += eventJson

On start, the Spark History Server reads and replays the log data in the configured log directory, specified by spark.eventLog.dir setting in SparkConf. You can find the checkForLogs() method in FsHistoryProvider.scala. In this method, it first scans the log directory to find any unread logs and submits them to a thread pool (replayExecutor) to be replayed. The number of processing threads can be specified via spark.history.fs.numReplayThreads in Spark configuration. If the setting is not specified, the number of threads will be default to 1/4 of available processors on the Java virtual machine. FsHistoryProvider is the concrete implementation of abstract class ApplicationHistoryProvider. The main purpose of this class is to provide the log data for display on the Web UI. Spark History Server initializes this class dynamically via reflection during start up. It defaults to FsHistoryProvider if spark.history.provider is not set in the Spark configuration.

See the checkForLogs() method below.

   * Builds the application list based on the current contents of the log directory.
   * Tries to reuse as much of the data already in memory as possible, by not reading
   * applications that haven't been updated since last time the logs were checked.
  private[history] def checkForLogs(): Unit = {
    try {
      val newLastScanTime = getNewLastScanTime()
      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
      // scan for modified applications, replay and merge them
      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
        .filter { entry =>
          !entry.isDirectory() &&
            // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
            // reading a garbage file is safe, but we would log an error which can be scary to
            // the end-user.
            !entry.getPath().getName().startsWith(".") &&
            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
            recordedFileSize(entry.getPath()) < entry.getLen()         }         .sortWith { case (entry1, entry2) =>
          entry1.getModificationTime() > entry2.getModificationTime()

      if (logInfos.nonEmpty) {
        logDebug(s"New/updated attempts found: ${logInfos.size} ${}")

      var tasks = mutable.ListBuffer[Future[_]]()

      try {
        for (file <- logInfos) {           tasks += replayExecutor.submit(new Runnable {             override def run(): Unit = mergeApplicationListing(file)           })         }       } catch {         // let the iteration over logInfos break, since an exception on         // replayExecutor.submit (..) indicates the ExecutorService is unable         // to take any more submissions at this time         case e: Exception =>
          logError(s"Exception while submitting event log for replay", e)


      tasks.foreach { task =>
        try {
          // Wait for all tasks to finish. This makes sure that checkForLogs
          // is not scheduled again while some tasks are already running in
          // the replayExecutor.
        } catch {
          case e: InterruptedException =>
            throw e
          case e: Exception =>
            logError("Exception while merging application listings", e)
        } finally {

    } catch {
      case e: Exception => logError("Exception in checking for event log updates", e)


Get preferred locations for RDD’s partitions

When DAGScheduler submits a Stage for execution, it fetches the preferred locations (TaskLocations) to run tasks on partitions for a RDD from BlockManagerMaster which in turn reach out to  the driver’s RPC endpoint for the infos.  TaskLocation is either a host or (host, executorID) pair. The purpose is to achieve data locality when running a task. It is always preferred to run task on a host which has the required data block in memory.

DAGScheduler class has a getCacheLocs method. In this method, DAGScheduler will first check if the preferred locations (TaskLocation) has been fetched before by checking its internal cacheLocs hashMap. If the internal cacheLocs does not contain the information for the RDD, it will then call blockManagerMaster.getLocations to fetch the preferred locations for all the partitions within the RDD. Once fetched, task locations will be kept in the cacheLocs hashMap.

See DAGScheduler’s getCacheLocs method below.

  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains( {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
      } else {
        val blockIds =
 => RDDBlockId(, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
 => TaskLocation(, bm.executorId))
      cacheLocs( = locs
   * Recursive implementation for getPreferredLocs.
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited)           if (locs != Nil) {             return locs           }         }       case _ =>


In BlockManagerMaster, getLocations method sends a RPC request to driver end point to get the preferredLocations.

  /** Get locations of multiple blockIds from the driver */
  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {