Spark pluggable cluster manager: ExternalClusterManager

In Spark 2.0.0, a new pluggable cluster manager interface, ExternalClusterManager, was introduced. See the Jira ticket below:

https://issues.apache.org/jira/browse/SPARK-13904

This allows us to plug in cluster managers other than YARN, Mesos, and Spark standalone by implementing the ExternalClusterManager trait. The trait requires the following methods: canCreate checks whether the manager can handle a given master URL, createTaskScheduler creates a task scheduler, createSchedulerBackend creates a scheduler backend, and initialize wires the scheduler and backend together.


def canCreate(masterURL: String): Boolean
def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
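
To illustrate the extension point, here is a minimal sketch of what a custom implementation could look like. MyClusterManager, the mycluster:// URL scheme, and MyClusterSchedulerBackend are hypothetical names used for illustration only, not part of Spark.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

// Hypothetical cluster manager that claims master URLs of the form "mycluster://...".
private[spark] class MyClusterManager extends ExternalClusterManager {

  // Only handle our custom URL scheme.
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mycluster")

  // Reuse Spark's default TaskSchedulerImpl for task scheduling.
  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  // A real implementation would return a backend that requests resources from the
  // external cluster; MyClusterSchedulerBackend is a placeholder name here.
  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    new MyClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc, masterURL)

  // Wire the backend into the task scheduler, as the built-in managers do.
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}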

The cluster manager is used by SparkContext to create the scheduler backend and task scheduler: when a SparkContext is constructed, it finds the ExternalClusterManager whose canCreate returns true for the configured master URL and then calls its createTaskScheduler, createSchedulerBackend, and initialize methods.

Currently, there are two existing implementations of ExternalClusterManager. YarnClusterManager is the implementation for YARN. As you can see in the code, it returns a YarnClusterScheduler when the deploy mode is cluster and a YarnScheduler when the deploy mode is client. For the scheduler backend, it creates a YarnClusterSchedulerBackend and a YarnClientSchedulerBackend for cluster and client deploy mode respectively.

package org.apache.spark.scheduler.cluster

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

MesosClusterManager is the implementation for Mesos. It always creates a TaskSchedulerImpl, and it chooses between a coarse-grained and a fine-grained Mesos scheduler backend based on the spark.mesos.coarse setting (coarse-grained by default).

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Mesos scheduler and backend
 */
private[spark] class MesosClusterManager extends ExternalClusterManager {
  private val MESOS_REGEX = """mesos://(.*)""".r

  override def canCreate(masterURL: String): Boolean = {
    masterURL.startsWith("mesos")
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    require(!sc.conf.get(IO_ENCRYPTION_ENABLED),
      "I/O encryption is currently not supported in Mesos.")

    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarse) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl,
        sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

The java.util.ServiceLoader mechanism is used to register and auto-load any new external cluster manager implementation: SparkContext loads all registered ExternalClusterManager implementations from the classpath and picks the one whose canCreate accepts the master URL.
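
For a custom manager (continuing with the hypothetical MyClusterManager above), registration amounts to shipping a provider-configuration file on the classpath whose name is the fully qualified name of the trait and whose single line is the implementation class. This is the same mechanism Spark's own YARN module uses to register YarnClusterManager.

# Classpath resource: META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
com.example.scheduler.MyClusterManager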

Failed Spark job reported on YARN as successful when deployed in client mode

If you deploy your Spark job on YARN with client deploy mode, be aware that the YARN application is always shown as SUCCEEDED in the YARN ResourceManager UI even if the Spark job fails. In client mode the driver runs outside of YARN, so the final application status reported to YARN does not reflect the driver's outcome.

See the following Jira ticket

https://issues.apache.org/jira/browse/SPARK-11058

Note from Spark docs:

--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
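
For example, a job can be submitted in cluster mode explicitly as follows (the application class and JAR names are placeholders):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyApp \
  my-spark-app.jar

In cluster mode the driver runs inside the YARN application master, so the final YARN application status does reflect whether the driver succeeded or failed.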

DAGSchedulerEventLoop in Spark

In Spark, DAGScheduler utilizes DAGSchedulerEventProcessLoop to process different DAGSchedulerEvent events.

Here are the different DAGSchedulerEvent events:
JobSubmitted
MapStageSubmitted
StageCancelled
JobCancelled
JobGroupCancelled
AllJobsCancelled
ExecutorAdded
ExecutorLost
WorkerRemoved
BeginEvent
SpeculativeTaskSubmitted
GettingResultEvent
CompletionEvent
TaskSetFailed
ResubmitFailedStages

Basically, DAGSchedulerEventProcessLoop extends an abstract class called EventLoop, which has an internal daemon thread polling events from the event queue (a LinkedBlockingDeque). Although the queue is double-ended, it is used in FIFO order (put appends to the tail, take removes from the head). Once the thread takes an event, it invokes the onReceive method of the subclass, DAGSchedulerEventProcessLoop.

You can find the event thread definition in EventLoop as shown below.

private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }
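
On the producer side, DAGScheduler hands work to the loop through EventLoop's post method, which simply enqueues the event for the daemon thread. A simplified sketch (not the verbatim source, which varies slightly across Spark versions):

// Simplified sketch of EventLoop.post: producers enqueue events here; the daemon
// thread shown above takes them and dispatches each one to onReceive.
def post(event: E): Unit = {
  eventQueue.put(event)
}

DAGScheduler.submitJob, for instance, calls eventProcessLoop.post(JobSubmitted(...)) and returns immediately; the DAGScheduler's internal state is then only mutated on the single event loop thread.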

See DAGSchedulerEventProcessLoop’s doOnReceive method below (onReceive delegates to it). In this method, different DAGScheduler event handling methods are invoked based on the event type.

For example:

DAGScheduler calls

  • handleJobSubmitted method on JobSubmitted event.
  • handleMapStageSubmitted method on MapStageSubmitted event
  • handleStageCancellation method on StageCancelled event
  • handleJobCancellation method on JobCancelled event
  • handleJobGroupCancelled method on JobGroupCancelled event

Also, observe the good use of Scala extractors in the pattern matching on DAGSchedulerEvent.

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}

Check out the following classes to learn more:

EventLoop

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/EventLoop.scala

DAGSchedulerEvent

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

DAGScheduler (you can find the internal class DAGSchedulerEventProcessLoop inside DAGScheduler)

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Remove Akka from Spark Core dependencies

In Spark 2.0, Akka was removed from the Spark Core dependencies. To understand the work required to accomplish this, check out these tickets:

https://issues.apache.org/jira/browse/SPARK-5293

https://issues.apache.org/jira/browse/SPARK-5214

https://issues.apache.org/jira/browse/SPARK-6602

https://issues.apache.org/jira/browse/SPARK-6028

You can also find the design doc for the new pluggable RPC implementation in this ticket:

https://issues.apache.org/jira/browse/SPARK-5124

As a result of the refactoring, a common single-threaded event loop (the EventLoop described above) was implemented in DAGScheduler to replace Akka, and an alternative, non-Akka RPC implementation was also introduced.

Also, see Reynold Xin’s comment in the following pull request about the reasons behind the refactoring.

https://github.com/apache/spark/pull/4016

If we ever do that, it’d be for making networking easier (both debugging and deployment), and enabling our users to use Akka (using Akka, especially a different version of it for an app on top of Spark is a mess right now. Spark not depending on Akka will make it easier for applications on top of Spark to use Akka).

This major undertaking started as early as Spark 1.3, and in Spark 2.0 Akka was finally removed from the Spark Core dependencies. Kudos to Shixiong Zhu for getting it done.

Spark 2.2.0: New Imputer to replace missing values

With the release of Spark 2.2.0, we can now use the newly implemented Imputer to replace missing values in our dataset. However, it currently only supports mean and median as imputation strategies, not most frequent. The default strategy is mean. (Note: scikit-learn provides all three strategies.)

See the Imputer class and the associated Jira ticket below
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala

https://issues.apache.org/jira/browse/SPARK-13568

Example usage with the Zillow Prize Kaggle dataset:

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
imputer.setInputCols(Array("bedroomcnt", "bathroomcnt", "roomcnt", "calculatedfinishedsquarefeet",
  "taxamount", "taxvaluedollarcnt", "landtaxvaluedollarcnt", "structuretaxvaluedollarcnt"))
imputer.setOutputCols(Array("bedroomcnt_out", "bathroomcnt_out", "roomcnt_out", "calculatedfinishedsquarefeet_out",
  "taxamount_out", "taxvaluedollarcnt_out", "landtaxvaluedollarcnt_out", "structuretaxvaluedollarcnt_out"))
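
A minimal sketch of applying the imputation; df here is a placeholder for the properties DataFrame with the listed columns already cast to double:

// fit computes the mean (the default strategy) of each input column;
// transform adds the *_out columns with missing values filled in.
val imputerModel = imputer.fit(df)
val imputedDf = imputerModel.transform(df)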

However, I ran into an issue: the Imputer can’t handle columns of integer type. See this Jira ticket:

https://issues.apache.org/jira/browse/SPARK-20604

The good news is that a pull request was created to fix the issue by converting integer columns to double during imputation. See the pull request below:

https://github.com/apache/spark/pull/17864

StringIndexer transform fails when column contains nulls

If you run into a NullPointerException when using StringIndexer in Spark versions earlier than 2.2.0, it means that your input column contains null values. You would have to remove or impute these null values before using StringIndexer. See the ticket below. The good news is that this issue was fixed in Spark 2.2.0.

https://issues.apache.org/jira/browse/SPARK-11569

With the fix, we can specify how StringIndexer should handle null values; the following three strategies are available:

handleInvalid=error: Throw an exception as before
handleInvalid=skip: Skip null values as well as unseen labels
handleInvalid=keep: Give null values an additional index as well as unseen labels

import org.apache.spark.ml.feature.StringIndexer

val codeIndexer = new StringIndexer().setInputCol("originalCode").setOutputCol("originalCodeCategory")
codeIndexer.setHandleInvalid("keep")
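
A minimal usage sketch; df is a placeholder DataFrame with an originalCode column that may contain nulls:

// With handleInvalid = "keep", nulls and unseen labels get their own extra index
// instead of causing a NullPointerException.
val indexedDf = codeIndexer.fit(df).transform(df)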

Zillow Prize Kaggle Competition Part 2

In my last blog post, I calculated the missing value percentage for every column in the data. The next thing to do is to impute the missing values in selected columns before model training.

However, I am going to skip in-depth data cleaning and feature selection (e.g. removing outliers and calculating correlations between features) for now. I will do that in the next blog post 😉

Instead, I am going to use the following features to build a model. These columns have a low missing value percentage.

"bedroomcnt"
"bathroomcnt"
"roomcnt"
"taxamount"
"taxvaluedollarcnt"
"lotsizesquarefeet"
"finishedsquarefeet12"
"latitude"
"longitude"

I replaced missing values in bedroomcnt with 3 and in bathroomcnt with 2 (their most frequent values) and dropped any row that has missing values in the other columns, as sketched below.
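
A minimal sketch of this step, assuming df is the properties DataFrame restricted to the columns listed above:

// Fill the two columns with their most frequent values, then drop rows that
// still have nulls in any of the remaining selected columns.
val filled = df.na.fill(Map("bedroomcnt" -> 3.0, "bathroomcnt" -> 2.0))
val cleaned = filled.na.drop(Seq(
  "roomcnt", "taxamount", "taxvaluedollarcnt",
  "lotsizesquarefeet", "finishedsquarefeet12", "latitude", "longitude"))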

In this experiment, I used Spark to train a Gradient-Boosted Tree (GBT) regression model. I built a Spark ML pipeline to perform hyperparameter tuning of the GBT: it tests a grid of different hyperparameters and chooses the best combination based on the evaluation metric, RMSE.

val paramGrid = new ParamGridBuilder()
      .addGrid(gbt.maxDepth, Array(2,5))
      .addGrid(gbt.maxIter, Array(50,100))
      .build()
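
A sketch of how this grid can be wired into cross-validation. The gbt estimator, the logerror label, the features column, and trainingDf are assumptions based on my setup and are not shown in full here:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.tuning.CrossValidator

// The gbt referenced by the param grid above; it predicts logerror from the
// assembled feature vector.
val gbt = new GBTRegressor().setLabelCol("logerror").setFeaturesCol("features")

// RMSE drives the choice of the best parameter combination.
val evaluator = new RegressionEvaluator().setLabelCol("logerror").setMetricName("rmse")

val cv = new CrossValidator()
  .setEstimator(new Pipeline().setStages(Array(gbt)))  // feature-assembly stages omitted
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)                    // the grid defined above
  .setNumFolds(3)

val cvModel = cv.fit(trainingDf)  // trainingDf: placeholder for the cleaned training data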

 


Next I will need to go back to data cleaning and feature selection to choose better/more features to improve the model.

Zillow Prize Kaggle Competition: Exploratory Analysis Part 1

You can download the Zillow Prize Kaggle datasets from https://www.kaggle.com/c/zillow-prize-1
For this exploratory study of the properties dataset, I used Spark to calculate the missing value percentage for each column; a sketch of the computation and the full results are shown below. As many as 20 columns have greater than 90% null/empty values. Examples: storytypeid, basementsqft, yardbuildingsqft.
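
A minimal sketch of the computation, assuming propsDf is the properties DataFrame read from the CSV without a schema (so every column is a string):

import org.apache.spark.sql.functions.{col, count, lit, when}

val total = propsDf.count().toDouble

// One pass over the data: for every column, count the rows where the value is
// null or an empty string.
val missingCounts = propsDf.select(propsDf.columns.map { c =>
  count(when(col(c).isNull || col(c) === "", lit(1))).alias(c)
}: _*).first()

// Pair each column with its missing percentage and sort, worst first.
val missingPct = propsDf.columns
  .map(c => (c, missingCounts.getAs[Long](c) / total * 100))
  .sortBy(-_._2)

missingPct.foreach(println)  // prints (column name, missing value percentage) pairs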

(Column Name, Missing Value Percentage)
(storytypeid,99.94559859467502)
(basementsqft,99.94546460106585)
(yardbuildingsqft26,99.91132972912857)
(fireplaceflag,99.82704774895761)
(architecturalstyletypeid,99.79696618369786)
(typeconstructiontypeid,99.7739862797244)
(finishedsquarefeet13,99.74300025760272)
(buildingclasstypeid,99.57694867743282)
(decktypeid,99.42731131438686)
(finishedsquarefeet6,99.26300165113625)
(poolsizesum,99.06338467186806)
(pooltypeid2,98.92553874642948)
(pooltypeid10,98.76260251767292)
(taxdelinquencyflag,98.10861320969296)
(taxdelinquencyyear,98.10854621288838)
(hashottuborspa,97.68814126410241)
(yardbuildingsqft17,97.30823588368953)
(finishedsquarefeet15,93.60857183916613)
(finishedfloor1squarefeet,93.20930438222749)
(finishedsquarefeet50,93.20930438222749)

(threequarterbathnbr,89.56085939481116)
(fireplacecnt,89.5271600021037)
(pooltypeid7,83.73789912090143)
(poolcnt,82.66343786733091)
(numberofstories,77.15177824593657)
(airconditioningtypeid,72.81541006901676)
(garagecarcnt,70.41196670124819)
(garagetotalsqft,70.41196670124819)
(regionidneighborhood,61.26238059075773)
(heatingorsystemtypeid,39.488452598253325)
(buildingqualitytypeid,35.06374913448503)
(unitcnt,33.75724444822604)
(propertyzoningdesc,33.71908976801352)
(lotsizesquarefeet,9.248875374888994)
(finishedsquarefeet12,9.24666448033761)
(calculatedbathnbr,4.31834603648579)
(fullbathcnt,4.31834603648579)
(censustractandblock,2.5166009707167016)
(landtaxvaluedollarcnt,2.268947282559358)
(regionidcity,2.10520709214774)
(yearbuilt,2.0074922526570096)
(calculatedfinishedsquarefeet,1.8613387234495853)
(structuretaxvaluedollarcnt,1.8418091549123563)
(taxvaluedollarcnt,1.4253570175970458)
(taxamount,1.0468250716782062)
(regionidzip,0.46830766406596236)
(propertycountylandusecode,0.4112598849597868)
(roomcnt,0.3843941663202374)
(bathroomcnt,0.38395868709041925)
(bedroomcnt,0.3835567062628948)
(assessmentyear,0.38318822383766404)
(fips,0.38312122703307666)
(latitude,0.38312122703307666)
(longitude,0.38312122703307666)
(propertylandusetypeid,0.38312122703307666)
(rawcensustractandblock,0.38312122703307666)
(regionidcounty,0.38312122703307666)
(parcelid,0.0)

Now I have identified the good columns (those with less than 10% empty values, at the bottom of the list above) and the bad columns. The next step is to determine whether we can apply some sort of missing value imputation to the problematic columns.

A few missing value imputation techniques are available, such as using the mean, median, or most frequent value as the replacement.