Repository: spark
Updated Branches:
  refs/heads/master ac8ff920f -> c5fe3dd4f


[SPARK-18010][CORE] Reduce work performed for building up the application list 
for the History Server app list UI page

## What changes were proposed in this pull request?
allow ReplayListenerBus to skip deserialising and replaying certain events 
using an inexpensive check of the event log entry. Use this to ensure that when 
event log replay is triggered for building the application list, we get the 
ReplayListenerBus to skip over all but the few events needed for our immediate 
purpose. Refer [SPARK-18010] for the motivation behind this change.

## How was this patch tested?

Tested with existing HistoryServer and ReplayListener unit test suites. All 
tests pass.

Please review 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before 
opening a pull request.

Author: Vinayak <[email protected]>

Closes #15556 from vijoshi/SAAS-467_master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5fe3dd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5fe3dd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5fe3dd4

Branch: refs/heads/master
Commit: c5fe3dd4f59c464c830b414acccd3cca0fdd877c
Parents: ac8ff92
Author: Vinayak <[email protected]>
Authored: Tue Oct 25 10:36:03 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Oct 25 10:36:03 2016 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 120 +++++++++++--------
 .../spark/scheduler/ReplayListenerBus.scala     |  39 ++++--
 2 files changed, 101 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5fe3dd4/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 530cc52..dfc1aad 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, 
SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
@@ -78,10 +79,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
   import FsHistoryProvider._
 
-  private val NOT_STARTED = "<Not Started>"
-
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = 
"spark.history.fs.numReplayThreads"
-
   // Interval between safemode checks.
   private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
     "spark.history.fs.safemodeCheck.interval", "5s")
@@ -241,11 +238,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), 
attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each 
application
           }
-          val appListener = new ApplicationEventListener()
-          replayBus.addListener(appListener)
-          val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, 
attempt.logPath)),
-            replayBus)
-          appAttemptInfo.map { info =>
+
+          val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+
+          val appListener = replay(fileStatus, 
isApplicationCompleted(fileStatus), replayBus)
+
+          if (appListener.appId.isDefined) {
             val uiAclsEnabled = 
conf.getBoolean("spark.history.ui.acls.enable", false)
             ui.getSecurityManager.setAcls(uiAclsEnabled)
             // make sure to set admin acls before view acls so they are 
properly picked up
@@ -254,8 +252,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               appListener.viewAcls.getOrElse(""))
             
ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse(""))
             
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-            LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
+            Some(LoadedAppUI(ui, updateProbe(appId, attemptId, 
attempt.fileSize)))
+          } else {
+            None
           }
+
         }
       }
     } catch {
@@ -411,28 +412,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
   }
 
-
   /**
    * Replay the log files in the list and merge the list of old applications 
with new ones
    */
   private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
-        val bus = new ReplayListenerBus()
-        val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded 
successfully: $r")
-          case None => logWarning(s"Failed to load application log 
${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
-            e)
-          None
+      val eventsFilter: ReplayEventsFilter = { eventString =>
+        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+          eventString.startsWith(APPL_END_EVENT_PREFIX)
+      }
+
+      val logPath = fileStatus.getPath()
+
+      val appCompleted = isApplicationCompleted(fileStatus)
+
+      val appListener = replay(fileStatus, appCompleted, new 
ReplayListenerBus(), eventsFilter)
+
+      // Without an app ID, new logs will render incorrectly in the listing 
page, so do not list or
+      // try to show their UI.
+      if (appListener.appId.isDefined) {
+        val attemptInfo = new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.appId.getOrElse(logPath.getName()),
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          fileStatus.getModificationTime(),
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted,
+          fileStatus.getLen()
+        )
+        fileToAppInfo(logPath) = attemptInfo
+        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: 
$attemptInfo")
+        Some(attemptInfo)
+      } else {
+        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+          "The application may have not started.")
+        None
       }
 
+    } catch {
+      case e: Exception =>
+        logError(
+          s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
+          e)
+        None
+    }
+
     if (newAttempts.isEmpty) {
       return
     }
@@ -564,12 +591,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   }
 
   /**
-   * Replays the events in the specified log file and returns information 
about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * Replays the events in the specified log file on the supplied 
`ReplayListenerBus`. Returns
+   * an `ApplicationEventListener` instance with event data captured from the 
replay.
+   * `ReplayEventsFilter` determines what events are replayed and can 
therefore limit the
+   * data captured in the returned `ApplicationEventListener` instance.
    */
   private def replay(
       eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+      appCompleted: Boolean,
+      bus: ReplayListenerBus,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): 
ApplicationEventListener = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we 
grabbed the filestatus,
@@ -581,30 +612,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
-      val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
-      bus.replay(logInput, logPath.toString, !appCompleted)
-
-      // Without an app ID, new logs will render incorrectly in the listing 
page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          eventLog.getModificationTime(),
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          eventLog.getLen()
-        )
-        fileToAppInfo(logPath) = attemptInfo
-        Some(attemptInfo)
-      } else {
-        None
-      }
+      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+      appListener
     } finally {
       logInput.close()
     }
@@ -689,6 +699,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  private val NOT_STARTED = "<Not Started>"
+
+  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = 
"spark.history.fs.numReplayThreads"
+
+  private val APPL_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationStart\""
+
+  private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c5fe3dd4/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d32f5eb..3eff8d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.util.JsonProtocol
 
 /**
@@ -43,30 +44,45 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
    * @param sourceName Filename (or other source identifier) from whence 
@logData is being read
    * @param maybeTruncated Indicate whether log file might be truncated (some 
abnormal situations
    *        encountered, log file might not finished writing) or not
+   * @param eventsFilter Filter function to select JSON event strings in the 
log data stream that
+   *        should be parsed and replayed. When not specified, all event 
strings in the log data
+   *        are parsed and replayed.
    */
   def replay(
       logData: InputStream,
       sourceName: String,
-      maybeTruncated: Boolean = false): Unit = {
+      maybeTruncated: Boolean = false,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+
     var currentLine: String = null
-    var lineNumber: Int = 1
+    var lineNumber: Int = 0
+
     try {
-      val lines = Source.fromInputStream(logData).getLines()
-      while (lines.hasNext) {
-        currentLine = lines.next()
+      val lineEntries = Source.fromInputStream(logData)
+        .getLines()
+        .zipWithIndex
+        .filter { case (line, _) => eventsFilter(line) }
+
+      while (lineEntries.hasNext) {
         try {
+          val entry = lineEntries.next()
+
+          currentLine = entry._1
+          lineNumber = entry._2 + 1
+
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that 
might be truncated
-            if (!maybeTruncated || lines.hasNext) {
+            // the last entry may not be the very last line in the event log, 
but we treat it
+            // as such in a best effort to replay the given input
+            if (!maybeTruncated || lineEntries.hasNext) {
               throw jpe
             } else {
               logWarning(s"Got JsonParseException from log file $sourceName" +
                 s" at line $lineNumber, the file might not have finished 
writing cleanly.")
             }
         }
-        lineNumber += 1
       }
     } catch {
       case ioe: IOException =>
@@ -78,3 +94,12 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
   }
 
 }
+
+
+private[spark] object ReplayListenerBus {
+
+  type ReplayEventsFilter = (String) => Boolean
+
+  // utility filter that selects all event logs during replay
+  val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to