This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b63497893649 [SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1 b63497893649 is described below commit b634978936499f58f8cb2e8ea16339feb02ffb52 Author: Livia Zhu <livia....@databricks.com> AuthorDate: Sat Apr 26 14:43:14 2025 +0900 [SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1 ### What changes were proposed in this pull request? Catch the UTFDataFormatException thrown for v1 in the StateStoreChangelogReaderFactory and assign the version to 1. ### Why are the changes needed? We should not throw this error. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #50721 from liviazhu-db/liviazhu-db/master. Authored-by: Livia Zhu <livia....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/execution/streaming/state/RocksDB.scala | 2 +- .../streaming/state/StateStoreChangelog.scala | 5 ++++- .../sql/execution/streaming/state/RocksDBSuite.scala | 18 ++++++++++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 07553f51c60e..6b3bec207703 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -153,7 +153,7 @@ class RocksDB( ) } - private val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"), + private[spark] val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"), hadoopConf, conf.compressionCodec, loggingId = loggingId) private val byteArrayPair = new ByteArrayPair() 
private val commitLatencyMs = new mutable.HashMap[String, Long]() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index b4fbb5560f2f..bcaff4c60d08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -368,7 +368,10 @@ class StateStoreChangelogReaderFactory( // When there is no record being written in the changelog file in V1, // the file contains a single int -1 meaning EOF, then the above readUTF() // throws with EOFException and we return version 1. - case _: java.io.EOFException => 1 + // Or if the first record in the changelog file in V1 has a large enough + // key, readUTF() will throw a UTFDataFormatException so we should return + // version 1 (SPARK-51922). + case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1 } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index f9b58fe31578..bd9c838eaa6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3516,6 +3516,24 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + testWithChangelogCheckpointingEnabled("SPARK-51922 - Changelog writer v1 with large key" + + " does not cause UTFDataFormatException") { + val remoteDir = Utils.createTempDir() + + withDB(remoteDir.toString) { db => + db.load(0) + val key = new Array[Char](98304).mkString("") // Large key that would trigger UTFException + // if handled incorrectly + db.put(key, "0") + db.commit() + + val changelogReader = db.fileManager.getChangelogReader(1) + assert(changelogReader.version === 1) + val entries = changelogReader.toSeq + assert(entries.size == 1) + } + } + private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = { val threadInfo = db.getAcquiredThreadInfo() assert(threadInfo != None, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org