This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 30cea6f2e86 KAFKA-20456: Use lighter options for RocksDB offsets
column family (#22085)
30cea6f2e86 is described below
commit 30cea6f2e863e2d758f463c37ada3ac791a5d8a7
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Apr 21 13:47:18 2026 -0400
KAFKA-20456: Use lighter options for RocksDB offsets column family (#22085)
This change gives the offsets column family its own lightweight
`ColumnFamilyOptions` instead of sharing the data CF's options. The
offsets CF stores only a handful of key-value pairs (one per changelog
partition) — it doesn't need the large write buffers, bloom filters, or
aggressive compaction configured for the data CF.
Reviewers: Nick Telford <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../RocksDBMigratingSessionStoreWithHeaders.java | 2 +-
.../RocksDBMigratingWindowStoreWithHeaders.java | 2 +-
.../kafka/streams/state/internals/RocksDBStore.java | 18 +++++++++++++++++-
.../state/internals/RocksDBTimestampedStore.java | 2 +-
.../internals/RocksDBTimestampedStoreWithHeaders.java | 4 ++--
5 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
index b1e081abd03..c1f1e6ec150 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
@@ -61,7 +61,7 @@ public class RocksDBMigratingSessionStoreWithHeaders extends
RocksDBStore implem
dbOptions,
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
new
ColumnFamilyDescriptor(SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions),
- new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
createOffsetsCFOptions())
);
final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
final ColumnFamilyHandle withHeadersColumnFamily =
columnFamilies.get(1);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
index 1ee56dd2c49..8789272ba37 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
@@ -68,7 +68,7 @@ public class RocksDBMigratingWindowStoreWithHeaders extends
RocksDBStore impleme
dbOptions,
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
new
ColumnFamilyDescriptor(WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions),
- new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
createOffsetsCFOptions())
);
final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
final ColumnFamilyHandle withHeadersColumnFamily =
columnFamilies.get(1);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d99e9a9c346..b4dcf371a2d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -310,12 +310,28 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
}
}
+ /**
+ * Creates lightweight {@link ColumnFamilyOptions} for the offsets column family. The offsets CF
+ * stores only a small number of key-value pairs (one per changelog partition), so it does not
+ * need the heavyweight options used for the data CF (large write buffers, bloom filters,
+ * aggressive compaction). Sharing the data CF's options causes unnecessary write amplification
+ * and compaction pressure that can contribute to RocksDB write stalls under heavy restore I/O.
+ */
+ protected static ColumnFamilyOptions createOffsetsCFOptions() {
+ final ColumnFamilyOptions offsetsCFOptions = new ColumnFamilyOptions();
+ offsetsCFOptions.setCompressionType(CompressionType.NO_COMPRESSION);
+ offsetsCFOptions.setCompactionStyle(CompactionStyle.LEVEL);
+ offsetsCFOptions.setWriteBufferSize(1024 * 1024L); // 1MB — sufficient for offset metadata
+ offsetsCFOptions.setMaxWriteBufferNumber(2);
+ return offsetsCFOptions;
+ }
+
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
dbOptions,
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
- new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
createOffsetsCFOptions())
);
cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(1),
columnFamilies.get(0));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 2a37e05fb26..8771ec110c5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -57,7 +57,7 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
dbOptions,
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions),
- new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
createOffsetsCFOptions())
);
final ColumnFamilyHandle noTimestampColumnFamily =
columnFamilies.get(0);
final ColumnFamilyHandle withTimestampColumnFamily =
columnFamilies.get(1);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index 50cbecfd293..e8f73af3fc1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -98,7 +98,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
dbOptions,
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions),
- new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
createOffsetsCFOptions())
);
final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
@@ -139,7 +139,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME,
columnFamilyOptions),
new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions),
- new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
createOffsetsCFOptions())
);
try {