DAGSchedulerEventLoop in Spark

In Spark, DAGScheduler uses a DAGSchedulerEventProcessLoop to process the different DAGSchedulerEvent events.

Here are the different DAGSchedulerEvent events:
JobSubmitted
MapStageSubmitted
StageCancelled
JobCancelled
JobGroupCancelled
AllJobsCancelled
ExecutorAdded
ExecutorLost
WorkerRemoved
BeginEvent
SpeculativeTaskSubmitted
GettingResultEvent
CompletionEvent
TaskSetFailed
ResubmitFailedStages

Basically, DAGSchedulerEventProcessLoop extends an abstract class called EventLoop, which runs an internal daemon thread that polls events from an event queue (a LinkedBlockingDeque). Although the queue is double-ended, it is used as a plain FIFO queue (put and take). Once the thread takes an event, it invokes the onReceive method implemented by the subclass DAGSchedulerEventProcessLoop.

You can find the event thread definition in EventLoop as shown below.

private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }
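To make the pattern concrete, here is a minimal, self-contained sketch of the same idea. MiniEventLoop and all of its names are made up for illustration; it is not Spark's actual EventLoop, just the producer/consumer shape it follows.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// A stripped-down sketch of the pattern: post() appends to the tail of a
// LinkedBlockingDeque and the daemon thread take()s from the head, so the
// double-ended queue is effectively used as a FIFO queue.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()          // blocks until an event arrives
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take(); just exit
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = e.printStackTrace()
}

// Usage: subclass it the way DAGSchedulerEventProcessLoop subclasses EventLoop.
object MiniEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new MiniEventLoop[String]("demo-event-loop") {
      override protected def onReceive(event: String): Unit = println(s"handling $event")
    }
    loop.start()
    loop.post("JobSubmitted")
    Thread.sleep(100)   // give the daemon thread a moment to process before exiting
    loop.stop()
  }
}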

See DAGSchedulerEventProcessLoop’s doOnReceive method below (it is called from onReceive). In this method, the appropriate DAGScheduler event-handling method is invoked based on the event type.

For example:

DAGScheduler calls

  • handleJobSubmitted method on JobSubmitted event
  • handleMapStageSubmitted method on MapStageSubmitted event
  • handleStageCancellation method on StageCancelled event
  • handleJobCancellation method on JobCancelled event
  • handleJobGroupCancelled method on JobGroupCancelled event

Also, observe the good use of Scala extractors in the pattern matching on DAGSchedulerEvent.

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)

  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val workerLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, workerLost)

  case WorkerRemoved(workerId, host, message) =>
    dagScheduler.handleWorkerRemoved(workerId, host, message)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case SpeculativeTaskSubmitted(task) =>
    dagScheduler.handleSpeculativeTaskSubmitted(task)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

override def onError(e: Throwable): Unit = {
  logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
  try {
    dagScheduler.doCancelAllJobs()
  } catch {
    case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
  }
  dagScheduler.sc.stopInNewThread()
}

override def onStop(): Unit = {
  // Cancel any active jobs in postStop hook
  dagScheduler.cleanUpAfterSchedulerStop()
}
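The sealed-trait-plus-case-class style that makes this match so readable is easy to reproduce in your own code. Here is a toy sketch of the same extractor pattern; the event names below are made up and are not Spark's.

// A sealed trait of events, each a case class, so the compiler can check the match
// for exhaustiveness and the case-class extractors bind fields directly in each pattern.
sealed trait DemoEvent
case class JobStarted(jobId: Int, name: String) extends DemoEvent
case class JobFailed(jobId: Int, reason: Throwable) extends DemoEvent
case object Shutdown extends DemoEvent

def handle(event: DemoEvent): Unit = event match {
  case JobStarted(id, name)  => println(s"job $id ($name) started")
  case JobFailed(id, reason) => println(s"job $id failed: ${reason.getMessage}")
  case Shutdown              => println("shutting down")
}

handle(JobStarted(1, "wordcount"))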

Check out the following classes to learn more:

EventLoop

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/EventLoop.scala

DAGSchedulerEvent

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

DAGScheduler (the internal class DAGSchedulerEventProcessLoop is defined inside DAGScheduler)

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Spark 2.2.0: New Imputer to replace missing values

With the release of Spark 2.2.0, we can now use the newly implemented Imputer to replace missing values in our dataset. It currently supports only mean and median as imputation strategies, not most frequent; the default strategy is mean. (Note: scikit-learn provides all three strategies.)

See the Imputer class and the associated Jira ticket below
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala

https://issues.apache.org/jira/browse/SPARK-13568

Example usage with the Zillow Price Kaggle dataset:


import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(Array("bedroomcnt", "bathroomcnt", "roomcnt", "calculatedfinishedsquarefeet", "taxamount", "taxvaluedollarcnt", "landtaxvaluedollarcnt", "structuretaxvaluedollarcnt"))
  .setOutputCols(Array("bedroomcnt_out", "bathroomcnt_out", "roomcnt_out", "calculatedfinishedsquarefeet_out", "taxamount_out", "taxvaluedollarcnt_out", "landtaxvaluedollarcnt_out", "structuretaxvaluedollarcnt_out"))
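To actually run the imputation, pick a strategy and call fit/transform. A short sketch, assuming the Zillow data has already been loaded into a DataFrame I call properties (a placeholder name):

// `properties` is a placeholder for the loaded Zillow DataFrame.
imputer.setStrategy("median")                      // optional; the default strategy is "mean"

val imputerModel = imputer.fit(properties)         // computes the per-column mean/median
val imputed = imputerModel.transform(properties)   // adds the *_out columns with missing values filled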
   

However, I ran into an issue: it can’t handle columns of integer type. See this Jira ticket.

https://issues.apache.org/jira/browse/SPARK-20604

The good news is that a pull request was created to fix the issue by converting integer columns to double during imputation. See the pull request below.

https://github.com/apache/spark/pull/17864
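Until you are on a build that includes the fix, one workaround is to cast the integer columns to double yourself before handing them to Imputer. A sketch; the column list and the properties name are placeholders:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// Placeholder list of integer-typed columns; adjust to your schema.
val intCols = Seq("bedroomcnt", "roomcnt")

// Cast each integer column to double so Imputer can handle it on affected versions.
val castedProperties = intCols.foldLeft(properties) { (df, c) =>
  df.withColumn(c, col(c).cast(DoubleType))
}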

StringIndexer transform fails when column contains nulls

If you run into a NullPointerException when using StringIndexer in Spark versions earlier than 2.2.0, it means your input column contains null values. You have to remove or impute these null values before using StringIndexer. See the ticket below. The good news is that this issue was fixed in Spark 2.2.0.

https://issues.apache.org/jira/browse/SPARK-11569

With the fix, we can specify how StringIndexer should handle null values; three strategies are available, as shown below.

handleInvalid=error: Throw an exception as before
handleInvalid=skip: Skip null values as well as unseen labels
handleInvalid=keep: Give null values an additional index as well as unseen labels

val codeIndexer = new StringIndexer().setInputCol("originalCode").setOutputCol("originalCodeCategory")
codeIndexer.setHandleInvalid("keep")
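A minimal end-to-end sketch, assuming a SparkSession named spark is in scope; the toy data and ids are made up for illustration:

import org.apache.spark.ml.feature.StringIndexer

// Toy DataFrame with a null in the input column.
val df = spark.createDataFrame(Seq((0, "A"), (1, "B"), (2, "A"), (3, null)))
  .toDF("id", "originalCode")

val codeIndexer = new StringIndexer()
  .setInputCol("originalCode")
  .setOutputCol("originalCodeCategory")
  .setHandleInvalid("keep")   // nulls (and unseen labels) get an extra index instead of throwing

codeIndexer.fit(df).transform(df).show()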


Deprecated functions in org.apache.spark.sql.functions in Spark 2.0

I just moved some of my Spark code from 1.6.0 to 2.2.0 and discovered that some functions in org.apache.spark.sql.functions._ have been replaced/renamed.

To name a few, see below:

1) rowNumber() is replaced by row_number()

import org.apache.spark.sql.functions._
/**
* @group window_funcs
* @deprecated As of 1.6.0, replaced by `row_number`. This will be removed in Spark 2.0.
*/
@deprecated("Use row_number. This will be removed in Spark 2.0.", "1.6.0")
def rowNumber(): Column = row_number()

2) isNaN is replaced by isnan

/**
   * @group normal_funcs
   * @deprecated As of 1.6.0, replaced by `isnan`. This will be removed in Spark 2.0.
   */
  @deprecated("Use isnan. This will be removed in Spark 2.0.", "1.6.0")
  def isNaN(e: Column): Column = isnan(e)

3) inputFileName() is replaced by input_file_name()

/**
   * @group normal_funcs
   * @deprecated As of 1.6.0, replaced by `input_file_name`. This will be removed in Spark 2.0.
   */
  @deprecated("Use input_file_name. This will be removed in Spark 2.0.", "1.6.0")
  def inputFileName(): Column = input_file_name()
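For example, a window ranking expression migrates from 1.6 to 2.x simply by switching to the snake_case name. In the sketch below, df and its column names are placeholders (e.g. the Zillow data used earlier):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Spark 1.6:  df.withColumn("rank", rowNumber().over(w))
// Spark 2.x:  same logic, snake_case function name
val w = Window.partitionBy("regionidcounty").orderBy("taxamount")
val ranked = df.withColumn("rank", row_number().over(w))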

To get the full list of replaced/renamed functions, refer to functions.scala on the 1.6 branch:
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Scala Enumeration

In Java, we use an enum to represent a fixed set of constants.

For example, we would define a day-of-week enum type as follows:

public enum Day {
    SUNDAY, MONDAY, TUESDAY, WEDNESDAY,
    THURSDAY, FRIDAY, SATURDAY 
}

In Scala, we can do the same thing by extending Enumeration, for example:

object Day extends Enumeration {
  type Day = Value
  val SUNDAY, MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY = Value
}
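A quick usage sketch building on the Day object above (the output comments reflect Enumeration's default string names):

import Day._

// Iterate over all values, look one up by name, and pattern match on it.
Day.values.foreach(println)                   // SUNDAY, MONDAY, ..., SATURDAY
val d: Day = Day.withName("FRIDAY")
val isWeekend = d match {
  case SATURDAY | SUNDAY => true
  case _ => false
}
println(s"$d is a weekend day? $isWeekend")   // FRIDAY is a weekend day? false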

You can find examples of Scala Enumeration usage in Spark:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskState.scala

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala

Render Json using Jackson in Scala

If you use the Jackson JSON library in Scala, remember to register DefaultScalaModule so that ObjectMapper can serialize Scala types such as List and Array to JSON correctly. See below.

 
val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)

Simple example:

 
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.{JsonProperty, PropertyAccessor}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule



object JsonExample {
  case class Car(@JsonProperty("id")  id: Long)
  case class Person(@JsonProperty("name") name: String = null,
                    @JsonProperty("cars") cars: Seq[Car] = null)

  def main(args:Array[String]):Unit = {
    val car1 = Car(12345)
    val car2 = Car(12346)
    val carsOwned = List(car1, car2)
    val person = Person(name = "wei", cars = carsOwned)

    val objectMapper = new ObjectMapper()
    objectMapper.registerModule(DefaultScalaModule)
    objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.NONE)
    objectMapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY)
    println(s"person: ${objectMapper.writeValueAsString(person)}")
  }
}

Output:
person: {"name":"wei","cars":[{"id":12345},{"id":12346}]}
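Reading the JSON back is just as easy once DefaultScalaModule is registered. A small sketch reusing the Person and Car case classes and the objectMapper from above:

// Deserialize the JSON string back into the case classes defined above.
val json = """{"name":"wei","cars":[{"id":12345},{"id":12346}]}"""
val restored = objectMapper.readValue(json, classOf[Person])
println(restored)   // e.g. Person(wei,List(Car(12345), Car(12346)))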