Get preferred locations for an RDD’s partitions

When DAGScheduler submits a stage for execution, it fetches the preferred locations (TaskLocations) for running tasks on the RDD’s partitions from BlockManagerMaster, which in turn reaches out to the driver’s RPC endpoint for the information. A TaskLocation is either a host or a (host, executorID) pair. The purpose is to achieve data locality when running a task: it is always preferable to run a task on a host that already holds the required data block in memory.

The DAGScheduler class has a getCacheLocs method. In this method, DAGScheduler first checks whether the preferred locations (TaskLocations) have already been fetched by looking in its internal cacheLocs HashMap. If cacheLocs does not contain the information for the RDD, it calls blockManagerMaster.getLocations to fetch the preferred locations for all of the RDD’s partitions. Once fetched, the task locations are kept in the cacheLocs HashMap.

See DAGScheduler’s getCacheLocs method below.

private[scheduler]
  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains(rdd.id)) {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
        IndexedSeq.fill(rdd.partitions.length)(Nil)
      } else {
        val blockIds =
          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
          bms.map(bm => TaskLocation(bm.host, bm.executorId))
        }
      }
      cacheLocs(rdd.id) = locs
    }
    cacheLocs(rdd.id)
  }
/**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }

    Nil
  }
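
To see this traversal from the user side, here is a minimal sketch (the HDFS path is hypothetical and the setup is an assumption for illustration): the source RDD gets placement preferences from its input splits, while the mapped RDD has none of its own, so the scheduler falls through to the narrow dependency and reuses the parent’s locations.

import org.apache.spark.{SparkConf, SparkContext}

object NarrowDependencyLocalityDemo {
  def main(args: Array[String]): Unit = {
    // Master URL is assumed to come from spark-submit; the HDFS path is hypothetical.
    val sc = new SparkContext(new SparkConf().setAppName("narrow-dep-locality"))
    // The source RDD exposes preferredLocations derived from its HDFS input splits.
    val lines = sc.textFile("hdfs:///data/input.txt")
    // map() adds a narrow (one-to-one) dependency and no preferences of its own, so
    // getPreferredLocsInternal recurses into the parent and reuses its locations:
    // tasks are scheduled on the hosts holding the corresponding HDFS blocks.
    val lengths = lines.map(_.length)
    lengths.count()
    sc.stop()
  }
}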

In BlockManagerMaster, the getLocations method sends an RPC request to the driver endpoint to get the preferred locations.


  /** Get locations of multiple blockIds from the driver */
  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
    driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
      GetLocationsMultipleBlockIds(blockIds))
  }
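
The effect of this cache lookup can be seen from user code as well. A minimal sketch, assuming a local master for simplicity: once the first action materializes the persisted blocks and BlockManagerMaster learns their locations, the next job’s tasks are scheduled on the executors holding those blocks (PROCESS_LOCAL locality in the Spark UI).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheLocalityDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-locality").setMaster("local[4]"))
    // Persisting turns each partition into an RDD block tracked by BlockManagerMaster.
    val data = sc.parallelize(1 to 1000000, numSlices = 8).persist(StorageLevel.MEMORY_ONLY)
    data.count()            // first action: blocks are stored and their locations registered
    data.map(_ * 2).count() // second job: getCacheLocs returns the executors holding the blocks
    sc.stop()
  }
}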

 

Spark pluggable cluster manager ExternalClusterManager

In Spark 2.0.0, a new pluggable cluster manager interface, ExternalClusterManager, was introduced. See the Jira ticket below

https://issues.apache.org/jira/browse/SPARK-13904

This allows us to introduce cluster managers other than YARN, Mesos, and Spark standalone by implementing the ExternalClusterManager trait. These are the required methods in the trait: createTaskScheduler creates a task scheduler, and createSchedulerBackend creates a scheduler backend.


def canCreate(masterURL: String): Boolean
def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit

The cluster manager is used in SparkContext to create the scheduler backend and the task scheduler.

Currently, there are two existing implementations of ExternalClusterManager. YarnClusterManager is the implementation for YARN. As you can see in the code, it returns YarnClusterScheduler when the deploy mode is cluster and YarnScheduler when the deploy mode is client. For the scheduler backend, it creates YarnClusterSchedulerBackend and YarnClientSchedulerBackend for cluster and client deploy modes respectively.


package org.apache.spark.scheduler.cluster

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
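
As a rough sketch of how this dispatch is driven by configuration (assuming a YARN environment with the YARN module on the classpath; the app name is made up): the master URL "yarn" is what canCreate matches, and spark.submit.deployMode is what sc.deployMode reports in the matches above.

import org.apache.spark.{SparkConf, SparkContext}

object YarnModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarn-demo")
      .setMaster("yarn")                        // matched by YarnClusterManager.canCreate
      .set("spark.submit.deployMode", "client") // "cluster" would select YarnClusterScheduler instead
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}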

MesosClusterManager is the implementation for Mesos.


package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Mesos scheduler and backend
 */
private[spark] class MesosClusterManager extends ExternalClusterManager {
  private val MESOS_REGEX = """mesos://(.*)""".r

  override def canCreate(masterURL: String): Boolean = {
    masterURL.startsWith("mesos")
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    require(!sc.conf.get(IO_ENCRYPTION_ENABLED),
      "I/O encryption is currently not supported in Mesos.")

    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarse) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl,
        sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

The java.util.ServiceLoader mechanism is used to register and auto-load any new external cluster manager implementation.
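
As an illustration, a third-party cluster manager could be plugged in roughly as follows. This is a hypothetical skeleton, not a real Spark class: MyClusterManager, the mycluster:// URL scheme, and the omitted backend are inventions for this sketch, and the class sits under the org.apache.spark package because the scheduler traits are package-private to Spark.

// Hypothetical skeleton (not part of Spark). It lives under org.apache.spark
// because ExternalClusterManager and the scheduler traits are private[spark].
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

private[spark] class MyClusterManager extends ExternalClusterManager {

  // Claim master URLs of the made-up form mycluster://host:port
  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("mycluster://")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    // A real manager would return a backend that launches executors on the external
    // cluster (compare YarnClusterSchedulerBackend above); omitted in this sketch.
    throw new UnsupportedOperationException("backend construction omitted in this sketch")
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}

// To register it, ship a resource file named
//   META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
// whose single line is:
//   org.apache.spark.scheduler.cluster.MyClusterManager
// SparkContext then discovers it through java.util.ServiceLoader and calls
// canCreate with the master URL to decide whether this manager applies.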

Failed Spark job reports on YARN as successful when deployed with client mode

If you deploy your Spark job on YARN in client deploy mode, be aware that the YARN application is always shown as SUCCEEDED in the YARN ResourceManager UI even if the Spark job fails.

See the following Jira ticket

https://issues.apache.org/jira/browse/SPARK-11058

Note from Spark docs:

--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)

DAGSchedulerEventProcessLoop in Spark

In Spark, DAGScheduler utilizes DAGSchedulerEventProcessLoop to process different DAGSchedulerEvent events.

Here are the different DAGSchedulerEvent events:
JobSubmitted
MapStageSubmitted
StageCancelled
JobCancelled
JobGroupCancelled
AllJobsCancelled
ExecutorAdded
ExecutorLost
WorkerRemoved
BeginEvent
SpeculativeTaskSubmitted
GettingResultEvent
CompletionEvent
TaskSetFailed
ResubmitFailedStages

Basically, DAGSchedulerEventProcessLoop extends an abstract class called EventLoop, which has an internal daemon thread polling events from the event queue (a LinkedBlockingDeque). The queue is used in FIFO order (put and take) even though it is a double-ended queue. Once the thread takes an event, it triggers the onReceive method on the subclass DAGSchedulerEventProcessLoop.

You can find the event thread definition in EventLoop as shown below.

private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }
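
For comparison, here is a condensed, self-contained sketch of the same pattern (a hypothetical SimpleEventLoop, not Spark’s EventLoop): producers enqueue events with post, the daemon thread takes them in FIFO order, and failures in onReceive are routed to onError.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()  // blocks until a producer posts an event
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the thread; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)  // FIFO: put at the tail, take from the head

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}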

See DAGSchedulerEventProcessLoop’s onReceive method below. In this method, different event-handling methods of DAGScheduler are invoked based on the event type.

For example:

DAGScheduler calls

  • the handleJobSubmitted method on a JobSubmitted event
  • the handleMapStageSubmitted method on a MapStageSubmitted event
  • the handleStageCancellation method on a StageCancelled event
  • the handleJobCancellation method on a JobCancelled event
  • the handleJobGroupCancelled method on a JobGroupCancelled event

Also, observe the good use of Scala extractors in the pattern matching over DAGSchedulerEvent; a minimal extractor sketch follows the code below.

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
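
As a minimal illustration of that extractor-based dispatch (with made-up event types, not Spark’s), each case class gets a compiler-generated unapply, so the match can deconstruct an event and bind its fields in one step:

object ExtractorDemo {
  sealed trait DemoEvent
  case class DemoJobSubmitted(jobId: Int, name: String) extends DemoEvent
  case class DemoJobCancelled(jobId: Int, reason: Option[String]) extends DemoEvent
  case object DemoAllJobsCancelled extends DemoEvent

  // The compiler-generated unapply (extractor) deconstructs each event and binds its fields.
  def handle(event: DemoEvent): String = event match {
    case DemoJobSubmitted(jobId, name)   => s"submit job $jobId ($name)"
    case DemoJobCancelled(jobId, reason) => s"cancel job $jobId: ${reason.getOrElse("no reason given")}"
    case DemoAllJobsCancelled            => "cancel all jobs"
  }

  def main(args: Array[String]): Unit = {
    println(handle(DemoJobSubmitted(1, "word count")))  // prints: submit job 1 (word count)
    println(handle(DemoAllJobsCancelled))               // prints: cancel all jobs
  }
}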

Check out the following classes to learn more

EventLoop

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/EventLoop.scala

DAGSchedulerEvent

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

DAGScheduler (you can find the internal class DAGSchedulerEventProcessLoop inside DAGScheduler)

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Remove Akka from Spark Core dependencies

In Spark 2.0, Akka was removed from the Spark Core dependencies. To understand the work required to accomplish this, check out these tickets.

https://issues.apache.org/jira/browse/SPARK-5293

https://issues.apache.org/jira/browse/SPARK-5214

https://issues.apache.org/jira/browse/SPARK-6602

https://issues.apache.org/jira/browse/SPARK-6028

You can also find the design doc for the new pluggable RPC implementation in this ticket

https://issues.apache.org/jira/browse/SPARK-5124

As a result of the refactoring, a common single-threaded event loop was implemented in DAGScheduler to replace Akka. An alternative, non-Akka RPC implementation was also introduced.

Also, see Reynold Xin’s comment in the following pull request about the reasons behind the refactoring.

https://github.com/apache/spark/pull/4016

If we ever do that, it’d be for making networking easier (both debugging and deployment), and enabling our users to use Akka (using Akka, especially a different version of it for an app on top of Spark is a mess right now. Spark not depending on Akka will make it easier for applications on top of Spark to use Akka).

This major undertaking started as early as Spark 1.3, and finally, in Spark 2.0, Akka was removed from the Spark Core dependencies. Kudos to Shixiong Zhu for getting it done.