This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c9b210be34 KAFKA-20456: Avoid persisting closed state if store is not
open yet (#22140)
3c9b210be34 is described below
commit 3c9b210be3436c0b2086f779ce56db2377ee338e
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Mon Apr 27 11:19:16 2026 -0500
KAFKA-20456: Avoid persisting closed state if store is not open yet (#22140)
The KIP-1035 implementation added a new step when closing RocksDB stores:
persisting a status flag that helps identify unclean shutdowns at the
store level.
We should only persist the closed status if the store was previously
opened (and is fully reachable). After an unclean shutdown, RocksDB
usually runs a background recovery process that stalls the first write,
so writing the status while closing a store that never finished opening
can block.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../internals/AbstractColumnFamilyAccessor.java | 8 ++++++--
.../internals/AbstractColumnFamilyAccessorTest.java | 21 +++++++++++++++++++++
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index a7c7ab0df63..004c824cc3e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -96,9 +96,13 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
@Override
public void close(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
- accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+ // Only persist the closed state if the store was previously open.
+ // After an unclean shutdown, RocksDB may still be running background
recovery,
+ // causing accessor.put() to block.
+ if (storeOpen.compareAndSet(true, false)) {
+ accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+ }
offsetColumnFamilyHandle.close();
- storeOpen.set(false);
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b06ee508746..9a5c39728cd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -43,7 +43,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
abstract class AbstractColumnFamilyAccessorTest {
@@ -142,6 +146,23 @@ abstract class AbstractColumnFamilyAccessorTest {
assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
}
+ @Test
+ public void shouldSkipPersistingStateOnCloseWhenStoreIsAlreadyClosed()
throws RocksDBException {
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+ // Open and close cleanly
+ accessor.open(dbAccessor, false);
+ accessor.close(dbAccessor);
+ assertArrayEquals(closedValue, dbAccessor.get(offsetsCF,
toBytes("status")));
+
+ // Simulate unclean shutdown: overwrite status to open without going
through accessor.open()
+ dbAccessor.put(offsetsCF, toBytes("status"), openValue);
+ assertThrowsExactly(ProcessorStateException.class, () ->
accessor.open(dbAccessor, false));
+
+ dbAccessor = spy(dbAccessor);
+ accessor.close(dbAccessor);
+ verify(dbAccessor, never()).put(any(), any(), any());
+ }
+
private byte[] toBytes(final String s) {
return keySerializer.serialize("", s);
}