The Spark application Web UI is short-lived: it is only available for the duration of the job, and once the application finishes you can no longer access it. The way to retain access after the application completes is to use the Spark History Server. To do that, we need to specify the following two settings when submitting the Spark application:
spark.eventLog.enabled=true
spark.eventLog.dir=/path/to/log/dir (default is /tmp/spark-events)
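For example (a minimal sketch; the application name and log directory below are placeholders), the two settings can be supplied programmatically through SparkConf, or equivalently with --conf flags on spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

// Enable event logging so the History Server can replay this application later.
// The log directory must already exist and be writable by the application.
val conf = new SparkConf()
  .setAppName("event-log-example") // placeholder name
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events") // placeholder directory

val sc = new SparkContext(conf)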
As you can see in the snippet from the SparkContext class below, the two settings are retrieved from SparkConf:
private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)

private[spark] def eventLogDir: Option[URI] = _eventLogDir

_eventLogDir = if (isEventLogEnabled) {
  val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
    .stripSuffix("/")
  Some(Utils.resolveURI(unresolvedDir))
} else {
  None
}
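Utils.resolveURI turns a bare path into a fully qualified URI. Here is a minimal sketch of that behavior, assuming default-to-local-file semantics (an illustration only, not the actual Utils implementation):

import java.io.File
import java.net.URI

// Illustration only: fall back to the local "file" scheme when the path carries none.
def resolveURISketch(path: String): URI = {
  val uri = new URI(path)
  if (uri.getScheme != null) uri
  else new File(path).getAbsoluteFile.toURI
}

resolveURISketch("hdfs:///spark-events") // hdfs:///spark-events
resolveURISketch("/tmp/spark-events")    // file:/tmp/spark-events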
If the spark.eventLog.enabled setting is true, SparkContext then creates an EventLoggingListener and adds it to the LiveListenerBus's event log queue:
_eventLogger =
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
        _conf, _hadoopConfiguration)
    logger.start()
    listenerBus.addToEventLogQueue(logger)
    Some(logger)
  } else {
    None
  }
As EventLoggingListener is notified of the various SparkListenerEvents, it writes them to a persistent log file. The log file carries a .inprogress suffix for as long as the application is running, and it is renamed once the application completes.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"
val workingPath = logPath + IN_PROGRESS
Here is the list of the different SparkListenerEvents:
SparkListenerStageSubmitted
SparkListenerStageCompleted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerSpeculativeTaskSubmitted
SparkListenerTaskEnd
SparkListenerJobStart
SparkListenerJobEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
SparkListenerExecutorBlacklisted
SparkListenerExecutorUnblacklisted
SparkListenerNodeBlacklisted
SparkListenerNodeUnblacklisted
SparkListenerBlockUpdated
SparkListenerExecutorMetricsUpdate
SparkListenerApplicationStart
SparkListenerApplicationEnd
SparkListenerLogStart
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    // may be null if the task has failed
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent
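These are the same events your own code can consume. As an illustration, here is a minimal custom listener that handles two of the events above and registers itself on the SparkContext (the class name and printed messages are hypothetical; sc is assumed to be an active SparkContext):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Hypothetical example listener: prints a line for each completed stage and task.
class LoggingSparkListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed")
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"Task ${taskEnd.taskInfo.taskId} ended: ${taskEnd.reason}")
  }
}

sc.addSparkListener(new LoggingSparkListener)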
Here is the logEvent method the listener uses to write the events to the log file.
/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
  val eventJson = JsonProtocol.sparkEventToJson(event)
  // scalastyle:off println
  writer.foreach(_.println(compact(render(eventJson))))
  // scalastyle:on println
  if (flushLogger) {
    writer.foreach(_.flush())
    hadoopDataStream.foreach(ds => ds.getWrappedStream match {
      case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
      case _ => ds.hflush()
    })
  }
  if (testing) {
    loggedEvents += eventJson
  }
}
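Because each event is serialized as one JSON object per line, a completed event log can be inspected with Spark itself. A quick sketch, assuming an active SparkSession named spark and a placeholder application id:

// Each line of the event log is one JSON-serialized SparkListenerEvent,
// so the file can be loaded as a JSON dataset. The path and app id are placeholders.
val events = spark.read.json("/tmp/spark-events/app-20180101000000-0000")

// The "Event" column holds the event type, e.g. SparkListenerApplicationStart.
events.groupBy("Event").count().show(truncate = false)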
On startup, the Spark History Server reads and replays the log data in the configured log directory, specified by the spark.eventLog.dir setting in SparkConf. You can find the checkForLogs() method in FsHistoryProvider.scala. This method first scans the log directory for any unread logs and submits them to a thread pool (replayExecutor) to be replayed. The number of processing threads can be set via spark.history.fs.numReplayThreads in the Spark configuration; if it is not specified, the number of threads defaults to 1/4 of the processors available to the Java virtual machine.

FsHistoryProvider is the concrete implementation of the abstract class ApplicationHistoryProvider. The main purpose of this class is to provide the log data for display on the Web UI. The Spark History Server instantiates it via reflection during startup, defaulting to FsHistoryProvider if spark.history.provider is not set in the Spark configuration.
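As a hedged sketch, the default replay-thread count described above amounts to the following (an illustration of the stated logic, not the verbatim source):

// Illustration of the documented default: one replay thread per four available processors.
val numReplayThreads = conf.getInt(
  "spark.history.fs.numReplayThreads",
  Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)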
See the checkForLogs() method below.
/**
 * Builds the application list based on the current contents of the log directory.
 * Tries to reuse as much of the data already in memory as possible, by not reading
 * applications that haven't been updated since last time the logs were checked.
 */
private[history] def checkForLogs(): Unit = {
  try {
    val newLastScanTime = getNewLastScanTime()
    logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    // scan for modified applications, replay and merge them
    val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
      .filter { entry =>
        !entry.isDirectory() &&
          // FsHistoryProvider generates a hidden file which can't be read. Accidentally
          // reading a garbage file is safe, but we would log an error which can be scary to
          // the end-user.
          !entry.getPath().getName().startsWith(".") &&
          SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
          recordedFileSize(entry.getPath()) < entry.getLen()
      }
      .sortWith { case (entry1, entry2) =>
        entry1.getModificationTime() > entry2.getModificationTime()
      }

    if (logInfos.nonEmpty) {
      logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
    }

    var tasks = mutable.ListBuffer[Future[_]]()

    try {
      for (file <- logInfos) {
        tasks += replayExecutor.submit(new Runnable {
          override def run(): Unit = mergeApplicationListing(file)
        })
      }
    } catch {
      // let the iteration over logInfos break, since an exception on
      // replayExecutor.submit (..) indicates the ExecutorService is unable
      // to take any more submissions at this time
      case e: Exception =>
        logError(s"Exception while submitting event log for replay", e)
    }

    pendingReplayTasksCount.addAndGet(tasks.size)

    tasks.foreach { task =>
      try {
        // Wait for all tasks to finish. This makes sure that checkForLogs
        // is not scheduled again while some tasks are already running in
        // the replayExecutor.
        task.get()
      } catch {
        case e: InterruptedException =>
          throw e
        case e: Exception =>
          logError("Exception while merging application listings", e)
      } finally {
        pendingReplayTasksCount.decrementAndGet()
      }
    }

    lastScanTime.set(newLastScanTime)
  } catch {
    case e: Exception => logError("Exception in checking for event log updates", e)
  }
}