This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 525e134efc77 [SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1
525e134efc77 is described below

commit 525e134efc774205ced8ca6a50ed58a9fd00ecb2
Author: Livia Zhu <livia....@databricks.com>
AuthorDate: Sat Apr 26 14:43:14 2025 +0900

    [SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1

    What changes were proposed in this pull request?
    Catch the UTFDataFormatException thrown for v1 in the StateStoreChangelogReaderFactory and assign the version to 1.

    Why are the changes needed?
    We should not throw this error.

    Does this PR introduce any user-facing change?
    No

    How was this patch tested?
    Unit test

    Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #50721 from liviazhu-db/liviazhu-db/master.

    Authored-by: Livia Zhu <livia....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit b634978936499f58f8cb2e8ea16339feb02ffb52)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
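For context (not part of the commit itself): a minimal, standalone Scala sketch of the failure mode this patch handles. It assumes the simplified v1 layout described in the StateStoreChangelog.scala comments below, namely that a v1 changelog file begins with the key size as a raw int rather than a version string; the object name and the exact byte values are made up for illustration only.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object ReadUtfOnV1PrefixDemo {
  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    // A 98304-character key (as in the new test) encodes to 98304 bytes = 0x00018000,
    // so a v1-style file would start with these four bytes followed by the key bytes.
    out.writeInt(98304)
    out.write(Array[Byte](0, 0, 0, 0)) // first few key bytes; contents are irrelevant here
    out.flush()

    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    // readUTF() treats the first two bytes (0x0001) as a UTF length of 1, then hits 0x80,
    // which is not a valid first byte of a modified-UTF-8 sequence.
    try {
      in.readUTF()
    } catch {
      case e: java.io.UTFDataFormatException => println(s"large key in v1 file => $e")
      case e: java.io.EOFException => println(s"empty v1 file => $e")
    }
  }
}

The same mechanics explain the pre-existing EOFException branch: an empty v1 file is a single int -1, which readUTF() reads as a 65535-byte length and then runs out of input.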
---
 .../spark/sql/execution/streaming/state/RocksDB.scala |  2 +-
 .../streaming/state/StateStoreChangelog.scala         |  5 ++++-
 .../sql/execution/streaming/state/RocksDBSuite.scala  | 18 ++++++++++++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index cfabc6f5ffbc..a16d5a100281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -135,7 +135,7 @@ class RocksDB(
   private val nativeStats = rocksDbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+  private[spark] val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
     hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index b4fbb5560f2f..bcaff4c60d08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -368,7 +368,10 @@ class StateStoreChangelogReaderFactory(
       // When there is no record being written in the changelog file in V1,
       // the file contains a single int -1 meaning EOF, then the above readUTF()
       // throws with EOFException and we return version 1.
-      case _: java.io.EOFException => 1
+      // Or if the first record in the changelog file in V1 has a large enough
+      // key, readUTF() will throw a UTFDataFormatException so we should return
+      // version 1 (SPARK-51922).
+      case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f9b58fe31578..bd9c838eaa6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3516,6 +3516,24 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled("SPARK-51922 - Changelog writer v1 with large key" +
+    " does not cause UTFDataFormatException") {
+    val remoteDir = Utils.createTempDir()
+
+    withDB(remoteDir.toString) { db =>
+      db.load(0)
+      val key = new Array[Char](98304).mkString("") // Large key that would trigger UTFException
+      // if handled incorrectly
+      db.put(key, "0")
+      db.commit()
+
+      val changelogReader = db.fileManager.getChangelogReader(1)
+      assert(changelogReader.version === 1)
+      val entries = changelogReader.toSeq
+      assert(entries.size == 1)
+    }
+  }
+
   private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = {
     val threadInfo = db.getAcquiredThreadInfo()
     assert(threadInfo != None,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org