This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 64a4ee5788f KAFKA-20456: Avoid persisting closed state if store is not open yet (#22140)
64a4ee5788f is described below

commit 64a4ee5788f9f2d05ac0e0dbb0bac01620879749
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Mon Apr 27 11:19:16 2026 -0500

    KAFKA-20456: Avoid persisting closed state if store is not open yet (#22140)
    
    The KIP-1035 implementation added a new step when closing RocksDB
    stores: persisting a status flag that helps identify unclean shutdowns
    at the store level.
    
    We should only persist the status as closed if the store was previously
    open (and fully reachable). After an unclean shutdown, RocksDB usually
    runs a background recovery process that stalls the first write, so
    writing the closed status to a store that never finished opening can
    block close().
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../internals/AbstractColumnFamilyAccessor.java     |  8 ++++++--
 .../internals/AbstractColumnFamilyAccessorTest.java | 21 +++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index a7c7ab0df63..004c824cc3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -96,9 +96,13 @@ abstract class AbstractColumnFamilyAccessor implements RocksDBStore.ColumnFamily
 
     @Override
     public void close(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
-        accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+        // Only persist the closed state if the store was previously open.
+        // After an unclean shutdown, RocksDB may still be running background recovery,
+        // causing accessor.put() to block.
+        if (storeOpen.compareAndSet(true, false)) {
+            accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+        }
         offsetColumnFamilyHandle.close();
-        storeOpen.set(false);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b06ee508746..9a5c39728cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -43,7 +43,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
 abstract class AbstractColumnFamilyAccessorTest {
@@ -142,6 +146,23 @@ abstract class AbstractColumnFamilyAccessorTest {
         assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
     }
 
+    @Test
+    public void shouldSkipPersistingStateOnCloseWhenStoreIsAlreadyClosed() throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        // Open and close cleanly
+        accessor.open(dbAccessor, false);
+        accessor.close(dbAccessor);
+        assertArrayEquals(closedValue, dbAccessor.get(offsetsCF, toBytes("status")));
+
+        // Simulate unclean shutdown: overwrite status to open without going through accessor.open()
+        dbAccessor.put(offsetsCF, toBytes("status"), openValue);
+        assertThrowsExactly(ProcessorStateException.class, () -> accessor.open(dbAccessor, false));
+
+        dbAccessor = spy(dbAccessor);
+        accessor.close(dbAccessor);
+        verify(dbAccessor, never()).put(any(), any(), any());
+    }
+
     private byte[] toBytes(final String s) {
         return keySerializer.serialize("", s);
     }

Reply via email to