This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d9669bd [SPARK-33146][CORE] Check for non-fatal errors when loading
new applications in SHS
d9669bd is described below
commit d9669bdf0ff4ed9951d7077b8dc9ad94507615c5
Author: Adam Binford <[email protected]>
AuthorDate: Thu Oct 15 11:59:29 2020 +0900
[SPARK-33146][CORE] Check for non-fatal errors when loading new
applications in SHS
### What changes were proposed in this pull request?
Adds an additional check for non-fatal errors when attempting to add a new
entry to the history server application listing.
### Why are the changes needed?
A bad rolling event log folder (missing appstatus file or no log files)
would cause no applications to be loaded by the Spark history server. Figuring
out why invalid event log folders are created in the first place will be
addressed in separate issues, this just lets the history server skip the
invalid folder and successfully load all the valid applications.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UT
Closes #30037 from Kimahriman/bug/rolling-log-crashing-history.
Authored-by: Adam Binford <[email protected]>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
(cherry picked from commit 9ab0ec4e38e5df0537b38cb0f89e004ad57bec90)
Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
---
.../spark/deploy/history/FsHistoryProvider.scala | 3 ++
.../deploy/history/FsHistoryProviderSuite.scala | 49 ++++++++++++++++++++++
2 files changed, 52 insertions(+)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c262152..5970708 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -526,6 +526,9 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
reader.fileSizeForLastIndex > 0
} catch {
case _: FileNotFoundException => false
+ case NonFatal(e) =>
+ logWarning(s"Error while reading new log ${reader.rootPath}", e)
+ false
}
case _: FileNotFoundException =>
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c2f34fc..f3beb35 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1470,6 +1470,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with
Matchers with Logging {
}
}
+ test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") {
+ withTempDir { dir =>
+ val conf = createTestConf(true)
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+ val provider = new FsHistoryProvider(conf)
+
+ val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+ writer.start()
+
+ writeEventsToRollingWriter(writer, Seq(
+ SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(dir.listFiles().size === 1)
+ assert(provider.getListing.length === 1)
+
+ // Manually delete the appstatus file to make an invalid rolling event log
+ val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath),
+ "app", None, true)
+ fs.delete(appStatusPath, false)
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(provider.getListing.length === 0)
+
+ // Create a new application
+ val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf)
+ writer2.start()
+ writeEventsToRollingWriter(writer2, Seq(
+ SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+ // Both folders exist but only one application found
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(provider.getListing.length === 1)
+ assert(dir.listFiles().size === 2)
+
+ // Make sure a new provider sees the valid application
+ provider.stop()
+ val newProvider = new FsHistoryProvider(conf)
+ newProvider.checkForLogs()
+ assert(newProvider.getListing.length === 1)
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]