This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4becf481586 MINOR:Remove explicit flushes from RocksDBStores (#21871)
4becf481586 is described below

commit 4becf4815861dfe2a289da07a4c7775f1cd6fe98
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Mar 31 17:45:01 2026 -0500

    MINOR:Remove explicit flushes from RocksDBStores (#21871)
    
    RocksDBStores now manage their own offsets by storing them in an
    `offsets` ColumnFamily. Also, these stores use a RocksDB atomic flush to
    guarantee consistency between the default and offsets ColumnFamilies. As
    described in KIP-1035, this new behavior enables RocksDB stores to avoid
    explicitly flushing the memtables to the SST files.
    
    This is a follow-up PR for https://github.com/apache/kafka/pull/21578; we
    can now implement this because KAFKA-19712 is resolved.
    
    Reviewers: Nick Telford <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../state/internals/AbstractColumnFamilyAccessor.java       | 11 -----------
 .../streams/state/internals/DualColumnFamilyAccessor.java   |  5 -----
 .../apache/kafka/streams/state/internals/RocksDBStore.java  |  5 -----
 .../state/internals/DualColumnFamilyAccessorTest.java       | 13 -------------
 .../kafka/streams/state/internals/RocksDBStoreTest.java     | 11 -----------
 5 files changed, 45 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 0288872f567..a7c7ab0df63 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -69,8 +69,6 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
                 }
             }
         }
-        // We need to remove this flush call when implementing KAFKA-19712
-        this.flush(accessor, offsetColumnFamilyHandle);
     }
 
     @Override
@@ -112,15 +110,6 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
         return null;
     }
 
-    /**
-     * Invokes commit in the underlying ColumnFamilyAccessor.
-     * Subclasses should implement this method to define specific commit 
behavior.
-     * This method will be removed when implementing KAFKA-19712
-     *
-     * @param accessor the RocksDB accessor used to interact with the database
-     * @throws RocksDBException if an error occurs during the commit operation
-     */
-    protected abstract void flush(final RocksDBStore.DBAccessor accessor, 
final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException;
 
     private void wipeOffsets(final RocksDBStore.DBAccessor accessor) throws 
RocksDBException {
         try (final RocksIterator iter = 
accessor.newIterator(offsetColumnFamilyHandle)) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
index db4274cc625..70a19ff125f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -239,11 +239,6 @@ class DualColumnFamilyAccessor extends 
AbstractColumnFamilyAccessor {
                 + accessor.approximateNumEntries(newColumnFamily);
     }
 
-    @Override
-    public void flush(final DBAccessor accessor, final ColumnFamilyHandle 
offsetColumnFamilyHandle) throws RocksDBException {
-        accessor.flush(oldColumnFamily, newColumnFamily, 
offsetColumnFamilyHandle);
-    }
-
     @Override
     public void addToBatch(final byte[] key,
                            final byte[] value,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 621e1eaa566..7b616eb6b88 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -1115,11 +1115,6 @@ public class RocksDBStore implements 
KeyValueStore<Bytes, byte[]>, BatchWritingS
             return accessor.approximateNumEntries(columnFamily);
         }
 
-        @Override
-        public void flush(final DBAccessor accessor, final ColumnFamilyHandle 
offsetColumnFamilyHandle) throws RocksDBException {
-            accessor.flush(columnFamily, offsetColumnFamilyHandle);
-        }
-
         @Override
         public void addToBatch(final byte[] key,
                                final byte[] value,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
index 94efbb18a66..437352b5c90 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -35,9 +34,7 @@ import org.rocksdb.WriteBatchInterface;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -341,16 +338,6 @@ public class DualColumnFamilyAccessorTest extends 
AbstractColumnFamilyAccessorTe
         assertEquals(150L, result);
     }
 
-    @Test
-    public void shouldFlushBothColumnFamiliesOnCommit() throws 
RocksDBException {
-        final Map<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("topic", 0), 100L);
-
-        accessor.commit(dbAccessor, offsets);
-
-        verify(dbAccessor).flush(oldCF, newCF, offsetsCF);
-    }
-
     @Test
     public void shouldCreateRangeIterator() {
         final RocksIterator iterNewFormat = mock(RocksIterator.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 1a6a83848d7..128534a1bb8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -86,7 +86,6 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.Statistics;
 
 import java.io.File;
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -918,16 +917,6 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         }
     }
 
-    @Test
-    public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws 
IOException {
-        rocksDBStore.init(context, rocksDBStore);
-        Utils.delete(dir);
-        rocksDBStore.put(
-            new Bytes(stringSerializer.serialize(null, "anyKey")),
-            stringSerializer.serialize(null, "anyValue"));
-        assertThrows(ProcessorStateException.class, () -> 
rocksDBStore.commit(Map.of()));
-    }
-
     @Test
     public void shouldHandleToggleOfEnablingBloomFilters() {
         final Properties props = StreamsTestUtils.getStreamsConfig();

Reply via email to