This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 525e134efc77 [SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1
525e134efc77 is described below

commit 525e134efc774205ced8ca6a50ed58a9fd00ecb2
Author: Livia Zhu <livia....@databricks.com>
AuthorDate: Sat Apr 26 14:43:14 2025 +0900

    [SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1
    
    Catch the UTFDataFormatException thrown for v1 in the StateStoreChangelogReaderFactory and assign the version to 1.
    
    We should not throw this error.
    
    No user-facing change.
    
    Added a unit test.
    
    No generative AI tooling was used.
    
    Closes #50721 from liviazhu-db/liviazhu-db/master.
    
    Authored-by: Livia Zhu <livia....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit b634978936499f58f8cb2e8ea16339feb02ffb52)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
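
    For context: a v1 changelog record begins with the key length written as an
    int, followed by the raw key bytes, while the reader factory first attempts
    readUTF() to look for a version header. The following is a minimal standalone
    sketch of that interaction using plain JDK streams; it is not Spark code, the
    object name ReadUtfRepro is made up for illustration, and it skips the
    compression wrapping that a real changelog file has.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object ReadUtfRepro {
      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        val out = new DataOutputStream(buf)
        // A v1-style record starts with the key length as an int, then the key bytes.
        // 98304 = 0x00018000, so the stream begins with bytes 0x00 0x01 0x80 0x00.
        out.writeInt(98304)
        out.write(new Array[Byte](98304))
        out.flush()

        val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
        try {
          // readUTF() reads the first two bytes (0x00 0x01) as a UTF length of 1,
          // then hits 0x80, which is not a valid modified-UTF-8 start byte.
          in.readUTF()
        } catch {
          case e: java.io.UTFDataFormatException =>
            println(s"readUTF failed as expected: $e") // with this fix, mapped to version 1
        }
      }
    }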
---
 .../spark/sql/execution/streaming/state/RocksDB.scala  |  2 +-
 .../streaming/state/StateStoreChangelog.scala          |  5 ++++-
 .../sql/execution/streaming/state/RocksDBSuite.scala   | 18 ++++++++++++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index cfabc6f5ffbc..a16d5a100281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -135,7 +135,7 @@ class RocksDB(
   private val nativeStats = rocksDbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+  private[spark] val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
     hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index b4fbb5560f2f..bcaff4c60d08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -368,7 +368,10 @@ class StateStoreChangelogReaderFactory(
       // When there is no record being written in the changelog file in V1,
       // the file contains a single int -1 meaning EOF, then the above readUTF()
       // throws with EOFException and we return version 1.
-      case _: java.io.EOFException => 1
+      // Or if the first record in the changelog file in V1 has a large enough
+      // key, readUTF() will throw a UTFDataFormatException so we should return
+      // version 1 (SPARK-51922).
+      case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1
     }
   }
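
For reference, the detection behavior after this change can be summarized by the
simplified sketch below. It is an illustrative approximation, not the actual
StateStoreChangelogReaderFactory code; ChangelogVersionSketch and
detectChangelogVersion are hypothetical names.

import java.io.{DataInputStream, EOFException, UTFDataFormatException}

object ChangelogVersionSketch {
  // Illustrative approximation of the patched detection logic.
  def detectChangelogVersion(input: DataInputStream): Short = {
    try {
      // v2+ changelog files begin with a version string such as "v2".
      input.readUTF().stripPrefix("v").toShort
    } catch {
      // v1 files have no version header: an empty changelog yields EOFException,
      // and a first record with a large key makes readUTF() fail with
      // UTFDataFormatException, so both cases map to version 1 (SPARK-51922).
      case _: EOFException | _: UTFDataFormatException => 1
    }
  }
}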
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f9b58fe31578..bd9c838eaa6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3516,6 +3516,24 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled("SPARK-51922 - Changelog writer v1 with large key" +
+    " does not cause UTFDataFormatException") {
+    val remoteDir = Utils.createTempDir()
+
+    withDB(remoteDir.toString) { db =>
+      db.load(0)
+      val key = new Array[Char](98304).mkString("") // Large key that would trigger UTFException
+      // if handled incorrectly
+      db.put(key, "0")
+      db.commit()
+
+      val changelogReader = db.fileManager.getChangelogReader(1)
+      assert(changelogReader.version === 1)
+      val entries = changelogReader.toSeq
+      assert(entries.size == 1)
+    }
+  }
+
   private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = {
     val threadInfo = db.getAcquiredThreadInfo()
     assert(threadInfo != None,

