Spark 2.3.0

Great news ! Spark 2.3.0 was released yesterday. This new release comes with many great features, checkout the release note

One of the many notable features is the Spark on Kubernetes support. It provides a new kubernetes scheduler backend that supports native submission of spark jobs to a kubernetes cluster.

For those who are interested in the details of implementation, you can check out the  Spark on Kubernetes Design Proposal

Here is the associated Jira ticket https://issues.apache.org/jira/browse/SPARK-18278

Advertisements

Spark History Server

Spark application Web UI is short-lived, only available for the duration of the job. Once the application is done, you won’t be able to access it anymore. To allow continued access even after the application is completed is to utilize the Spark History Server. To do that, we need to specify the following two settings during Spark application submission

spark.eventLog.enabled=true
spark.eventLog.dir=/path/to/log/dir (default is /tmp/spark-events)

As you can see in the SparkContext class below, the two settings are retrieved from SparkConf

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def eventLogDir: Option[URI] = _eventLogDir
_eventLogDir = if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

If the eventLogEnabled setting is true, SparkContext will then create EventLogSparkListener to be added to the LiveListenerBus’s event log queue.

_eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }

As EventLoggingListener gets notified of various SparkListenerEvents, it writes to a persistent log file.

The log file will have a .inprogress suffix as long as the application is still running. It will get renamed after the job is completed.

val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"

val workingPath = logPath + IN_PROGRESS

Here is the list of different SparkListenerEvents.
SparkListenerStageSubmitted
SparkListenerStageCompleted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerSpeculativeTaskSubmitted
SparkListenerTaskEnd
SparkListenerJobStart
SparkListenerJobEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
SparkListenerExecutorBlacklisted
SparkListenerExecutorUnblacklisted
SparkListenerNodeBlacklisted
SparkListenerNodeUnblacklisted
SparkListenerBlockUpdated
SparkListenerExecutorMetricsUpdate
SparkListenerApplicationStart
SparkListenerApplicationEnd
SparkListenerLogStart

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    // may be null if the task has failed
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent

Here is the logEvent method used by listener to write the events to the log file.

/** Log the event as JSON. */
  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
    val eventJson = JsonProtocol.sparkEventToJson(event)
    // scalastyle:off println
    writer.foreach(_.println(compact(render(eventJson))))
    // scalastyle:on println
    if (flushLogger) {
      writer.foreach(_.flush())
      hadoopDataStream.foreach(ds => ds.getWrappedStream match {
        case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
        case _ => ds.hflush()
      })
    }
    if (testing) {
      loggedEvents += eventJson
    }
  }

On start, the Spark History Server reads and replays the log data in the configured log directory, specified by spark.eventLog.dir setting in SparkConf. You can find the checkForLogs() method in FsHistoryProvider.scala. In this method, it first scans the log directory to find any unread logs and submits them to a thread pool (replayExecutor) to be replayed. The number of processing threads can be specified via spark.history.fs.numReplayThreads in Spark configuration. If the setting is not specified, the number of threads will be default to 1/4 of available processors on the Java virtual machine. FsHistoryProvider is the concrete implementation of abstract class ApplicationHistoryProvider. The main purpose of this class is to provide the log data for display on the Web UI. Spark History Server initializes this class dynamically via reflection during start up. It defaults to FsHistoryProvider if spark.history.provider is not set in the Spark configuration.

See the checkForLogs() method below.

/**
   * Builds the application list based on the current contents of the log directory.
   * Tries to reuse as much of the data already in memory as possible, by not reading
   * applications that haven't been updated since last time the logs were checked.
   */
  private[history] def checkForLogs(): Unit = {
    try {
      val newLastScanTime = getNewLastScanTime()
      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
      // scan for modified applications, replay and merge them
      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
        .filter { entry =>
          !entry.isDirectory() &&
            // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
            // reading a garbage file is safe, but we would log an error which can be scary to
            // the end-user.
            !entry.getPath().getName().startsWith(".") &&
            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
            recordedFileSize(entry.getPath()) < entry.getLen()         }         .sortWith { case (entry1, entry2) =>
          entry1.getModificationTime() > entry2.getModificationTime()
        }

      if (logInfos.nonEmpty) {
        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
      }

      var tasks = mutable.ListBuffer[Future[_]]()

      try {
        for (file <- logInfos) {           tasks += replayExecutor.submit(new Runnable {             override def run(): Unit = mergeApplicationListing(file)           })         }       } catch {         // let the iteration over logInfos break, since an exception on         // replayExecutor.submit (..) indicates the ExecutorService is unable         // to take any more submissions at this time         case e: Exception =>
          logError(s"Exception while submitting event log for replay", e)
      }

      pendingReplayTasksCount.addAndGet(tasks.size)

      tasks.foreach { task =>
        try {
          // Wait for all tasks to finish. This makes sure that checkForLogs
          // is not scheduled again while some tasks are already running in
          // the replayExecutor.
          task.get()
        } catch {
          case e: InterruptedException =>
            throw e
          case e: Exception =>
            logError("Exception while merging application listings", e)
        } finally {
          pendingReplayTasksCount.decrementAndGet()
        }
      }

      lastScanTime.set(newLastScanTime)
    } catch {
      case e: Exception => logError("Exception in checking for event log updates", e)
    }
  }

 

Get preferred locations for RDD’s partitions

When DAGScheduler submits a Stage for execution, it fetches the preferred locations (TaskLocations) to run tasks on partitions for a RDD from BlockManagerMaster which in turn reach out to  the driver’s RPC endpoint for the infos.  TaskLocation is either a host or (host, executorID) pair. The purpose is to achieve data locality when running a task. It is always preferred to run task on a host which has the required data block in memory.

DAGScheduler class has a getCacheLocs method. In this method, DAGScheduler will first check if the preferred locations (TaskLocation) has been fetched before by checking its internal cacheLocs hashMap. If the internal cacheLocs does not contain the information for the RDD, it will then call blockManagerMaster.getLocations to fetch the preferred locations for all the partitions within the RDD. Once fetched, task locations will be kept in the cacheLocs hashMap.

See DAGScheduler’s getCacheLocs method below.

private[scheduler]
  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains(rdd.id)) {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
        IndexedSeq.fill(rdd.partitions.length)(Nil)
      } else {
        val blockIds =
          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
          bms.map(bm => TaskLocation(bm.host, bm.executorId))
        }
      }
      cacheLocs(rdd.id) = locs
    }
    cacheLocs(rdd.id)
  }
/**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited)           if (locs != Nil) {             return locs           }         }       case _ =>
    }

    Nil
  }

In BlockManagerMaster, getLocations method sends a RPC request to driver end point to get the preferredLocations.


  /** Get locations of multiple blockIds from the driver */
  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
    driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
      GetLocationsMultipleBlockIds(blockIds))
  }

 

Spark pluggable cluster manager ExternalClusterManager

In Spark 2.0.0, a new pluggable cluster manager interface, ExternalClusterManager was introduced.  See the Jira ticket below

https://issues.apache.org/jira/browse/SPARK-13904

This allows us to introduce new cluster managers other than Yarn, Mesos, and Spark standalone by implementing the ExternalClusterManager trait. These are the required methods in the trait. createTaskScheduler method is to create a task scheduler and createSchedulerBackend method is to create a scheduler backend.


def canCreate(masterURL: String): Boolean
def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler<span 				data-mce-type="bookmark" 				id="mce_SELREST_start" 				data-mce-style="overflow:hidden;line-height:0" 				style="overflow:hidden;line-height:0" 			></span>
def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit<span 				data-mce-type="bookmark" 				id="mce_SELREST_start" 				data-mce-style="overflow:hidden;line-height:0" 				style="overflow:hidden;line-height:0" 			></span>

ClusterManager is used in SparkContext to create scheduler backend and task scheduler.

Currently, there are two existing implementations of ExternalClusterManager. YarnClusterManager is the implementation for Yarn. As you can see in the codes, it returns YarnClusterScheduler when deploy mode is cluster. For client deploy mode, it returns YarnScheduler. For scheduler backend, it creates YarnClusterSchedulerBackend and YarnClientSchdulerBackend for cluster deploy mode and client deploy mode respectively.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case  _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

MesosClusterManager is the implementation for Mesos.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Mesos scheduler and backend
 */
private[spark] class MesosClusterManager extends ExternalClusterManager {
  private val MESOS_REGEX = """mesos://(.*)""".r

  override def canCreate(masterURL: String): Boolean = {
    masterURL.startsWith("mesos")
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    require(!sc.conf.get(IO_ENCRYPTION_ENABLED),
      "I/O encryption is currently not supported in Mesos.")

    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarse) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl,
        sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

java.util.ServiceLoader mechanism is used to register and auto load any new external cluster manager implementation.

Failed spark job reports on YARN as successful when deployed with client-mode

If you deploy your spark job on YARN with client deploy mode, be aware that the YARN application is always shown as SUCCEEDED on YARN resource manager site even if the Spark job fails.

See the following Jira ticket

https://issues.apache.org/jira/browse/SPARK-11058

Note from Spark docs:

--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)

DAGSchedulerEventLoop in Spark

In Spark, DAGScheduler utilizes DAGSchedulerEventProcessLoop to process different DAGSchedulerEvent events.

Here are the different DAGSchedulerEvent events:
JobSubmitted
MapStageSubmitted
StageCancelled
JobCancelled
JobGroupCancelled
AllJobsCancelled
ExecutorAdded
ExecutorLost
WorkerRemoved
BeginEvent
SpeculativeTaskSubmitted
GettingResultEvent
CompletionEvent
TaskSetFailed
ResubmitFailedStages

Basically, DAGSchedulerEventProcessLoop extends an abstract class called EventLoop which has an internal daemon thread polling events from the event queue (LinkedBlockingDeque). It uses FIFO (put and take) even though it is a double-ended queue. Once the thread takes an event, it will trigger the onReceive method  on the subclass DAGSchedulerEventProcessLoop.

You can find the event thread definition in EventLoop as shown below.

private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

See DAGSchedulerEventProcessLoop’s onReceive method below. In this method, different DAGScheduler’s event handling methods are invoked based on the event type.

For example:

DAGScheduler calls

  • handleJobSubmitted method on JobSubmitted event.
  • handleMapStageSubmitted method on MapStageSubmitted event
  • handleStageCancellation method on StageCancelled event
  • handleJobCancellation method on JobCancelled event
  • handleJobGroupCancelled method on JobGroupCancelled event

Also, observe the good use of Scala extractor in pattern matching of DAGSchedulerEvent.

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)</code>

case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)

case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)

case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)

case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()

case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId, reason) =>
val workerLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, workerLost)

case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)

case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)

case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}

override def onError(e: Throwable): Unit = {
logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
try {
dagScheduler.doCancelAllJobs()
} catch {
case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
}
dagScheduler.sc.stopInNewThread()
}

override def onStop(): Unit = {
// Cancel any active jobs in postStop hook
dagScheduler.cleanUpAfterSchedulerStop()
}
}

Check out the following classes to learn more

EventLoop

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/EventLoop.scala

DAGSchedulerEvent

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

DAGScheduler (You can find internal class DAGSchedulerEventProcessLoop inside DAGScheduler)

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Continue reading