This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b63497893649 [SPARK-51922][SS] Fix UTFDataFormatException thrown from 
StateStoreChangelogReaderFactory for v1
b63497893649 is described below

commit b634978936499f58f8cb2e8ea16339feb02ffb52
Author: Livia Zhu <livia....@databricks.com>
AuthorDate: Sat Apr 26 14:43:14 2025 +0900

    [SPARK-51922][SS] Fix UTFDataFormatException thrown from 
StateStoreChangelogReaderFactory for v1
    
    ### What changes were proposed in this pull request?
    
    Catch the UTFDataFormatException thrown for v1 in the 
StateStoreChangelogReaderFactory and assign the version to 1.
    
    ### Why are the changes needed?
    
    When a v1 changelog file's first record has a sufficiently large key, the
    version-detection readUTF() call throws UTFDataFormatException instead of
    EOFException, so a valid v1 file fails to load. The factory should treat
    this the same as the existing EOFException case and return version 1.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50721 from liviazhu-db/liviazhu-db/master.
    
    Authored-by: Livia Zhu <livia....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/execution/streaming/state/RocksDB.scala  |  2 +-
 .../streaming/state/StateStoreChangelog.scala          |  5 ++++-
 .../sql/execution/streaming/state/RocksDBSuite.scala   | 18 ++++++++++++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 07553f51c60e..6b3bec207703 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -153,7 +153,7 @@ class RocksDB(
     )
   }
 
-  private val fileManager = createFileManager(dfsRootDir, 
createTempDir("fileManager"),
+  private[spark] val fileManager = createFileManager(dfsRootDir, 
createTempDir("fileManager"),
     hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index b4fbb5560f2f..bcaff4c60d08 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -368,7 +368,10 @@ class StateStoreChangelogReaderFactory(
       // When there is no record being written in the changelog file in V1,
       // the file contains a single int -1 meaning EOF, then the above 
readUTF()
       // throws with EOFException and we return version 1.
-      case _: java.io.EOFException => 1
+      // Or if the first record in the changelog file in V1 has a large enough
+      // key, readUTF() will throw a UTFDataFormatException so we should return
+      // version 1 (SPARK-51922).
+      case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f9b58fe31578..bd9c838eaa6a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3516,6 +3516,24 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled("SPARK-51922 - Changelog writer v1 
with large key" +
+    " does not cause UTFDataFormatException") {
+    val remoteDir = Utils.createTempDir()
+
+    withDB(remoteDir.toString) { db =>
+      db.load(0)
+      val key = new Array[Char](98304).mkString("") // Large key that would 
trigger UTFException
+      // if handled incorrectly
+      db.put(key, "0")
+      db.commit()
+
+      val changelogReader = db.fileManager.getChangelogReader(1)
+      assert(changelogReader.version === 1)
+      val entries = changelogReader.toSeq
+      assert(entries.size == 1)
+    }
+  }
+
   private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = {
     val threadInfo = db.getAcquiredThreadInfo()
     assert(threadInfo != None,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to