This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 64a4ee5788f KAFKA-20456: Avoid persisting closed state if store is not
open yet (#22140)
64a4ee5788f is described below
commit 64a4ee5788f9f2d05ac0e0dbb0bac01620879749
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Mon Apr 27 11:19:16 2026 -0500
KAFKA-20456: Avoid persisting closed state if store is not open yet (#22140)
KIP-1035 implementation added a new path when closing Rocksdb stores:
persist a status flag that helps to identify unclean shutdowns at the
store level.
We should only persist the status as closed if the store was previously
open (and fully reachable) because after an unclean shutdown, RocksDB
usually executes a background recovery process that causes stalls on the
first write.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../internals/AbstractColumnFamilyAccessor.java | 8 ++++++--
.../internals/AbstractColumnFamilyAccessorTest.java | 21 +++++++++++++++++++++
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index a7c7ab0df63..004c824cc3e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -96,9 +96,13 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
@Override
public void close(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
- accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+ // Only persist the closed state if the store was previously open.
+ // After an unclean shutdown, RocksDB may still be running background
recovery,
+ // causing accessor.put() to block.
+ if (storeOpen.compareAndSet(true, false)) {
+ accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+ }
offsetColumnFamilyHandle.close();
- storeOpen.set(false);
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b06ee508746..9a5c39728cd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -43,7 +43,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
abstract class AbstractColumnFamilyAccessorTest {
@@ -142,6 +146,23 @@ abstract class AbstractColumnFamilyAccessorTest {
assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
}
+ @Test
+ public void shouldSkipPersistingStateOnCloseWhenStoreIsAlreadyClosed()
throws RocksDBException {
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+ // Open and close cleanly
+ accessor.open(dbAccessor, false);
+ accessor.close(dbAccessor);
+ assertArrayEquals(closedValue, dbAccessor.get(offsetsCF,
toBytes("status")));
+
+ // Simulate unclean shutdown: overwrite status to open without going
through accessor.open()
+ dbAccessor.put(offsetsCF, toBytes("status"), openValue);
+ assertThrowsExactly(ProcessorStateException.class, () ->
accessor.open(dbAccessor, false));
+
+ dbAccessor = spy(dbAccessor);
+ accessor.close(dbAccessor);
+ verify(dbAccessor, never()).put(any(), any(), any());
+ }
+
private byte[] toBytes(final String s) {
return keySerializer.serialize("", s);
}