This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new be08d581b202 [SPARK-52553][SS] Fix NumberFormatException when reading v1 changelog be08d581b202 is described below commit be08d581b202622c8dfc4bca5215ba5739d7004f Author: micheal-o <micheal.okut...@gmail.com> AuthorDate: Tue Jun 24 13:33:55 2025 +0900 [SPARK-52553][SS] Fix NumberFormatException when reading v1 changelog ### What changes were proposed in this pull request? When trying to read the changelog version, the reader factory throws a NumberFormatException for a v1 changelog if the first few bytes in the file happen to decode as a UTF string such as "v)". ### Why are the changes needed? Bug fix. Without it, the query fails. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? I added a test that reproduces this issue; it now passes with this fix. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51255 from micheal-o/fix_changelog_numexcept. Authored-by: micheal-o <micheal.okut...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/state/StateStoreChangelog.scala | 6 +++- .../execution/streaming/state/RocksDBSuite.scala | 32 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index bcaff4c60d08..0aeeeec97bb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -371,7 +371,11 @@ class StateStoreChangelogReaderFactory( // Or if the first record in the changelog file in V1 has a large enough // key, readUTF() will throw a UTFDataFormatException so we should return // version 1 (SPARK-51922). 
- case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1 + case _: java.io.EOFException | + _: java.io.UTFDataFormatException | + // SPARK-52553 - Can throw this if the bytes in the file is coincidentally + // decoded as UTF string like "v)". + _: NumberFormatException => 1 } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index bd9c838eaa6a..5d1ed9b8622a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3534,6 +3534,38 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + testWithChangelogCheckpointingEnabled("SPARK-52553 - v1 changelog with invalid version number" + + " does not cause NumberFormatException") { + withTempDir { dir => + withDB(dir.getCanonicalPath) { db => + // In v1 changelog, the first key size would be written first in the file. + // We want the first few bytes in the changelog file to represent the UTF-8 string "v)" + // Because it has a prefix v, the changelog factory would try to parse ) as the version. 
+ val dfsChangelogFileMethod = PrivateMethod[Path](Symbol("dfsChangelogFile")) + val changelogFilePath = db.fileManager invokePrivate dfsChangelogFileMethod(1L, None) + + val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) + val fm = db.fileManager invokePrivate fileManagerMethod() + + val codecMethod = PrivateMethod[CompressionCodec](Symbol("codec")) + val codec = db.fileManager invokePrivate codecMethod() + + // Write a changelog file (1.changelog) with the desired content + val output = new DataOutputStream(codec.compressedOutputStream( + fm.createAtomic(changelogFilePath, overwriteIfPossible = true))) + // Write the string "v)" + output.writeUTF("v)") + output.close() + + // Now try to read the changelog file using changelog reader + // It shouldn't throw NumberFormatException + val changelogReader = db.fileManager.getChangelogReader(1) + assert(changelogReader.version === 1) + changelogReader.closeIfNeeded() + } + } + } + private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = { val threadInfo = db.getAcquiredThreadInfo() assert(threadInfo != None, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org