This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 960375f [SPARK-28157][CORE][2.3] Make SHS clear KVStore `LogInfo`s
for the blacklisted entries
960375f is described below
commit 960375f6e9ec3a151af208c0ebc354408c92534c
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Jun 27 16:58:39 2019 +0000
[SPARK-28157][CORE][2.3] Make SHS clear KVStore `LogInfo`s for the
blacklisted entries
## What changes were proposed in this pull request?
At Spark 2.4.0/2.3.2/2.2.3,
[SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated
access permission checks to the file system, and maintains a blacklist for all
event log files failed once at reading. The blacklisted log files are released
back after `CLEAN_INTERVAL_S` seconds.
However, the released files whose sizes don't change are ignored forever
due to `info.fileSize < entry.getLen()` condition (previously
[here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454)
and now at
[shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571))
which returns `false` always when the size is the same as the exi [...]
This PR aims to remove the existing entry from `KVStore` when it goes to
the blacklist.
## How was this patch tested?
Pass the Jenkins with the updated test case.
Closes #24977 from dongjoon-hyun/SPARK-28157-2.3.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
---
.../apache/spark/deploy/history/FsHistoryProvider.scala | 3 +++
.../spark/deploy/history/FsHistoryProviderSuite.scala | 14 +++++++++-----
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 23572eb..98b64a5 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -497,6 +497,9 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// We don't have read permissions on the log file
logWarning(s"Unable to read log $path", e.getCause)
blacklist(path)
+ // SPARK-28157 We should remove this blacklisted entry from the
KVStore
+ // to handle permission-only changes with the same file sizes
later.
+ listing.delete(classOf[LogInfo], path.toString)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index bf2b044..24daad2 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -786,11 +786,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with
BeforeAndAfter with Matc
writeFile(accessGranted, true, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"),
1L, "test", None),
SparkListenerApplicationEnd(5L))
+ var isReadable = false
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied
file")).when(mockedFs).open(
argThat(new ArgumentMatcher[Path]() {
override def matches(path: Any): Boolean = {
- path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
+ path.asInstanceOf[Path].getName.toLowerCase == "accessdenied" &&
!isReadable
}
}))
val mockedProvider = spy(provider)
@@ -798,9 +799,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with
BeforeAndAfter with Matc
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
- writeFile(accessDenied, true, None,
- SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L,
"test", None),
- SparkListenerApplicationEnd(5L))
// Doing 2 times in order to check the blacklist filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
@@ -808,8 +806,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with
BeforeAndAfter with Matc
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+ isReadable = true
mockedProvider.cleanLogs()
- assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+ updateAndCheck(mockedProvider) { list =>
+ assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+ assert(list.exists(_.name == "accessDenied"))
+ assert(list.exists(_.name == "accessGranted"))
+ list.size should be(2)
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]