This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c04dc0486076 [SPARK-51373][SS] Removing extra copy for column family prefix from 'ReplayChangelog'
c04dc0486076 is described below

commit c04dc048607605c4bc2dba8372ade6899f62a4ce
Author:     Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Tue Mar 4 17:27:58 2025 +0900

    [SPARK-51373][SS] Removing extra copy for column family prefix from 'ReplayChangelog'

    ### What changes were proposed in this pull request?

    When column families are enabled, replaying the changelog currently makes one extra copy to strip the column family prefix from each key, and then another to add the same prefix back. Since nothing inspects the prefix or the raw key bytes in between, both copies can be skipped and the key passed through unchanged.

    ### Why are the changes needed?

    This change is needed for the performance of the TransformWithState operator: each extra copy incurs a per-record cost, and this change removes those copies from the changelog replay path.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing unit tests are sufficient, as we are not adding any new functionality.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50119 from ericm-db/changelog-copy.

    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit c2f2be68dd09db0233ba67c35644b311233e501a)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala   | 24 +++---
 .../execution/streaming/state/RocksDBSuite.scala  | 89 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 10 deletions(-)
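For context, the copy being eliminated can be sketched in isolation. The snippet below is a minimal illustration, not the actual Spark implementation: the helper names (encode, replayOld, replayNew) are hypothetical, and the 2-byte column family ID prefix is only an assumption for the sketch.

import java.nio.ByteBuffer

object PrefixPassthroughSketch {
  // Assumed fixed-width column family ID prepended to every key; the width
  // here is illustrative only.
  val PrefixBytes = 2

  // Hypothetical stand-in for encodeStateRowWithPrefix: prepend the cf ID.
  def encode(key: Array[Byte], cfId: Short): Array[Byte] = {
    val buf = ByteBuffer.allocate(PrefixBytes + key.length)
    buf.putShort(cfId).put(key)
    buf.array()
  }

  // Old replay path: strip the prefix (copy #1); the write path then
  // re-encodes the same prefix (copy #2) before handing the key to RocksDB.
  def replayOld(prefixedKey: Array[Byte], write: (Array[Byte], Short) => Unit): Unit = {
    val buf = ByteBuffer.wrap(prefixedKey)
    val cfId = buf.getShort()
    val rawKey = new Array[Byte](prefixedKey.length - PrefixBytes)
    buf.get(rawKey)
    write(rawKey, cfId) // write() is expected to call encode(rawKey, cfId) again
  }

  // New replay path: the changelog already stores prefixed keys, so they are
  // passed straight through with no intermediate copies.
  def replayNew(prefixedKey: Array[Byte], writePrefixed: Array[Byte] => Unit): Unit =
    writePrefixed(prefixedKey)
}

The diff below applies this idea by threading an includesPrefix flag through put, merge, and remove, so replayChangelog can hand prefixed keys directly to the write path.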
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 820322d1e0ee..1c04398a4ed7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -671,16 +671,15 @@ class RocksDB(
     if (useColumnFamilies) {
       changelogReader.foreach { case (recordType, key, value) =>
-        val (keyWithoutPrefix, cfName) = decodeStateRowWithPrefix(key)
 
         recordType match {
           case RecordType.PUT_RECORD =>
-            put(keyWithoutPrefix, value, cfName)
+            put(key, value, includesPrefix = true)
 
           case RecordType.DELETE_RECORD =>
-            remove(keyWithoutPrefix, cfName)
+            remove(key, includesPrefix = true)
 
           case RecordType.MERGE_RECORD =>
-            merge(keyWithoutPrefix, value, cfName)
+            merge(key, value, includesPrefix = true)
         }
       }
     } else {
@@ -801,8 +800,9 @@ class RocksDB(
   def put(
       key: Array[Byte],
       value: Array[Byte],
-      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    val keyWithPrefix = if (useColumnFamilies) {
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
+    val keyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(key, cfName)
     } else {
       key
@@ -827,8 +827,9 @@ class RocksDB(
   def merge(
       key: Array[Byte],
       value: Array[Byte],
-      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    val keyWithPrefix = if (useColumnFamilies) {
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
+    val keyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(key, cfName)
     } else {
       key
@@ -843,8 +844,11 @@ class RocksDB(
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte], cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    val keyWithPrefix = if (useColumnFamilies) {
+  def remove(
+      key: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
+    val keyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(key, cfName)
     } else {
       key
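Before the test diff, a short usage illustration of the two call shapes the new flag allows. KeyedStore is a hypothetical stand-in for the relevant slice of the RocksDB class, not an actual Spark trait:

object IncludesPrefixUsageSketch {
  trait KeyedStore {
    def put(
        key: Array[Byte],
        value: Array[Byte],
        cfName: String = "default",
        includesPrefix: Boolean = false): Unit
  }

  def writeBoth(
      db: KeyedStore,
      rawKey: Array[Byte],
      prefixedKey: Array[Byte],
      value: Array[Byte]): Unit = {
    // Regular write path: pass the raw key and let the store add the prefix.
    db.put(rawKey, value, cfName = "test_cf")
    // Changelog replay path: the key already carries its prefix, so the store
    // is told to skip re-encoding it.
    db.put(prefixedKey, value, includesPrefix = true)
  }
}

The new test below exercises these same call shapes against the real RocksDB class.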
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 50240c0605e8..475f8b116830 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1196,6 +1196,95 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  testWithColumnFamilies(
+    "RocksDB: test includesPrefix parameter during changelog replay",
+    TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
+
+    // Only test when column families are enabled, as the includesPrefix parameter
+    // is only relevant in that case
+    if (colFamiliesEnabled) {
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets created
+
+      withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
+        // Create a test column family
+        val testCfName = "test_cf"
+        db.createColFamilyIfAbsent(testCfName, isInternal = false)
+
+        // Write initial data
+        db.load(0)
+        db.put("key1", "value1", StateStore.DEFAULT_COL_FAMILY_NAME)
+        db.put("key2", "value2", testCfName)
+        db.commit()
+
+        // Get the encoded keys with column family prefixes
+        val keyWithPrefix1 = getKeyWithPrefix(db, "key1", StateStore.DEFAULT_COL_FAMILY_NAME)
+        val keyWithPrefix2 = getKeyWithPrefix(db, "key2", testCfName)
+
+        // Pretend we're replaying changelog with already-prefixed keys.
+        // Throughout this test, we will load version 0 and the latest version
+        // in order to ensure that the changelog files are read from and
+        // replayed
+        db.load(0)
+        db.load(1)
+
+        // Use the includesPrefix=true parameter with keys that already have prefixes
+        db.put(keyWithPrefix1, "updated1", includesPrefix = true)
+        db.put(keyWithPrefix2, "updated2", includesPrefix = true)
+        db.commit()
+
+        // Verify the updates were applied correctly
+        db.load(0)
+        db.load(2)
+        assert(toStr(db.get("key1", StateStore.DEFAULT_COL_FAMILY_NAME)) === "updated1")
+        assert(toStr(db.get("key2", testCfName)) === "updated2")
+
+        // Test remove with includesPrefix
+        db.remove(keyWithPrefix1, includesPrefix = true)
+        db.remove(keyWithPrefix2, includesPrefix = true)
+        db.commit()
+
+        // Verify removals worked
+        db.load(0)
+        db.load(3)
+        assert(db.get("key1", StateStore.DEFAULT_COL_FAMILY_NAME) === null)
+        assert(db.get("key2", testCfName) === null)
+
+        // Add back some data for testing merge operation
+        db.put("merge_key", "base", StateStore.DEFAULT_COL_FAMILY_NAME)
+        db.commit()
+
+        // Get encoded key for merge test
+        val mergeKeyWithPrefix = getKeyWithPrefix(
+          db, "merge_key", StateStore.DEFAULT_COL_FAMILY_NAME)
+
+        // Test merge with includesPrefix
+        db.load(0)
+        db.load(4)
+        db.merge(mergeKeyWithPrefix, "appended", includesPrefix = true)
+        db.commit()
+
+        // Verify merge operation worked
+        db.load(0)
+        db.load(5)
+        assert(toStr(db.get("merge_key", StateStore.DEFAULT_COL_FAMILY_NAME)) === "base,appended")
+      }
+    }
+  }
+
+  // Helper method to get a key with column family prefix
+  private def getKeyWithPrefix(db: RocksDB, key: String, cfName: String): Array[Byte] = {
+    // This uses reflection to call the private encodeStateRowWithPrefix method
+    val encodeMethod = classOf[RocksDB].getDeclaredMethod(
+      "encodeStateRowWithPrefix",
+      classOf[Array[Byte]],
+      classOf[String]
+    )
+    encodeMethod.setAccessible(true)
+    encodeMethod.invoke(db, key.getBytes, cfName).asInstanceOf[Array[Byte]]
+  }
+
   testWithStateStoreCheckpointIdsAndColumnFamilies(s"RocksDB: get, put, iterator, commit, load",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) {
     case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org