This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c9b210be34 KAFKA-20456: Avoid persisting closed state if store is not 
open yet (#22140)
3c9b210be34 is described below

commit 3c9b210be3436c0b2086f779ce56db2377ee338e
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Mon Apr 27 11:19:16 2026 -0500

    KAFKA-20456: Avoid persisting closed state if store is not open yet (#22140)
    
    KIP-1035 implementation added a new path when closing Rocksdb stores:
    persist a status flag that helps to identify unclean shutdowns at the
    store level.
    
    We should only persist the status as closed if the store was previously
    open (and fully reachable) because after an unclean shutdown, RocksDB
    usually executes a background recovery process that causes stalls on the
    first write.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../internals/AbstractColumnFamilyAccessor.java     |  8 ++++++--
 .../internals/AbstractColumnFamilyAccessorTest.java | 21 +++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index a7c7ab0df63..004c824cc3e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -96,9 +96,13 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
 
     @Override
     public void close(final RocksDBStore.DBAccessor accessor) throws 
RocksDBException {
-        accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+        // Only persist the closed state if the store was previously open.
+        // After an unclean shutdown, RocksDB may still be running background 
recovery,
+        // causing accessor.put() to block.
+        if (storeOpen.compareAndSet(true, false)) {
+            accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+        }
         offsetColumnFamilyHandle.close();
-        storeOpen.set(false);
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b06ee508746..9a5c39728cd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -43,7 +43,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
 abstract class AbstractColumnFamilyAccessorTest {
@@ -142,6 +146,23 @@ abstract class AbstractColumnFamilyAccessorTest {
         assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
     }
 
+    @Test
+    public void shouldSkipPersistingStateOnCloseWhenStoreIsAlreadyClosed() 
throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        // Open and close cleanly
+        accessor.open(dbAccessor, false);
+        accessor.close(dbAccessor);
+        assertArrayEquals(closedValue, dbAccessor.get(offsetsCF, 
toBytes("status")));
+
+        // Simulate unclean shutdown: overwrite status to open without going 
through accessor.open()
+        dbAccessor.put(offsetsCF, toBytes("status"), openValue);
+        assertThrowsExactly(ProcessorStateException.class, () -> 
accessor.open(dbAccessor, false));
+
+        dbAccessor = spy(dbAccessor);
+        accessor.close(dbAccessor);
+        verify(dbAccessor, never()).put(any(), any(), any());
+    }
+
     private byte[] toBytes(final String s) {
         return keySerializer.serialize("", s);
     }

Reply via email to