This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4becf481586 MINOR:Remove explicit flushes from RocksDBStores (#21871)
4becf481586 is described below
commit 4becf4815861dfe2a289da07a4c7775f1cd6fe98
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Mar 31 17:45:01 2026 -0500
MINOR:Remove explicit flushes from RocksDBStores (#21871)
RocksDBStores now manage their own offsets by storing them in an
`offsets` ColumnFamily. Also, these stores use a RocksDB atomic flush to
guarantee consistency between the default and offsets ColumnFamilies. As
described in KIP-1035, this new behavior enables RocksDB stores to avoid
explicitly flushing the memtables to the SST files.
This is a follow-up PR for https://github.com/apache/kafka/pull/21578. We
can now implement this because KAFKA-19712 has been resolved.
Reviewers: Nick Telford <[email protected]>, Bill Bejeck
<[email protected]>
---
.../state/internals/AbstractColumnFamilyAccessor.java | 11 -----------
.../streams/state/internals/DualColumnFamilyAccessor.java | 5 -----
.../apache/kafka/streams/state/internals/RocksDBStore.java | 5 -----
.../state/internals/DualColumnFamilyAccessorTest.java | 13 -------------
.../kafka/streams/state/internals/RocksDBStoreTest.java | 11 -----------
5 files changed, 45 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 0288872f567..a7c7ab0df63 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -69,8 +69,6 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
}
}
}
- // We need to remove this flush call when implementing KAFKA-19712
- this.flush(accessor, offsetColumnFamilyHandle);
}
@Override
@@ -112,15 +110,6 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
return null;
}
- /**
- * Invokes commit in the underlying ColumnFamilyAccessor.
- * Subclasses should implement this method to define specific commit
behavior.
- * This method will be removed when implementing KAFKA-19712
- *
- * @param accessor the RocksDB accessor used to interact with the database
- * @throws RocksDBException if an error occurs during the commit operation
- */
- protected abstract void flush(final RocksDBStore.DBAccessor accessor,
final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException;
private void wipeOffsets(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
try (final RocksIterator iter =
accessor.newIterator(offsetColumnFamilyHandle)) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
index db4274cc625..70a19ff125f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -239,11 +239,6 @@ class DualColumnFamilyAccessor extends
AbstractColumnFamilyAccessor {
+ accessor.approximateNumEntries(newColumnFamily);
}
- @Override
- public void flush(final DBAccessor accessor, final ColumnFamilyHandle
offsetColumnFamilyHandle) throws RocksDBException {
- accessor.flush(oldColumnFamily, newColumnFamily,
offsetColumnFamilyHandle);
- }
-
@Override
public void addToBatch(final byte[] key,
final byte[] value,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 621e1eaa566..7b616eb6b88 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -1115,11 +1115,6 @@ public class RocksDBStore implements
KeyValueStore<Bytes, byte[]>, BatchWritingS
return accessor.approximateNumEntries(columnFamily);
}
- @Override
- public void flush(final DBAccessor accessor, final ColumnFamilyHandle
offsetColumnFamilyHandle) throws RocksDBException {
- accessor.flush(columnFamily, offsetColumnFamilyHandle);
- }
-
@Override
public void addToBatch(final byte[] key,
final byte[] value,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
index 94efbb18a66..437352b5c90 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -35,9 +34,7 @@ import org.rocksdb.WriteBatchInterface;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -341,16 +338,6 @@ public class DualColumnFamilyAccessorTest extends
AbstractColumnFamilyAccessorTe
assertEquals(150L, result);
}
- @Test
- public void shouldFlushBothColumnFamiliesOnCommit() throws
RocksDBException {
- final Map<TopicPartition, Long> offsets = new HashMap<>();
- offsets.put(new TopicPartition("topic", 0), 100L);
-
- accessor.commit(dbAccessor, offsets);
-
- verify(dbAccessor).flush(oldCF, newCF, offsetsCF);
- }
-
@Test
public void shouldCreateRangeIterator() {
final RocksIterator iterNewFormat = mock(RocksIterator.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 1a6a83848d7..128534a1bb8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -86,7 +86,6 @@ import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import java.io.File;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -918,16 +917,6 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
}
}
- @Test
- public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws
IOException {
- rocksDBStore.init(context, rocksDBStore);
- Utils.delete(dir);
- rocksDBStore.put(
- new Bytes(stringSerializer.serialize(null, "anyKey")),
- stringSerializer.serialize(null, "anyValue"));
- assertThrows(ProcessorStateException.class, () ->
rocksDBStore.commit(Map.of()));
- }
-
@Test
public void shouldHandleToggleOfEnablingBloomFilters() {
final Properties props = StreamsTestUtils.getStreamsConfig();