This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 30cea6f2e86 KAFKA-20456: Use lighter options for RocksDB offsets column family (#22085)
30cea6f2e86 is described below

commit 30cea6f2e863e2d758f463c37ada3ac791a5d8a7
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Apr 21 13:47:18 2026 -0400

    KAFKA-20456: Use lighter options for RocksDB offsets column family (#22085)
    
    This change gives the offsets column family its own lightweight
    `ColumnFamilyOptions` instead of sharing the data CF's options. The
    offsets CF stores only a handful of key-value pairs (one per changelog
    partition) — it doesn't need the large write buffers, bloom filters, or
    aggressive compaction configured for the data CF.
    
    Reviewers: Nick Telford <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../RocksDBMigratingSessionStoreWithHeaders.java       |  2 +-
 .../RocksDBMigratingWindowStoreWithHeaders.java        |  2 +-
 .../kafka/streams/state/internals/RocksDBStore.java    | 18 +++++++++++++++++-
 .../state/internals/RocksDBTimestampedStore.java       |  2 +-
 .../internals/RocksDBTimestampedStoreWithHeaders.java  |  4 ++--
 5 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
index b1e081abd03..c1f1e6ec150 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
@@ -61,7 +61,7 @@ public class RocksDBMigratingSessionStoreWithHeaders extends 
RocksDBStore implem
             dbOptions,
             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
             new 
ColumnFamilyDescriptor(SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME, 
columnFamilyOptions),
-            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
createOffsetsCFOptions())
         );
         final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
         final ColumnFamilyHandle withHeadersColumnFamily = 
columnFamilies.get(1);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
index 1ee56dd2c49..8789272ba37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
@@ -68,7 +68,7 @@ public class RocksDBMigratingWindowStoreWithHeaders extends 
RocksDBStore impleme
             dbOptions,
             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
             new 
ColumnFamilyDescriptor(WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME, 
columnFamilyOptions),
-            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
createOffsetsCFOptions())
         );
         final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
         final ColumnFamilyHandle withHeadersColumnFamily = 
columnFamilies.get(1);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d99e9a9c346..b4dcf371a2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -310,12 +310,28 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         }
     }
 
+    /**
+     * Creates lightweight {@link ColumnFamilyOptions} for the offsets column family. The offsets CF
+     * stores only a small number of key-value pairs (one per changelog partition), so it does not
+     * need the heavyweight options used for the data CF (large write buffers, bloom filters,
+     * aggressive compaction). Sharing the data CF's options causes unnecessary write amplification
+     * and compaction pressure that can contribute to RocksDB write stalls under heavy restore I/O.
+     */
+    protected static ColumnFamilyOptions createOffsetsCFOptions() {
+        final ColumnFamilyOptions offsetsCFOptions = new ColumnFamilyOptions();
+        offsetsCFOptions.setCompressionType(CompressionType.NO_COMPRESSION);
+        offsetsCFOptions.setCompactionStyle(CompactionStyle.LEVEL);
+        offsetsCFOptions.setWriteBufferSize(1024 * 1024L); // 1MB — sufficient for offset metadata
+        offsetsCFOptions.setMaxWriteBufferNumber(2);
+        return offsetsCFOptions;
+    }
+
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
         final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
                 dbOptions,
                 new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
-                new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+                new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
createOffsetsCFOptions())
         );
 
         cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(1), 
columnFamilies.get(0));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 2a37e05fb26..8771ec110c5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -57,7 +57,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
             dbOptions,
             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
             new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, 
columnFamilyOptions),
-            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
createOffsetsCFOptions())
         );
         final ColumnFamilyHandle noTimestampColumnFamily = 
columnFamilies.get(0);
         final ColumnFamilyHandle withTimestampColumnFamily = 
columnFamilies.get(1);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index 50cbecfd293..e8f73af3fc1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -98,7 +98,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
             dbOptions,
             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
             new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions),
-            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
createOffsetsCFOptions())
         );
 
         final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
@@ -139,7 +139,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
             new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, 
columnFamilyOptions),
             new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions),
-            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
+            new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
createOffsetsCFOptions())
         );
 
         try {

Reply via email to