Repository: spark
Updated Branches:
  refs/heads/branch-2.0 646cc8552 -> 26ae5cfa7


[SPARK-18010][CORE] Reduce work performed for building up the application list 
for the History Server app list UI page

## What changes were proposed in this pull request?

Backport of pull request #15556.

Allow ReplayListenerBus to skip deserialising and replaying certain events 
using an inexpensive check of the event log entry. Use this to ensure that when 
event log replay is triggered for building the application list, we get the 
ReplayListenerBus to skip over all but the few events needed for our immediate 
purpose. Refer to [SPARK-18010] for the motivation behind this change.

## How was this patch tested?

Tested with existing HistoryServer and ReplayListener unit test suites. All 
tests pass.

Please review 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before 
opening a pull request.

Author: Vinayak <vijoshi5@in.ibm.com>

Closes #15556 from vijoshi/SAAS-467_master.

Author: Vinayak <[email protected]>

Closes #15655 from vijoshi/SAAS-467_branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26ae5cfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26ae5cfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26ae5cfa

Branch: refs/heads/branch-2.0
Commit: 26ae5cfa7592cd84b5cc3361dfb0951ffa7409a9
Parents: 646cc85
Author: Vinayak <[email protected]>
Authored: Mon Nov 14 12:15:27 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Mon Nov 14 12:15:27 2016 +0100

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 120 +++++++++++--------
 .../spark/scheduler/ReplayListenerBus.scala     |  35 ++++--
 2 files changed, 97 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26ae5cfa/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index cf4a401..a1e36c5 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -35,6 +35,7 @@ import org.apache.spark.{SecurityManager, SparkConf, 
SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
@@ -77,10 +78,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
   import FsHistoryProvider._
 
-  private val NOT_STARTED = "<Not Started>"
-
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = 
"spark.history.fs.numReplayThreads"
-
   // Interval between safemode checks.
   private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
     "spark.history.fs.safemodeCheck.interval", "5s")
@@ -238,11 +235,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), 
attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each 
application
           }
-          val appListener = new ApplicationEventListener()
-          replayBus.addListener(appListener)
-          val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, 
attempt.logPath)),
-            replayBus)
-          appAttemptInfo.map { info =>
+
+          val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+
+          val appListener = replay(fileStatus, 
isApplicationCompleted(fileStatus), replayBus)
+
+          if (appListener.appId.isDefined) {
             val uiAclsEnabled = 
conf.getBoolean("spark.history.ui.acls.enable", false)
             ui.getSecurityManager.setAcls(uiAclsEnabled)
             // make sure to set admin acls before view acls so they are 
properly picked up
@@ -251,8 +249,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               appListener.viewAcls.getOrElse(""))
             
ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse(""))
             
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-            LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
+            Some(LoadedAppUI(ui, updateProbe(appId, attemptId, 
attempt.fileSize)))
+          } else {
+            None
           }
+
         }
       }
     } catch {
@@ -392,28 +393,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
   }
 
-
   /**
    * Replay the log files in the list and merge the list of old applications 
with new ones
    */
   private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
-        val bus = new ReplayListenerBus()
-        val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded 
successfully: $r")
-          case None => logWarning(s"Failed to load application log 
${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
-            e)
-          None
+      val eventsFilter: ReplayEventsFilter = { eventString =>
+        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+          eventString.startsWith(APPL_END_EVENT_PREFIX)
+      }
+
+      val logPath = fileStatus.getPath()
+
+      val appCompleted = isApplicationCompleted(fileStatus)
+
+      val appListener = replay(fileStatus, appCompleted, new 
ReplayListenerBus(), eventsFilter)
+
+      // Without an app ID, new logs will render incorrectly in the listing 
page, so do not list or
+      // try to show their UI.
+      if (appListener.appId.isDefined) {
+        val attemptInfo = new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.appId.getOrElse(logPath.getName()),
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          fileStatus.getModificationTime(),
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted,
+          fileStatus.getLen()
+        )
+        fileToAppInfo(logPath) = attemptInfo
+        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: 
$attemptInfo")
+        Some(attemptInfo)
+      } else {
+        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+          "The application may have not started.")
+        None
       }
 
+    } catch {
+      case e: Exception =>
+        logError(
+          s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
+          e)
+        None
+    }
+
     if (newAttempts.isEmpty) {
       return
     }
@@ -550,12 +577,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   }
 
   /**
-   * Replays the events in the specified log file and returns information 
about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * Replays the events in the specified log file on the supplied 
`ReplayListenerBus`. Returns
+   * an `ApplicationEventListener` instance with event data captured from the 
replay.
+   * `ReplayEventsFilter` determines what events are replayed and can 
therefore limit the
+   * data captured in the returned `ApplicationEventListener` instance.
    */
   private def replay(
       eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+      appCompleted: Boolean,
+      bus: ReplayListenerBus,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): 
ApplicationEventListener = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we 
grabbed the filestatus,
@@ -567,30 +598,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
-      val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
-      bus.replay(logInput, logPath.toString, !appCompleted)
-
-      // Without an app ID, new logs will render incorrectly in the listing 
page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          eventLog.getModificationTime(),
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          eventLog.getLen()
-        )
-        fileToAppInfo(logPath) = attemptInfo
-        Some(attemptInfo)
-      } else {
-        None
-      }
+      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+      appListener
     } finally {
       logInput.close()
     }
@@ -675,6 +685,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  private val NOT_STARTED = "<Not Started>"
+
+  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = 
"spark.history.fs.numReplayThreads"
+
+  private val APPL_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationStart\""
+
+  private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/26ae5cfa/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index c65e7a2..2424586 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -44,18 +44,32 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
    * @param sourceName Filename (or other source identifier) from whence 
@logData is being read
    * @param maybeTruncated Indicate whether log file might be truncated (some 
abnormal situations
    *        encountered, log file might not finished writing) or not
+   * @param eventsFilter Filter function to select JSON event strings in the 
log data stream that
+   *        should be parsed and replayed. When not specified, all event 
strings in the log data
+   *        are parsed and replayed.
    */
   def replay(
       logData: InputStream,
       sourceName: String,
-      maybeTruncated: Boolean = false): Unit = {
+      maybeTruncated: Boolean = false,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+
     var currentLine: String = null
-    var lineNumber: Int = 1
+    var lineNumber: Int = 0
+
     try {
-      val lines = Source.fromInputStream(logData).getLines()
-      while (lines.hasNext) {
-        currentLine = lines.next()
+      val lineEntries = Source.fromInputStream(logData)
+        .getLines()
+        .zipWithIndex
+        .filter { case (line, _) => eventsFilter(line) }
+
+      while (lineEntries.hasNext) {
         try {
+          val entry = lineEntries.next()
+
+          currentLine = entry._1
+          lineNumber = entry._2 + 1
+
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
           case e: ClassNotFoundException if 
KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
@@ -64,14 +78,15 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
             logWarning(s"Dropped incompatible Structured Streaming log: 
$currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that 
might be truncated
-            if (!maybeTruncated || lines.hasNext) {
+            // the last entry may not be the very last line in the event log, 
but we treat it
+            // as such in a best effort to replay the given input
+            if (!maybeTruncated || lineEntries.hasNext) {
               throw jpe
             } else {
               logWarning(s"Got JsonParseException from log file $sourceName" +
                 s" at line $lineNumber, the file might not have finished 
writing cleanly.")
             }
         }
-        lineNumber += 1
       }
     } catch {
       case ioe: IOException =>
@@ -84,8 +99,14 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
 
 }
 
+
 private[spark] object ReplayListenerBus {
 
+  type ReplayEventsFilter = (String) => Boolean
+
+  // utility filter that selects all event logs during replay
+  val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+
   /**
    * Classes that were removed. Structured Streaming doesn't use them any 
more. However, parsing
    * old json may fail and we can just ignore these failures.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to