This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 3686ee0e5242 [SPARK-56539][SS] Add state row format validation to prefixScan and rangeScan
3686ee0e5242 is described below

commit 3686ee0e5242ece013c03aa6a9d0b5af7096c82d
Author: Anupam Yadav <[email protected]>
AuthorDate: Wed May 20 06:50:15 2026 +0900

    [SPARK-56539][SS] Add state row format validation to prefixScan and rangeScan
    
    ### What changes were proposed in this pull request?
    
    This PR adds `validateStateRowFormat` calls to `prefixScan` and `rangeScan` 
in `RocksDBStateStoreProvider`, matching the existing pattern used by `get()`, 
`prefixScanWithMultiValues`, and `rangeScanWithMultiValues`.
    
    ### Why are the changes needed?
    
    `prefixScan` and `rangeScan` were missing the `validateStateRowFormat` 
check that all other read operations perform. This was identified during the 
review of SPARK-56369 (PR #55226) by viirya and filed as a follow-up by 
HeartSaVioR.
    
    Without this fix, state row format corruption could go undetected when data 
is read through `prefixScan` or `rangeScan` without a prior `get()` call.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is an internal validation improvement.
    
    ### How was this patch tested?
    
    Added two new tests in `RocksDBStateStoreSuite`:
    - `SPARK-56539: prefixScan triggers validateStateRowFormat on schema mismatch`
    - `SPARK-56539: rangeScan triggers validateStateRowFormat on schema mismatch`
    
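    Both tests write a row with the correct schema, reopen the store with a 
mismatched value schema (`StringType` instead of `IntegerType`), and then call 
`prefixScan`/`rangeScan` without a prior `get()` call (so `isValidated` is 
still false), expecting `StateStoreValueRowFormatValidationFailure`.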
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. The implementation approach was brainstormed with AI. The tests were 
generated and run by AI with human review.
    Model: Claude Opus 4.7
    
    Closes #55468 from yadavay-amzn/fix/SPARK-56539-state-row-validation.
    
    Authored-by: Anupam Yadav <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../state/RocksDBStateStoreProvider.scala          | 10 +++
 .../streaming/state/RocksDBStateStoreSuite.scala   | 82 ++++++++++++++++++++++
 2 files changed, 92 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index b3d734c71f91..c181130eec94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -516,6 +516,11 @@ private[sql] class RocksDBStateStoreProvider
       val iter = rocksDbIter.map { kv =>
         rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
           kvEncoder._2.decodeValue(kv.value))
+        if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+          StateStoreProvider.validateStateRowFormat(
+            rowPair.key, keySchema, rowPair.value, valueSchema, stateStoreId, 
storeConf)
+          isValidated = true
+        }
         rowPair
       }
 
@@ -575,6 +580,11 @@ private[sql] class RocksDBStateStoreProvider
       val iter = rocksDbIter.map { kv =>
         rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
           kvEncoder._2.decodeValue(kv.value))
+        if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+          StateStoreProvider.validateStateRowFormat(
+            rowPair.key, keySchema, rowPair.value, valueSchema, stateStoreId, 
storeConf)
+          isValidated = true
+        }
         rowPair
       }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index ed8c22740b32..e501366b7f98 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -1856,6 +1856,88 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("SPARK-56539: prefixScan triggers validateStateRowFormat on schema 
mismatch") {
+    // Write data with correct schema, then reopen with a mismatched 
valueSchema.
+    // prefixScan should trigger validateStateRowFormat and throw on the first 
iteration.
+    val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+    // Step 1: Write data with correct schema and commit
+    val provider1 = new RocksDBStateStoreProvider()
+    provider1.init(storeId, keySchema, valueSchema,
+      PrefixKeyScanStateEncoderSpec(keySchema, 1), useColumnFamilies = false,
+      new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+      stateSchemaProvider = Some(new TestStateSchemaProvider))
+    val store1 = provider1.getStore(0)
+    store1.put(dataToKeyRow("a", 1), dataToValueRow(1))
+    store1.commit()
+    provider1.close()
+
+    // Step 2: Reopen with a wrong valueSchema (StringType instead of 
IntegerType)
+    // The stored IntegerType value bytes will be misinterpreted as a 
variable-length
+    // StringType offset/size, causing structural integrity validation to fail.
+    val wrongValueSchema = StructType(Seq(StructField("v1", StringType, true)))
+    val provider2 = new RocksDBStateStoreProvider()
+    provider2.init(storeId, keySchema, wrongValueSchema,
+      PrefixKeyScanStateEncoderSpec(keySchema, 1), useColumnFamilies = false,
+      new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+      stateSchemaProvider = Some(new TestStateSchemaProvider))
+    val store2 = provider2.getStore(1)
+    try {
+      // prefixScan should trigger validation and throw because stored value 
bytes
+      // are not structurally valid for the declared StringType schema
+      intercept[StateStoreValueRowFormatValidationFailure] {
+        store2.prefixScan(dataToPrefixKeyRow("a")).toSeq
+      }
+    } finally {
+      store2.abort()
+      provider2.close()
+    }
+  }
+
+  test("SPARK-56539: rangeScan triggers validateStateRowFormat on schema 
mismatch") {
+    // Write data with correct schema, then reopen with a mismatched 
valueSchema.
+    // rangeScan should trigger validateStateRowFormat and throw on the first 
iteration.
+    val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+    // Step 1: Write data with correct schema and commit
+    val provider1 = new RocksDBStateStoreProvider()
+    provider1.init(storeId, keySchemaWithRangeScan, valueSchema,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+      useColumnFamilies = false,
+      new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+      stateSchemaProvider = Some(new TestStateSchemaProvider))
+    val store1 = provider1.getStore(0)
+    store1.put(dataToKeyRowWithRangeScan(10L, "a"), dataToValueRow(10))
+    store1.commit()
+    provider1.close()
+
+    // Step 2: Reopen with a wrong valueSchema (StringType instead of 
IntegerType)
+    val wrongValueSchema = StructType(Seq(StructField("v1", StringType, true)))
+    val provider2 = new RocksDBStateStoreProvider()
+    provider2.init(storeId, keySchemaWithRangeScan, wrongValueSchema,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+      useColumnFamilies = false,
+      new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+      stateSchemaProvider = Some(new TestStateSchemaProvider))
+    val store2 = provider2.getStore(1)
+    try {
+      // rangeScan should trigger validation and throw because stored value 
bytes
+      // are not structurally valid for the declared StringType schema
+      intercept[StateStoreValueRowFormatValidationFailure] {
+        store2.rangeScan(
+          Some(dataToKeyRowWithRangeScan(10L, "a")),
+          Some(dataToKeyRowWithRangeScan(20L, "a"))).toSeq
+      }
+    } finally {
+      store2.abort()
+      provider2.close()
+    }
+  }
+
   testWithColumnFamiliesAndEncodingTypes(
     "rocksdb key and value schema encoders for column families",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to