DAGSchedulerEventProcessLoop in Spark

In Spark, DAGScheduler uses DAGSchedulerEventProcessLoop to process the different kinds of DAGSchedulerEvent.

Here are the DAGSchedulerEvent types:
JobSubmitted
MapStageSubmitted
StageCancelled
JobCancelled
JobGroupCancelled
AllJobsCancelled
ExecutorAdded
ExecutorLost
WorkerRemoved
BeginEvent
SpeculativeTaskSubmitted
GettingResultEvent
CompletionEvent
TaskSetFailed
ResubmitFailedStages

DAGSchedulerEventProcessLoop extends the abstract class EventLoop, which runs an internal daemon thread that polls events from an event queue (a LinkedBlockingDeque). Although the queue is double-ended, EventLoop uses it in FIFO order: events are added with put and removed with take. Each time the thread takes an event, it calls the onReceive method, which the subclass DAGSchedulerEventProcessLoop implements.
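To make the FIFO point concrete, here is a minimal sketch that is independent of Spark. The FifoDemo object and its drainInOrder helper are hypothetical names used only for illustration. It relies on the documented behavior of java.util.concurrent.LinkedBlockingDeque, where put appends at the tail and take removes from the head.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

// Hypothetical demo object; not part of Spark.
object FifoDemo {
  // Enqueues the events with put, then dequeues them with take,
  // returning them in the order a consumer thread would see them.
  def drainInOrder(events: Seq[String]): List[String] = {
    val queue: BlockingQueue[String] = new LinkedBlockingDeque[String]()
    events.foreach(e => queue.put(e))      // put appends at the tail
    List.fill(events.size)(queue.take())   // take removes from the head
  }

  def main(args: Array[String]): Unit =
    println(drainInOrder(Seq("JobSubmitted", "BeginEvent", "CompletionEvent")))
}
```

Because only put and take are used, the deque behaves like an ordinary queue: events come out in the order they were posted.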

The event thread is defined in EventLoop as shown below.

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }
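The following is a simplified, self-contained sketch of the same pattern, so that the thread above can be seen working together with posting, stopping, and error handling. MiniEventLoop and MiniEventLoopDemo are hypothetical names; this is not Spark's EventLoop, and details such as the start/post/stop methods here are assumptions made for the sketch.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Simplified stand-in for an event loop; hypothetical, not Spark's class.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is available
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => onError(e) // a failing handler does not end the loop
          }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take()
      }
    }
  }

  def start(): Unit = eventThread.start()

  def post(event: E): Unit = eventQueue.put(event)

  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object MiniEventLoopDemo {
  // Posts the events, waits until each one was handled or failed,
  // and returns what the loop recorded, in processing order.
  def run(events: Seq[String]): List[String] = {
    val log = ArrayBuffer.empty[String]
    val done = new CountDownLatch(events.size)
    val loop = new MiniEventLoop[String]("mini-event-loop") {
      override protected def onReceive(event: String): Unit = {
        if (event == "boom") throw new IllegalStateException("bad event")
        log.synchronized { log += s"handled $event" }
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = {
        log.synchronized { log += s"error ${e.getMessage}" }
        done.countDown()
      }
    }
    loop.start()
    events.foreach(e => loop.post(e))
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    log.synchronized(log.toList)
  }
}
```

Calling MiniEventLoopDemo.run(Seq("a", "boom", "b")) exercises both paths: the "boom" event goes through onError, and the loop keeps processing the events after it. A single consumer thread is what keeps the handling order the same as the posting order.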

The code below shows how DAGSchedulerEventProcessLoop handles events. Its onReceive method delegates to doOnReceive, which invokes the matching DAGScheduler handler method based on the event type.

For example, DAGScheduler calls:

  • handleJobSubmitted on a JobSubmitted event
  • handleMapStageSubmitted on a MapStageSubmitted event
  • handleStageCancellation on a StageCancelled event
  • handleJobCancellation on a JobCancelled event
  • handleJobGroupCancelled on a JobGroupCancelled event

Also, note how Scala extractors (case class patterns) are used to pattern match on DAGSchedulerEvent and bind the event's fields in a single step.

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      // Nested pattern: true only when the whole worker was lost with the executor
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
} // end of class DAGSchedulerEventProcessLoop
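The extractor-based matching used in doOnReceive can also be tried in isolation. The sketch below uses a deliberately cut-down, hypothetical event hierarchy: the names echo Spark's, but the fields, the LossReason types, and the Dispatcher object are assumptions made for illustration and do not match the real definitions.

```scala
// Hypothetical, simplified events; not Spark's actual definitions.
sealed trait SchedulerEvent
final case class JobSubmitted(jobId: Int, partitions: Seq[Int]) extends SchedulerEvent
final case class ExecutorLost(execId: String, reason: LossReason) extends SchedulerEvent
case object AllJobsCancelled extends SchedulerEvent

sealed trait LossReason
final case class WorkerLost(message: String, workerLost: Boolean) extends LossReason
case object ExecutorKilled extends LossReason

object Dispatcher {
  // Each case class gets a compiler-generated unapply (its extractor),
  // so a pattern can test the event type and bind its fields at once.
  def dispatch(event: SchedulerEvent): String = event match {
    case JobSubmitted(jobId, partitions) =>
      s"submit job $jobId with ${partitions.size} partitions"
    case ExecutorLost(execId, WorkerLost(_, true)) => // nested extractor pattern
      s"executor $execId lost with its worker"
    case ExecutorLost(execId, _) =>
      s"executor $execId lost"
    case AllJobsCancelled =>                          // case objects match by identity
      "cancel all jobs"
  }
}
```

Because SchedulerEvent is sealed, the compiler can warn when a match misses a case, which is one reason this style suits a closed set of event types.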

Check out the following classes to learn more:

EventLoop

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/EventLoop.scala

DAGSchedulerEvent

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

DAGScheduler (DAGSchedulerEventProcessLoop is defined in the same source file as DAGScheduler)

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
