This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c26d0bc726ea [SPARK-55701][SS] Fix race condition in 
CompactibleFileStreamLog.allFiles
c26d0bc726ea is described below

commit c26d0bc726eaf04f1270cef36c340df098b486e2
Author: zeruibao <[email protected]>
AuthorDate: Fri Feb 27 14:25:30 2026 +0900

    [SPARK-55701][SS] Fix race condition in CompactibleFileStreamLog.allFiles
    
    ### What changes were proposed in this pull request?
    Changed the exception type thrown in `CompactibleFileStreamLog.allFiles()` 
from `IllegalStateException` to `FileNotFoundException` when a batch metadata 
file is missing (line 270). Since `FileNotFoundException` extends 
`IOException`, the existing retry loop (line 277) now catches this case and 
retries with an updated `latestId`.
    
    ### Why are the changes needed?
    There is a race condition between a batch reader (e.g., `DESCRIBE TABLE` 
via Thrift server) and a streaming writer performing compaction + cleanup 
concurrently:
    
    1. The reader calls `getLatestBatchId()` and observes `latestId = N`.
    2. The writer completes a new compaction batch and `deleteExpiredLog` 
removes old batch files.
    3. The reader tries to read the now-deleted batch files based on the stale 
`latestId`.
    
    The `allFiles()` method already has a retry loop designed to handle this 
exact scenario — it catches `IOException`, refreshes `latestId`, and retries. 
However, the missing-file case was throwing `IllegalStateException`, which is 
not a subclass of `IOException`, so it escaped the retry loop entirely and 
surfaced as a fatal error to the user.
    
    The fix changes the exception to `FileNotFoundException` so the existing 
retry logic handles it correctly. The safety check on re-throw (lines 284-286) 
ensures that if no newer compaction exists, the exception is still propagated 
rather than silently swallowed.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test in `CompactibleFileStreamLogSuite` ("allFiles retries when compaction deletes batch files during read") that injects a stale `latestId` and verifies `allFiles()` retries successfully after batch files are deleted.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Got help from Claude 4.6 Opus, but also reviewed the changes carefully.
    
    Closes #54500 from zeruibao/SPARK-55701-fix-race-condition.
    
    Authored-by: zeruibao <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../runtime/CompactibleFileStreamLog.scala         |  2 +-
 .../streaming/CompactibleFileStreamLogSuite.scala  | 49 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
index 8a90982b7c0c..7efe7b52c7ab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
@@ -267,7 +267,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
           val logs =
             getAllValidBatches(latestId, compactInterval).flatMap { id =>
               filterInBatch(id)(shouldRetain(_, curTime)).getOrElse {
-                throw new IllegalStateException(
+                throw new FileNotFoundException(
                   s"${batchIdToPath(id)} doesn't exist " +
                     s"(latestId: $latestId, compactInterval: 
$compactInterval)")
               }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 5a1608cb6165..b1b1976dff8b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -258,6 +258,55 @@ class CompactibleFileStreamLogSuite extends 
SharedSparkSession {
       })
   }
 
+  test("allFiles retries when compaction deletes batch files during read") {
+    withTempDir { dir =>
+      // Override getLatestBatchId() so the first call returns a stale value 
(4),
+      // simulating the reader listing the directory before compaction batch 5 
is visible.
+      // The retry in allFiles() calls super.getLatestBatchId() (statically 
bound to
+      // HDFSMetadataLog), which bypasses this override and reads the real 
filesystem.
+      @volatile var staleLatestId: Option[Long] = None
+      val compactibleLog = new FakeCompactibleFileStreamLog(
+        FakeCompactibleFileStreamLog.VERSION,
+        _fileCleanupDelayMs = Long.MaxValue,
+        _defaultCompactInterval = 3,
+        _defaultMinBatchesToRetain = 1,
+        spark,
+        dir.getCanonicalPath) {
+        override def getLatestBatchId(): Option[Long] = {
+          staleLatestId match {
+            case some @ Some(_) =>
+              staleLatestId = None
+              some
+            case None =>
+              super.getLatestBatchId()
+          }
+        }
+      }
+
+      val fs = 
compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      for (batchId <- 0 to 5) {
+        compactibleLog.add(batchId, Array("path_" + batchId))
+      }
+
+      // Delete old files as if a concurrent writer's deleteExpiredLog removed 
them
+      // after writing compaction batch 5.
+      fs.delete(compactibleLog.batchIdToPath(2), false)
+      fs.delete(compactibleLog.batchIdToPath(3), false)
+      fs.delete(compactibleLog.batchIdToPath(4), false)
+
+      // Inject stale latestId=4 right before calling allFiles().
+      // First iteration: getLatestBatchId() returns 4 (stale).
+      //   getAllValidBatches(4, 3) = [2, 3, 4]. Batch 2 is deleted -> 
FileNotFoundException.
+      //   Retry: super.getLatestBatchId() reads filesystem -> returns 5.
+      //   nextCompactionBatchId(4, 3) = 5, and 5 >= 5, so retry proceeds.
+      // Second iteration: getAllValidBatches(5, 3) = [5]. Reads 5.compact -> 
success.
+      staleLatestId = Some(4L)
+      val result = compactibleLog.allFiles()
+      assert(result === (0 to 5).map("path_" + _))
+    }
+  }
+
   private def withFakeCompactibleFileStreamLog(
     fileCleanupDelayMs: Long,
     defaultCompactInterval: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to