This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7eeca02 [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries
7eeca02 is described below
commit 7eeca029404c8cc1e2c3e7ae8728b90582e25d76
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Jun 26 18:56:06 2019 +0000
[SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries
## What changes were proposed in this pull request?
Since Spark 2.4.0/2.3.2/2.2.3,
[SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) has delegated
access permission checks to the file system and maintains a blacklist of all
event log files that failed to be read at least once. The blacklisted log files are
released back after `CLEAN_INTERVAL_S` seconds.
However, released files whose sizes have not changed are ignored forever
due to the `info.fileSize < entry.getLen()` condition (previously
[here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454)
and now in
[shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571)),
which always returns `false` when the size is the same as the exi [...]
This PR aims to remove the existing entry from `KVStore` when it goes to
the blacklist.
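For illustration only, here is a minimal sketch of the failure mode and the fix. The `LogInfo` shape, the in-memory `listing` map, and the simplified `shouldReloadLog` are assumptions made for this sketch, not the actual `FsHistoryProvider` code:

```scala
// Hypothetical, simplified model of the listing check; names and shapes are assumptions.
case class LogInfo(logPath: String, fileSize: Long)

object ReloadSketch {
  // Mirrors the spirit of the `info.fileSize < entry.getLen()` check:
  // a log is only re-read if its on-disk size grew past the stored size.
  def shouldReloadLog(stored: Option[LogInfo], onDiskLen: Long): Boolean =
    stored.forall(info => info.fileSize < onDiskLen)

  def main(args: Array[String]): Unit = {
    val listing = scala.collection.mutable.Map[String, LogInfo]()
    val path = "app-1.inprogress"
    listing(path) = LogInfo(path, fileSize = 100L) // stale entry kept from a blacklisted read

    // Permission-only change: the file becomes readable again but its size is unchanged,
    // so the check says "no reload" and the log stays invisible forever.
    println(shouldReloadLog(listing.get(path), onDiskLen = 100L)) // false

    // The fix: drop the stale entry from the store when the path is blacklisted,
    // so the next scan treats the log as new and re-reads it.
    listing.remove(path)
    println(shouldReloadLog(listing.get(path), onDiskLen = 100L)) // true
  }
}
```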
## How was this patch tested?
Pass the Jenkins with the updated test case.
Closes #24966 from dongjoon-hyun/SPARK-28157.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
---
.../apache/spark/deploy/history/FsHistoryProvider.scala | 3 +++
.../spark/deploy/history/FsHistoryProviderSuite.scala | 15 ++++++++++-----
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 98265ff..f2ee599 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -536,6 +536,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// We don't have read permissions on the log file
logWarning(s"Unable to read log $path", e.getCause)
blacklist(path)
+ // SPARK-28157 We should remove this blacklisted entry from the KVStore
+ // to handle permission-only changes with the same file sizes later.
+ listing.delete(classOf[LogInfo], path.toString)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 791814b..571c6e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1122,17 +1122,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
writeFile(accessGranted, true, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"),
1L, "test", None),
SparkListenerApplicationEnd(5L))
+ var isReadable = false
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
- argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied"))
+ argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied" &&
+ !isReadable))
val mockedProvider = spy(provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
- writeFile(accessDenied, true, None,
- SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L,
"test", None),
- SparkListenerApplicationEnd(5L))
// Doing 2 times in order to check the blacklist filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
@@ -1140,8 +1139,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+ isReadable = true
mockedProvider.cleanLogs()
- assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+ updateAndCheck(mockedProvider) { list =>
+ assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+ assert(list.exists(_.name == "accessDenied"))
+ assert(list.exists(_.name == "accessGranted"))
+ list.size should be(2)
+ }
}
test("check in-progress event logs absolute length") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]