This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 56bca499e47f [SPARK-51700][SS] Fix incorrect logging when no files are
eligible for deletion in RocksDBFileManager
56bca499e47f is described below
commit 56bca499e47fe4ef4f91846421261b3ac1969f35
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Thu Apr 3 15:16:42 2025 +0900
[SPARK-51700][SS] Fix incorrect logging when no files are eligible for
deletion in RocksDBFileManager
### What changes were proposed in this pull request?
Fix incorrect logging when no files are eligible for deletion in
RocksDBFileManager by clamping the computed number of versions to delete at 0.
### Why are the changes needed?
Without this change, when no files are eligible for deletion, the log reports
an incorrect (negative) number of stale versions:
```
25/04/01 20:16:23 INFO RocksDBFileManager StateStoreId(opId=0,partId=138,name=default): Skipping deleting files. Need at least 30 stale versions for batch deletion but found only -90.
```
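The negative count comes straight from the arithmetic in the patched line: the number of versions present, maxSeenVersion - minSeenVersion + 1, minus numVersionsToRetain. When fewer versions exist than must be retained, the result goes below zero. The inputs below are hypothetical (the log line does not show them); they are just one combination that yields the logged -90. The fixed value uses the same max(..., 0) as the diff.

```latex
\begin{aligned}
\text{versionsToDelete} &= \text{maxSeenVersion} - \text{minSeenVersion} + 1 - \text{numVersionsToRetain} \\
&= 10 - 1 + 1 - 100 = -90 \\
\text{versionsToDelete}_{\text{fixed}} &= \max(-90,\ 0) = 0
\end{aligned}
```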
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50502 from anishshri-db/task/SPARK-51700.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit a222f78cc2ba8d565766a0d711fc864c3ab86579)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/execution/streaming/state/RocksDBFileManager.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index bb1198dfccaf..71d6ec3d4da9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.{mutable, Map}
+import scala.math._
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -507,7 +508,9 @@ class RocksDBFileManager(
logInfo(log"Estimated maximum version is " +
log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
- val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
+ // If the number of versions to delete is negative, that means that none of the versions
+ // are eligible for deletion and we set the variable to 0
+ val versionsToDelete = max(maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain, 0)
if (versionsToDelete < minVersionsToDelete) {
logInfo(log"Skipping deleting files." +
log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
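The patched logic can be exercised in isolation with a small standalone sketch. It mirrors the clamped expression and the `versionsToDelete < minVersionsToDelete` check from the hunk above. The object and method names are hypothetical and not part of RocksDBFileManager, and the parameter types (Long versions, Int retention count) are assumptions.

```scala
import scala.math.max

// Hypothetical standalone sketch of the corrected computation; names are illustrative,
// and the Long/Int parameter types are assumed rather than taken from Spark.
object VersionsToDeleteSketch {
  // Stale versions eligible for deletion, clamped at 0 so that holding fewer
  // versions than numVersionsToRetain yields 0 instead of a negative count.
  def versionsToDelete(maxSeenVersion: Long, minSeenVersion: Long, numVersionsToRetain: Int): Long =
    max(maxSeenVersion - minSeenVersion + 1 - numVersionsToRetain, 0L)

  // Mirrors the `if (versionsToDelete < minVersionsToDelete)` skip check in the patch.
  def shouldSkip(versionsToDelete: Long, minVersionsToDelete: Long): Boolean =
    versionsToDelete < minVersionsToDelete

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: versions 1..10 present, 100 to retain (unclamped result: -90).
    val n = versionsToDelete(maxSeenVersion = 10L, minSeenVersion = 1L, numVersionsToRetain = 100)
    println(n) // prints 0
    println(shouldSkip(n, 30L)) // prints true
  }
}
```

With the clamp, the skip message reports "found only 0" in this case, and deletion is still skipped because 0 is below the threshold. The patch brings `max` in via a wildcard `import scala.math._`. Importing only `max`, as in this sketch, is a narrower alternative.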