This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c04dc0486076 [SPARK-51373][SS] Removing extra copy for column family prefix from 'ReplayChangelog'
c04dc0486076 is described below

commit c04dc048607605c4bc2dba8372ade6899f62a4ce
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Tue Mar 4 17:27:58 2025 +0900

    [SPARK-51373][SS] Removing extra copy for column family prefix from 'ReplayChangelog'
    
    ### What changes were proposed in this pull request?
    
    When column families are enabled, replaying the changelog currently makes
    one extra copy to strip the column family prefix from each key, and then
    another to add the prefix back when the record is written to the store.
    Since nothing inspects the prefix or the raw key bytes in between, we can
    skip the strip-and-re-add round trip and pass the prefixed key through
    unchanged.
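
    As the diff below shows, the replay path used to decode each changelog
    key and then re-encode it on write; it now hands the already-prefixed key
    straight to the store. A minimal before/after sketch (names as in
    RocksDB.scala):

        // Before: decoding strips the prefix (one copy) and put() re-adds it
        // (a second copy).
        val (keyWithoutPrefix, cfName) = decodeStateRowWithPrefix(key)
        put(keyWithoutPrefix, value, cfName)

        // After: the changelog key already carries the prefix, so pass it
        // through as-is.
        put(key, value, includesPrefix = true)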
    
    ### Why are the changes needed?
    
    This change is needed for performance of the TransformWithState operator:
    each extra copy incurs a per-record performance hit, and this change
    removes those copies.
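
    For intuition on the cost: each prefix strip or re-add allocates a new
    byte array and copies the key bytes into it. A hypothetical sketch of the
    stripping side (illustrative only, assuming a fixed-width prefix; not the
    actual Spark helper):

        // Hypothetical: removing an N-byte prefix allocates a fresh array and
        // copies every remaining key byte, i.e. O(key length) work per record.
        def stripPrefix(key: Array[Byte], prefixLen: Int): Array[Byte] =
          java.util.Arrays.copyOfRange(key, prefixLen, key.length)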
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests are sufficient, as we are not adding any new
    functionality.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50119 from ericm-db/changelog-copy.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit c2f2be68dd09db0233ba67c35644b311233e501a)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 24 +++---
 .../execution/streaming/state/RocksDBSuite.scala   | 89 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 820322d1e0ee..1c04398a4ed7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -671,16 +671,15 @@ class RocksDB(
 
         if (useColumnFamilies) {
           changelogReader.foreach { case (recordType, key, value) =>
-            val (keyWithoutPrefix, cfName) = decodeStateRowWithPrefix(key)
             recordType match {
               case RecordType.PUT_RECORD =>
-                put(keyWithoutPrefix, value, cfName)
+                put(key, value, includesPrefix = true)
 
               case RecordType.DELETE_RECORD =>
-                remove(keyWithoutPrefix, cfName)
+                remove(key, includesPrefix = true)
 
               case RecordType.MERGE_RECORD =>
-                merge(keyWithoutPrefix, value, cfName)
+                merge(key, value, includesPrefix = true)
             }
           }
         } else {
@@ -801,8 +800,9 @@ class RocksDB(
   def put(
       key: Array[Byte],
       value: Array[Byte],
-      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    val keyWithPrefix = if (useColumnFamilies) {
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
+    val keyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(key, cfName)
     } else {
       key
@@ -827,8 +827,9 @@ class RocksDB(
   def merge(
       key: Array[Byte],
       value: Array[Byte],
-      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    val keyWithPrefix = if (useColumnFamilies) {
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
+    val keyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(key, cfName)
     } else {
       key
@@ -843,8 +844,11 @@ class RocksDB(
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte], cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    val keyWithPrefix = if (useColumnFamilies) {
+  def remove(
+      key: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
+    val keyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(key, cfName)
     } else {
       key
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 50240c0605e8..475f8b116830 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1196,6 +1196,95 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  testWithColumnFamilies(
+    "RocksDB: test includesPrefix parameter during changelog replay",
+    TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
+
+    // Only test when column families are enabled, as the includesPrefix parameter
+    // is only relevant in that case
+    if (colFamiliesEnabled) {
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets created
+
+      withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
+        // Create a test column family
+        val testCfName = "test_cf"
+        db.createColFamilyIfAbsent(testCfName, isInternal = false)
+
+        // Write initial data
+        db.load(0)
+        db.put("key1", "value1", StateStore.DEFAULT_COL_FAMILY_NAME)
+        db.put("key2", "value2", testCfName)
+        db.commit()
+
+        // Get the encoded keys with column family prefixes
+        val keyWithPrefix1 = getKeyWithPrefix(db, "key1", StateStore.DEFAULT_COL_FAMILY_NAME)
+        val keyWithPrefix2 = getKeyWithPrefix(db, "key2", testCfName)
+
+        // Pretend we're replaying changelog with already-prefixed keys
+        // Throughout this test, we will load version 0 and the latest version
+        // in order to ensure that the changelog files are read from and
+        // replayed
+        db.load(0)
+        db.load(1)
+
+        // Use the includesPrefix=true parameter with keys that already have prefixes
+        db.put(keyWithPrefix1, "updated1", includesPrefix = true)
+        db.put(keyWithPrefix2, "updated2", includesPrefix = true)
+        db.commit()
+
+        // Verify the updates were applied correctly
+        db.load(0)
+        db.load(2)
+        assert(toStr(db.get("key1", StateStore.DEFAULT_COL_FAMILY_NAME)) === "updated1")
+        assert(toStr(db.get("key2", testCfName)) === "updated2")
+
+        // Test remove with includesPrefix
+        db.remove(keyWithPrefix1, includesPrefix = true)
+        db.remove(keyWithPrefix2, includesPrefix = true)
+        db.commit()
+
+        // Verify removals worked
+        db.load(0)
+        db.load(3)
+        assert(db.get("key1", StateStore.DEFAULT_COL_FAMILY_NAME) === null)
+        assert(db.get("key2", testCfName) === null)
+
+        // Add back some data for testing merge operation
+        db.put("merge_key", "base", StateStore.DEFAULT_COL_FAMILY_NAME)
+        db.commit()
+
+        // Get encoded key for merge test
+        val mergeKeyWithPrefix = getKeyWithPrefix(
+          db, "merge_key", StateStore.DEFAULT_COL_FAMILY_NAME)
+
+        // Test merge with includesPrefix
+        db.load(0)
+        db.load(4)
+        db.merge(mergeKeyWithPrefix, "appended", includesPrefix = true)
+        db.commit()
+
+        // Verify merge operation worked
+        db.load(0)
+        db.load(5)
+        assert(toStr(db.get("merge_key", StateStore.DEFAULT_COL_FAMILY_NAME)) === "base,appended")
+      }
+    }
+  }
+
+  // Helper method to get a key with column family prefix
+  private def getKeyWithPrefix(db: RocksDB, key: String, cfName: String): Array[Byte] = {
+    // This uses reflection to call the private encodeStateRowWithPrefix method
+    val encodeMethod = classOf[RocksDB].getDeclaredMethod(
+      "encodeStateRowWithPrefix",
+      classOf[Array[Byte]],
+      classOf[String]
+    )
+    encodeMethod.setAccessible(true)
+    encodeMethod.invoke(db, key.getBytes, cfName).asInstanceOf[Array[Byte]]
+  }
+
  testWithStateStoreCheckpointIdsAndColumnFamilies(s"RocksDB: get, put, iterator, commit, load",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) {
     case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
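
For context, a sketch of how callers are expected to use the new parameter
(based on the signatures in the diff above; variable names are illustrative):

    // Normal write path: cfName is supplied and put() prepends the column
    // family prefix to the key internally.
    db.put(keyBytes, valueBytes, cfName = "test_cf")

    // Changelog replay path: the key read from the changelog already carries
    // the prefix, so includesPrefix = true skips the re-encoding copy.
    db.put(prefixedKeyBytes, valueBytes, includesPrefix = true)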

