This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d1a16a28f8 KAFKA-20663: Fix startup state manager close to release StateDirectory task lock (#22490)
3d1a16a28f8 is described below

commit 3d1a16a28f8f0036f7404289d99cf5e33447e90d
Author: Nick Telford <[email protected]>
AuthorDate: Sat Jun 6 18:49:59 2026 +0100

    KAFKA-20663: Fix startup state manager close to release StateDirectory task lock (#22490)
    
    ## Summary
    
    `initializeStartupStores()` acquires a `StateDirectory` task lock via
    `StateManagerUtil.registerStateStores()` but then closes the temporary
    state manager by calling `temporaryStateManager.close()` directly —
    which is `ProcessorStateManager.close()`. That method closes the RocksDB
    stores but has no knowledge of the `StateDirectory` lock and never calls
    `stateDirectory.unlock()`. Only `StateManagerUtil.closeStateManager()`
    calls `stateDirectory.unlock()`, and it was never invoked for the
    startup path.
    
    As a result, the main thread permanently holds the `StateDirectory` task
    lock for every startup task. At shutdown, when the stream thread
    calls `closeStateManager()` for those tasks, `stateDirectory.lock()`
    returns `false` (the main thread owns the lock, but the stream thread is
    the caller), so `db.close()` is never called. With the WAL disabled, the RocksDB
    memtables — including the 30-second `maybeCheckpoint()` offset writes —
    are lost when the JVM exits. The SST files retain only the offset from
    the last data-CF auto-flush, which for an inactive segment can be many
    hours old, causing `OffsetOutOfRangeException` on the next restart when
    the stale offset falls outside the changelog retention window.
    
    A secondary symptom is the logged error `"Some task directories still
    locked while closing state, this indicates unclean shutdown"`:
    `StateDirectory.close()`, which calls `unlockStartupStores()`, runs on the
    shutdown helper thread rather than the main thread, so
    `unlockStartupStores()` cannot release main-thread-owned locks
    (`unlock()` requires the calling thread to match the lock owner).
    
    The fix replaces `temporaryStateManager.close()` with
    `StateManagerUtil.closeStateManager()`, which is the correct counterpart
    to `registerStateStores()` — every other call site in the codebase
    already pairs them. `closeStateManager()` calls
    `ProcessorStateManager.close()` (preserving existing behaviour including
    `maybeDowngradeOffsets()`) and then releases the `StateDirectory` lock
    in a nested `finally` block.
    
    Reviewers: Bill Bejeck <[email protected]>, Eduwer Camacaro
     <[email protected]>, Matthias Sax <[email protected]>
---
 .../kafka/streams/processor/internals/StateDirectory.java     |  8 +++++++-
 .../kafka/streams/processor/internals/StateDirectoryTest.java | 11 ++++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 2eab9736b4a..bf57e26e5f1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.internals.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -230,6 +231,7 @@ public class StateDirectory implements AutoCloseable {
         final List<TaskDirectory> nonEmptyTaskDirectories = 
listNonEmptyTaskDirectories();
         if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
             final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean transactionalStateStoresEnabled = new 
TopologyConfig(config).transactionalStateStoresEnabled;
 
             // Initialize thread-specific resources needed to open stores in 
the state directory
             final String threadLogPrefix = String.format("[%s]", 
Thread.currentThread().getName());
@@ -271,7 +273,11 @@ public class StateDirectory implements AutoCloseable {
                     } finally {
                         // Make sure the state manager writes the local 
checkpoint file before closing the stores
                         // This will be replaced in the future when removing 
the checkpoint file dependency.
-                        temporaryStateManager.close();
+                        StateManagerUtil.closeStateManager(
+                            log, threadLogPrefix, true, eosEnabled,
+                            transactionalStateStoresEnabled,
+                            temporaryStateManager, this, Task.TaskType.ACTIVE
+                        );
                     }
                     tasksInLocalState.add(task);
                 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 0f3e6846d24..e2ef1acd0d5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -877,12 +877,21 @@ public class StateDirectoryTest {
         assertFalse(directory.removeStartupState(taskId));
     }
 
+    @Test
+    public void shouldNotHoldLockAfterInitializeStartupStores() {
+        final TaskId taskId = new TaskId(0, 0);
+        initializeStartupStores(taskId, true);
+
+        assertTrue(directory.hasStartupTasks());
+        assertNull(directory.lockOwner(taskId));
+    }
+
     @Test
     public void shouldUnlockStartupStateOnClose() {
         final TaskId taskId = new TaskId(0, 0);
         initializeStartupStores(taskId, true);
 
-        assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
+        assertNull(directory.lockOwner(taskId));
         directory.close();
         assertNull(directory.lockOwner(taskId));
     }

Reply via email to