This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new be08d581b202 [SPARK-52553][SS] Fix NumberFormatException when reading 
v1 changelog
be08d581b202 is described below

commit be08d581b202622c8dfc4bca5215ba5739d7004f
Author: micheal-o <micheal.okut...@gmail.com>
AuthorDate: Tue Jun 24 13:33:55 2025 +0900

    [SPARK-52553][SS] Fix NumberFormatException when reading v1 changelog
    
    ### What changes were proposed in this pull request?
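    When reading the changelog version, the reader factory throws a
    NumberFormatException for a v1 changelog if the first few bytes in the
    file happen to decode as a UTF string such as "v)". Because that string
    starts with "v", the factory tries to parse the remainder, ")", as the
    version number.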
    
    ### Why are the changes needed?
    Bug fix. Without this change, the exception causes the query to fail.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    I added a test that reproduces this issue; it now passes with this fix.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51255 from micheal-o/fix_changelog_numexcept.
    
    Authored-by: micheal-o <micheal.okut...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/StateStoreChangelog.scala      |  6 +++-
 .../execution/streaming/state/RocksDBSuite.scala   | 32 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index bcaff4c60d08..0aeeeec97bb7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -371,7 +371,11 @@ class StateStoreChangelogReaderFactory(
       // Or if the first record in the changelog file in V1 has a large enough
       // key, readUTF() will throw a UTFDataFormatException so we should return
       // version 1 (SPARK-51922).
-      case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1
+      case _: java.io.EOFException |
+           _: java.io.UTFDataFormatException |
+           // SPARK-52553 - Can throw this if the bytes in the file is 
coincidentally
+           // decoded as UTF string like "v)".
+           _: NumberFormatException => 1
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index bd9c838eaa6a..5d1ed9b8622a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3534,6 +3534,38 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled("SPARK-52553 - v1 changelog with 
invalid version number" +
+    " does not cause NumberFormatException") {
+    withTempDir { dir =>
+      withDB(dir.getCanonicalPath) { db =>
+        // In v1 changelog, the first key size would be written first in the 
file.
+        // We want the first few bytes in the changelog file to represent the 
UTF-8 string "v)"
+        // Because it has a prefix v, the changelog factory would try to parse 
) as the version.
+        val dfsChangelogFileMethod = 
PrivateMethod[Path](Symbol("dfsChangelogFile"))
+        val changelogFilePath = db.fileManager invokePrivate 
dfsChangelogFileMethod(1L, None)
+
+        val fileManagerMethod = 
PrivateMethod[CheckpointFileManager](Symbol("fm"))
+        val fm = db.fileManager invokePrivate fileManagerMethod()
+
+        val codecMethod = PrivateMethod[CompressionCodec](Symbol("codec"))
+        val codec = db.fileManager invokePrivate codecMethod()
+
+        // Write a changelog file (1.changelog) with the desired content
+        val output = new DataOutputStream(codec.compressedOutputStream(
+          fm.createAtomic(changelogFilePath, overwriteIfPossible = true)))
+        // Write the string "v)"
+        output.writeUTF("v)")
+        output.close()
+
+        // Now try to read the changelog file using changelog reader
+        // It shouldn't throw NumberFormatException
+        val changelogReader = db.fileManager.getChangelogReader(1)
+        assert(changelogReader.version === 1)
+        changelogReader.closeIfNeeded()
+      }
+    }
+  }
+
   private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = {
     val threadInfo = db.getAcquiredThreadInfo()
     assert(threadInfo != None,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to