This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9ebe4a2d2c1b [SPARK-53792][SS] Fix rocksdbPinnedBlocksMemoryUsage when
bounded memory …
9ebe4a2d2c1b is described below
commit 9ebe4a2d2c1bc490e51843ec77365a1eda352244
Author: Ubuntu <[email protected]>
AuthorDate: Wed Oct 8 10:50:31 2025 -0700
[SPARK-53792][SS] Fix rocksdbPinnedBlocksMemoryUsage when bounded memory usage is enabled
### What changes were proposed in this pull request?
This PR changes how `pinnedBlocksMemUsage` is calculated when an instance has
bounded memory enabled.
Before this change, we always reported
`getDBProperty("rocksdb.block-cache-pinned-usage")`, which returns the size of
the pinned blocks requested by an instance. This is inaccurate when instances
share the same cache, because instances might share the same pinned blocks.
After this change, the total pinned usage is read from
`lruCache.getPinnedUsage()`. When `isBoundedMemory` is enabled for an instance,
that cache is shared, so the total memory usage of the SHARED pinned blocks is
divided by the number of instances with `isBoundedMemory = true`. This is
because RocksDBMemoryManager returns the same cache for all instances that have
`isBoundedMemory = true`. An unbounded instance has its own cache, so its
pinned usage is reported as-is.
Unit test:
build/mvn -Dtest=none
-DwildcardSuites=org.apache.spark.sql.execution.streaming.state.RocksDBSuite
test
### Why are the changes needed?
See above for why this is a bug.
This fix prevents us from over-reporting `pinnedBlocksMemUsage` when
`isBoundedMemory` is enabled for an instance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
See added unit tests on the correctness of the calculation.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by cursor and 'claude-4-sonnet'
Closes #52527 from zifeif2/pinned-block-mem.
Authored-by: Ubuntu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 5 ++-
.../streaming/state/RocksDBMemoryManager.scala | 23 +++++++++++
.../execution/streaming/state/RocksDBSuite.scala | 46 ++++++++++++++++++++++
3 files changed, 73 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 774ed23ed55b..c4dfe39f6744 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1461,7 +1461,6 @@ class RocksDB(
private def metrics: RocksDBMetrics = {
import HistogramType._
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
- val pinnedBlocksMemUsage =
getDBProperty("rocksdb.block-cache-pinned-usage")
val nativeOpsHistograms = Seq(
"get" -> DB_GET,
"put" -> DB_WRITE,
@@ -1497,6 +1496,10 @@ class RocksDB(
// Use RocksDBMemoryManager to calculate the memory usage accounting
val memoryUsage =
RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)
+ val totalPinnedBlocksMemUsage = lruCache.getPinnedUsage()
+ val pinnedBlocksMemUsage =
RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
+ instanceUniqueId, totalPinnedBlocksMemUsage)
+
RocksDBMetrics(
numKeysOnLoadedVersion,
numKeysOnWritingVersion,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 82ee1803b317..c96e51bcba9c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -121,6 +121,29 @@ object RocksDBMemoryManager extends Logging with
UnmanagedMemoryConsumer {
}
}
+  /**
+   * Get the pinned blocks memory usage for a specific instance, accounting for bounded memory
+   * sharing.
+   *
+   * Instances registered with isBoundedMemory = true share a single cache, so the cache-wide
+   * pinned usage is split evenly (integer division) across all bounded-memory instances.
+   * Any other instance, including one not registered in instanceMemoryMap, is charged the
+   * full value that was passed in.
+   *
+   * @param uniqueId The instance's unique identifier
+   * @param totalPinnedUsage The total pinned usage reported by the instance's cache
+   * @return The adjusted pinned blocks memory usage, accounting for sharing in bounded
+   *         memory mode
+   */
+  def getInstancePinnedBlocksMemUsage(
+      uniqueId: String,
+      totalPinnedUsage: Long): Long = {
+    val instanceInfo = instanceMemoryMap
+      .getOrDefault(uniqueId, InstanceMemoryInfo(0L, isBoundedMemory = false))
+    if (instanceInfo.isBoundedMemory) {
+      // In bounded memory mode, divide by the number of bounded instances since they share
+      // the same cache. The count is read separately from the lookup above, so it may
+      // observe 0 (e.g. this instance was unregistered in between); clamp to 1 rather than
+      // throwing ArithmeticException from a metrics call.
+      val numBoundedInstances =
+        math.max(1L, getNumRocksDBInstances(true /* boundedMemory */).toLong)
+      totalPinnedUsage / numBoundedInstances
+    } else {
+      // In unbounded memory mode, each instance has its own cache
+      totalPinnedUsage
+    }
+  }
+
def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf):
(WriteBufferManager, Cache)
= synchronized {
// Register with UnifiedMemoryManager (idempotent operation)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 801e74d288b4..94e3bb208bf4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -2788,6 +2788,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
}
+  test("SPARK-53792: RocksDBMemoryManager getInstancePinnedBlocksMemUsage") {
+    try {
+      // Start from a clean slate so instances registered by earlier tests don't affect
+      // the number of bounded-memory instances counted below.
+      RocksDBMemoryManager.resetWriteBufferManagerAndCache
+
+      val boundedMemoryId1 = "test-instance-1"
+      val boundedMemoryId2 = "test-instance-2"
+      val unboundedMemoryId = "test-instance"
+      // Pinned usage of the cache shared by all bounded-memory instances
+      val sharedCacheUsage = 1000L
+      // Pinned usage of the unbounded instance's own cache; it must be reported undivided
+      val unboundedCacheUsage = 300L
+
+      // Register two bounded memory instances and one unbounded instance
+      RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId1, 0L, isBoundedMemory = true)
+      RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId2, 0L, isBoundedMemory = true)
+      RocksDBMemoryManager.updateMemoryUsage(unboundedMemoryId, 0L, isBoundedMemory = false)
+
+      // Both bounded instances get the same share of the shared cache's pinned usage
+      val result1 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
+        boundedMemoryId1,
+        sharedCacheUsage)
+      val result2 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
+        boundedMemoryId2,
+        sharedCacheUsage)
+      val result3 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
+        unboundedMemoryId,
+        unboundedCacheUsage)
+
+      // Only the 2 bounded instances count toward the divisor (the unbounded one does not),
+      // so each bounded instance gets half of the shared usage
+      assert(result1 === 500L, s"Expected 500L for bounded instance 1, got $result1")
+      assert(result2 === 500L, s"Expected 500L for bounded instance 2, got $result2")
+      assert(result3 === 300L, s"Expected 300L for unbounded instance, got $result3")
+
+      // A third bounded instance shrinks every share; the split uses integer division
+      val boundedMemoryId3 = "test-instance-3"
+      RocksDBMemoryManager.updateMemoryUsage(boundedMemoryId3, 0L, isBoundedMemory = true)
+      val result4 = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
+        boundedMemoryId3,
+        sharedCacheUsage)
+      assert(result4 === 333L, s"Expected 333L for bounded instance 3, got $result4")
+
+      // After a reset the instance is no longer registered, so it is treated as
+      // unbounded and charged the full usage
+      RocksDBMemoryManager.resetWriteBufferManagerAndCache
+      val nonexistInstanceRes = RocksDBMemoryManager.getInstancePinnedBlocksMemUsage(
+        boundedMemoryId1,
+        sharedCacheUsage)
+      assert(
+        nonexistInstanceRes === sharedCacheUsage,
+        s"Expected $sharedCacheUsage when no instances, got $nonexistInstanceRes"
+      )
+    } finally {
+      RocksDBMemoryManager.resetWriteBufferManagerAndCache
+    }
+  }
+
testWithColumnFamilies("SPARK-37224: flipping option
'trackTotalNumberOfRows' during restart",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]