This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5740a26525a KAFKA-19712: ProcessorStateManager delegates offset tracking to stores (#21738)
5740a26525a is described below

commit 5740a26525a27df9eacd847a6d4ed6eb23fda0dc
Author: Nick Telford <[email protected]>
AuthorDate: Mon Mar 16 17:15:55 2026 +0000

    KAFKA-19712: ProcessorStateManager delegates offset tracking to stores (#21738)
    
    As part of KIP-1035, we want to transition away from task-specific
    `.checkpoint` files, and instead delegate offset management to
    `StateStore`s.
    
    We now have a `LegacyCheckpointingStateStore` wrapper to encapsulate the
    management of offsets for `StateStore` implementations that do not know
    how to manage their own offsets (i.e. for which `managesOffsets() ==
    false`).
    
    As of KAFKA-20212, `RocksDBStore` now knows how to manage its own
    offsets, so it will not be wrapped in a `LegacyCheckpointingStateStore`;
    only user-defined persistent stores will use this wrapper.
    
    Corresponding changes to `GlobalStateManagerImpl` will be submitted
    independently, as KAFKA-20257.
    
    Until both `ProcessorStateManager` and `GlobalStateManagerImpl` have
    been updated, the `StateManager` interface must remain as-is. Therefore,
    the `flush` and `checkpoint` methods will not be consolidated until a
    later PR, which will clean up the interface and its usage by `Task` and
    friends.
    
    Reviewers: Bill Bejeck <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../streams/integration/EosIntegrationTest.java    |   2 +-
 .../processor/internals/ProcessorStateManager.java | 238 +++++++++------------
 .../processor/internals/StateDirectory.java        |  13 +-
 .../processor/internals/StateManagerUtil.java      |   2 +-
 .../streams/processor/internals/StreamTask.java    |   9 -
 .../processor/internals/ActiveTaskCreatorTest.java |   2 -
 .../internals/ProcessorStateManagerTest.java       | 201 ++++++++---------
 .../processor/internals/StateManagerUtilTest.java  |   2 +-
 .../processor/internals/TaskManagerTest.java       |   2 -
 9 files changed, 193 insertions(+), 278 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 2d7f2b2e1fa..ca09905fdab 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -927,7 +927,7 @@ public class EosIntegrationTest {
             streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
             
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
             task00.toString(),
-            ".checkpoint"
+            ".checkpoint_" + stateStoreName
         ).toFile();
         assertTrue(checkpointFile.exists());
         final Map<TopicPartition, Long> checkpoints = new 
OffsetCheckpoint(checkpointFile).read();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 86725805ce5..6a7eb8c25cf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
@@ -51,10 +51,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
-import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
 import static 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
-import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
 
 /**
  * ProcessorStateManager is the source of truth for the current offset for 
each state store,
@@ -154,7 +152,12 @@ public class ProcessorStateManager implements StateManager 
{
         }
 
         StateStore store() {
-            return this.stateStore;
+            return 
LegacyCheckpointingStateStore.maybeUnwrapStore(this.stateStore);
+        }
+
+        void markCorrupted() {
+            this.corrupted = true;
+            LegacyCheckpointingStateStore.maybeMarkCorrupted(this.stateStore);
         }
 
         @Override
@@ -178,7 +181,6 @@ public class ProcessorStateManager implements StateManager {
 
     private final StateDirectory stateDirectory;
     private final File baseDir;
-    private final OffsetCheckpoint checkpointFile;
 
     private TaskType taskType;
     private Logger log;
@@ -211,7 +213,6 @@ public class ProcessorStateManager implements StateManager {
         this.sourcePartitions = sourcePartitions;
 
         this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
-        this.checkpointFile = new 
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
         this.stateDirectory = stateDirectory;
 
         log.debug("Created state store manager for task {}", taskId);
@@ -233,12 +234,26 @@ public class ProcessorStateManager implements 
StateManager {
 
     void registerStateStores(final List<StateStore> allStores, final 
InternalProcessorContext<?, ?> processorContext) {
         processorContext.uninitialize();
+        final Map<TopicPartition, StateStore> storesToMigrate = new 
HashMap<>(stores.size());
         for (final StateStore store : allStores) {
             if (!stores.containsKey(store.name())) {
-                store.init(processorContext, store);
+                if (isLoggingEnabled(store.name())) {
+                    final TopicPartition changelogPartition = 
getStorePartition(store.name());
+                    final StateStore maybeWrappedStore = 
LegacyCheckpointingStateStore.maybeWrapStore(
+                            store, eosEnabled, Set.of(changelogPartition), 
stateDirectory, taskId, logPrefix);
+                    maybeWrappedStore.init(processorContext, 
maybeWrappedStore);
+                    storesToMigrate.put(changelogPartition, maybeWrappedStore);
+                } else {
+                    final StateStore maybeWrappedStore = 
LegacyCheckpointingStateStore.maybeWrapStore(
+                            store, eosEnabled, Set.of(), stateDirectory, 
taskId, logPrefix);
+                    maybeWrappedStore.init(processorContext, 
maybeWrappedStore);
+                }
             }
             log.trace("Registered state store {}", store.name());
         }
+
+        // migrate offsets from the legacy checkpoint file into the stores
+        LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, 
stateDirectory, taskId, storesToMigrate);
     }
 
     void registerGlobalStateStores(final List<StateStore> stateStores) {
@@ -254,68 +269,52 @@ public class ProcessorStateManager implements 
StateManager {
     }
 
     // package-private for test only
-    void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
-        try {
-            final Map<TopicPartition, Long> loadedCheckpoints = 
checkpointFile.read();
-
-            log.trace("Loaded offsets from the checkpoint file: {}", 
loadedCheckpoints);
-
-            for (final StateStoreMetadata store : stores.values()) {
-                if (store.corrupted) {
-                    log.error("Tried to initialize store offsets for corrupted 
store {}", store);
-                    throw new IllegalStateException("Should not initialize 
offsets for a corrupted task");
-                }
+    void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+        for (final StateStoreMetadata store : stores.values()) {
+            if (store.corrupted) {
+                log.error("Tried to initialize store offsets for corrupted 
store {}", store);
+                throw new ProcessorStateException(
+                        "Error initializing offsets for store '" + store + "'",
+                        new IllegalStateException("Should not initialize 
offsets for a corrupted task")
+                );
+            }
 
-                if (store.changelogPartition == null) {
-                    log.info("State store {} is not logged and hence would not 
be restored", store.stateStore.name());
-                } else if (!store.stateStore.persistent()) {
-                    log.info("Initializing to the starting offset for 
changelog {} of in-memory state store {}",
-                             store.changelogPartition, 
store.stateStore.name());
-                } else if (store.offset() == null) {
-                    if 
(loadedCheckpoints.containsKey(store.changelogPartition)) {
-                        final Long offset = 
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
-                        store.setOffset(offset);
-
-                        log.info("State store {} initialized from checkpoint 
with offset {} at changelog {}",
-                                  store.stateStore.name(), store.offset, 
store.changelogPartition);
-                    } else {
-                        // with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
-                        // and hence we are uncertain that the current local 
state only contains committed data;
-                        // in that case we need to treat it as a 
task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
-                            log.warn("State store {} did not find checkpoint 
offsets while stores are not empty, " +
+            if (store.changelogPartition == null) {
+                log.info("State store {} is not logged and hence would not be 
restored", store.stateStore.name());
+            } else if (!store.stateStore.persistent()) {
+                log.info("Initializing to the starting offset for changelog {} 
of in-memory state store {}",
+                        store.changelogPartition, store.stateStore.name());
+            } else if (store.offset() == null) {
+                final Long offset = 
store.stateStore.committedOffset(store.changelogPartition);
+
+                if (offset != null) {
+                    store.setOffset(offset);
+                    log.info("State store {} initialized from checkpoint with 
offset {} at changelog {}",
+                            store.stateStore.name(), store.offset, 
store.changelogPartition);
+                } else {
+                    // with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
+                    // and hence we are uncertain that the current local state 
only contains committed data;
+                    // in that case we need to treat it as a task-corrupted 
exception
+                    if (eosEnabled && !storeDirIsEmpty) {
+                        log.warn("State store {} did not find checkpoint 
offsets while stores are not empty, " +
                                 "since under EOS it has the risk of getting 
uncommitted data in stores we have to " +
                                 "treat it as a task corruption error and wipe 
out the local state of task {} " +
                                 "before re-bootstrapping", 
store.stateStore.name(), taskId);
 
-                            throw new 
TaskCorruptedException(Collections.singleton(taskId));
-                        } else {
-                            log.info("State store {} did not find checkpoint 
offset, hence would " +
-                                "default to the starting offset at changelog 
{}",
+                        throw new 
TaskCorruptedException(Collections.singleton(taskId));
+                    } else {
+                        log.info("State store {} did not find checkpoint 
offset, hence would " +
+                                        "default to the starting offset at 
changelog {}",
                                 store.stateStore.name(), 
store.changelogPartition);
-                        }
                     }
-                }  else {
-                    loadedCheckpoints.remove(store.changelogPartition);
-                    log.debug("Skipping re-initialization of offset from 
checkpoint for recycled store {}",
-                              store.stateStore.name());
                 }
             }
+        }
 
+        try {
             stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
-
-            if (!loadedCheckpoints.isEmpty()) {
-                log.warn("Some loaded checkpoint offsets cannot find their 
corresponding state stores: {}", loadedCheckpoints);
-            }
-
-            if (eosEnabled) {
-                checkpointFile.delete();
-            }
-        } catch (final TaskCorruptedException e) {
-            throw e;
-        } catch (final IOException | RuntimeException e) {
-            // both IOException or runtime exception like number parsing can 
throw
-            throw new ProcessorStateException(format("%sError loading and 
deleting checkpoint file when creating the state manager",
+        } catch (final RuntimeException e) {
+            throw new ProcessorStateException(format("%sError updating state 
directory offsets when creating the state manager",
                 logPrefix), e);
         }
     }
@@ -333,7 +332,7 @@ public class ProcessorStateManager implements StateManager {
 
         // TODO (KAFKA-12887): we should not trigger user's exception handler 
for illegal-argument but always
         // fail-crash; in this case we would not need to immediately close the 
state store before throwing
-        if (CHECKPOINT_FILE_NAME.equals(storeName)) {
+        if 
(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME.startsWith(storeName)) {
             store.close();
             throw new IllegalArgumentException(format("%sIllegal store name: 
%s, which collides with the pre-defined " +
                 "checkpoint file name", logPrefix, storeName));
@@ -368,7 +367,7 @@ public class ProcessorStateManager implements StateManager {
     @Override
     public StateStore store(final String name) {
         if (stores.containsKey(name)) {
-            return stores.get(name).stateStore;
+            return stores.get(name).store();
         } else {
             return null;
         }
@@ -382,7 +381,7 @@ public class ProcessorStateManager implements StateManager {
         final Collection<TopicPartition> partitionsToMarkAsCorrupted = new 
LinkedList<>(partitions);
         for (final StateStoreMetadata storeMetadata : stores.values()) {
             if 
(partitionsToMarkAsCorrupted.contains(storeMetadata.changelogPartition)) {
-                storeMetadata.corrupted = true;
+                storeMetadata.markCorrupted();
                 
partitionsToMarkAsCorrupted.remove(storeMetadata.changelogPartition);
             }
         }
@@ -488,26 +487,44 @@ public class ProcessorStateManager implements 
StateManager {
         if (!stores.isEmpty()) {
             log.debug("Committing all stores registered in the state manager: 
{}", stores);
             for (final StateStoreMetadata metadata : stores.values()) {
-                final StateStore store = metadata.stateStore;
-                log.trace("Committing store {}", store.name());
-                try {
-                    store.commit(Map.of());
-                } catch (final RuntimeException exception) {
-                    if (firstException == null) {
-                        // do NOT wrap the error if it is actually caused by 
Streams itself
-                        // In case of FailedProcessingException Do not keep 
the failed processing exception in the stack trace
-                        if (exception instanceof FailedProcessingException)
-                            firstException = new ProcessorStateException(
-                                format("%sFailed to commit state store %s", 
logPrefix, store.name()),
-                                exception.getCause());
-                        else if (exception instanceof StreamsException)
-                            firstException = exception;
-                        else
-                            firstException = new ProcessorStateException(
-                                format("%sFailed to commit state store %s", 
logPrefix, store.name()), exception);
-                        log.error("Failed to commit state store {}: ", 
store.name(), firstException);
-                    } else {
-                        log.error("Failed to commit state store {}: ", 
store.name(), exception);
+                if (!metadata.corrupted) {
+                    final StateStore store = metadata.stateStore;
+                    log.trace("Committing store {}", store.name());
+                    try {
+                        if (metadata.changelogPartition == null || 
metadata.offset == null || !store.persistent()) {
+                            store.commit(Map.of());
+                        } else {
+                            store.commit(Map.of(metadata.changelogPartition, 
metadata.offset));
+                        }
+
+                        if (metadata.commitCallback != null) {
+                            try {
+                                metadata.commitCallback.onCommit();
+                            } catch (final IOException e) {
+                                throw new ProcessorStateException(
+                                        format("%sException caught while 
trying to checkpoint store, " +
+                                                "changelog partition %s", 
logPrefix, metadata.changelogPartition),
+                                        e
+                                );
+                            }
+                        }
+                    } catch (final RuntimeException exception) {
+                        if (firstException == null) {
+                            // do NOT wrap the error if it is actually caused 
by Streams itself
+                            // In case of FailedProcessingException Do not 
keep the failed processing exception in the stack trace
+                            if (exception instanceof FailedProcessingException)
+                                firstException = new ProcessorStateException(
+                                        format("%sFailed to commit state store 
%s", logPrefix, store.name()),
+                                        exception.getCause());
+                            else if (exception instanceof StreamsException)
+                                firstException = exception;
+                            else
+                                firstException = new ProcessorStateException(
+                                        format("%sFailed to commit state store 
%s", logPrefix, store.name()), exception);
+                            log.error("Failed to commit state store {}: ", 
store.name(), firstException);
+                        } else {
+                            log.error("Failed to commit state store {}: ", 
store.name(), exception);
+                        }
                     }
                 }
             }
@@ -518,6 +535,10 @@ public class ProcessorStateManager implements StateManager 
{
         }
     }
 
+    @Override
+    public void checkpoint() {
+    }
+
     public void flushCache() {
         RuntimeException firstException = null;
         // attempting to flush the stores
@@ -658,45 +679,6 @@ public class ProcessorStateManager implements StateManager 
{
         stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
     }
 
-    @Override
-    public void checkpoint() {
-        // checkpoint those stores that are only logged and persistent to the 
checkpoint file
-        final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
-        for (final StateStoreMetadata storeMetadata : stores.values()) {
-            if (storeMetadata.commitCallback != null && 
!storeMetadata.corrupted) {
-                try {
-                    storeMetadata.commitCallback.onCommit();
-                } catch (final IOException e) {
-                    throw new ProcessorStateException(
-                            format("%sException caught while trying to 
checkpoint store, " +
-                                    "changelog partition %s", logPrefix, 
storeMetadata.changelogPartition),
-                            e
-                    );
-                }
-            }
-
-            // store is logged, persistent, not corrupted, and has a valid 
current offset
-            if (storeMetadata.changelogPartition != null &&
-                storeMetadata.stateStore.persistent() &&
-                !storeMetadata.corrupted) {
-
-                final long checkpointableOffset = 
checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
-                checkpointingOffsets.put(storeMetadata.changelogPartition, 
checkpointableOffset);
-            }
-        }
-
-        log.debug("Writing checkpoint: {} for task {}", checkpointingOffsets, 
taskId);
-        try {
-            checkpointFile.write(checkpointingOffsets);
-        } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to [{}]." +
-                " This may occur if OS cleaned the state.dir in case when it 
located in ${java.io.tmpdir} directory." +
-                " This may also occur due to running multiple instances on the 
same machine using the same state dir." +
-                " Changing the location of state.dir may resolve the problem.",
-                checkpointFile, e);
-        }
-    }
-
     private  TopicPartition getStorePartition(final String storeName) {
         // NOTE we assume the partition of the topic can always be inferred 
from the task id;
         // if user ever use a custom partition grouper (deprecated in KIP-528) 
this would break and
@@ -723,16 +705,6 @@ public class ProcessorStateManager implements StateManager 
{
         return found.isEmpty() ? null : found.get(0);
     }
 
-    // Pass in a sentinel value to checkpoint when the changelog offset is not 
yet initialized/known
-    private long checkpointableOffsetFromChangelogOffset(final Long offset) {
-        return offset != null ? offset : OFFSET_UNKNOWN;
-    }
-
-    // Convert the written offsets in the checkpoint file back to the 
changelog offset
-    private Long changelogOffsetFromCheckpointedOffset(final long offset) {
-        return offset != OFFSET_UNKNOWN ? offset : null;
-    }
-
     public TopicPartition registeredChangelogPartitionFor(final String 
storeName) {
         final StateStoreMetadata storeMetadata = stores.get(storeName);
         if (storeMetadata == null) {
@@ -753,10 +725,4 @@ public class ProcessorStateManager implements StateManager 
{
     public String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
-
-    public void deleteCheckPointFileIfEOSEnabled() throws IOException {
-        if (eosEnabled) {
-            checkpointFile.delete();
-        }
-    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index f66f177c0ef..906678cd350 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -77,7 +78,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
 /**
@@ -263,7 +263,7 @@ public class StateDirectory implements AutoCloseable {
                     try {
                         // We only handle TaskCorruptedException at this 
point. Any other exception is considered fatal.
                         StateManagerUtil.registerStateStores(log, 
threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
-                        temporaryStateManager.checkpoint();
+                        temporaryStateManager.flush();
                     } catch (final TaskCorruptedException tce) {
                         // At this point, we only log a warning and continue 
with the startup store initialization.
                         // The task-corrupted exception will be handled in the 
first Task assignment phase.
@@ -426,13 +426,6 @@ public class StateDirectory implements AutoCloseable {
         return "__" + topologyName + "__";
     }
 
-    /**
-     * @return The File handle for the checkpoint in the given task's directory
-     */
-    File checkpointFileFor(final TaskId taskId) {
-        return new File(getOrCreateDirectoryForTask(taskId), 
StateManagerUtil.CHECKPOINT_FILE_NAME);
-    }
-
     /**
      * Decide if the directory of the task is empty or not
      */
@@ -444,7 +437,7 @@ public class StateDirectory implements AutoCloseable {
 
     private boolean taskDirIsEmpty(final File taskDir) {
         final File[] storeDirs = taskDir.listFiles(pathname ->
-                !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+                
!pathname.getName().startsWith(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME));
 
         boolean taskDirEmpty = true;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 3d86b5ed15b..6a02fba3cde 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -147,7 +147,7 @@ final class StateManagerUtil {
         // We should only load checkpoint AFTER the corresponding state 
directory lock has been acquired and
         // the state stores have been registered; we should not try to load at 
the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574
-        stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
+        stateMgr.initializeStoreOffsets(storeDirsEmpty);
         log.debug("Initialized state stores");
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 85c322f2b5f..6547d8c41eb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -51,7 +51,6 @@ import 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
-import java.io.IOException;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
@@ -404,14 +403,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 // just transit the state without any logical changes: 
suspended and restoring states
                 // are not actually any different for inner modules
 
-                // Deleting checkpoint file before transition to RESTORING 
state (KAFKA-10362)
-                try {
-                    stateMgr.deleteCheckPointFileIfEOSEnabled();
-                    log.debug("Deleted check point file upon resuming with EOS 
enabled");
-                } catch (final IOException ioe) {
-                    log.error("Encountered error while deleting the checkpoint 
file due to this exception", ioe);
-                }
-
                 transitionTo(State.RESTORING);
                 log.info("Resumed to restoring state");
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index bc9449cebce..11f857820da 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -257,9 +257,7 @@ public class ActiveTaskCreatorTest {
         when(builder.buildSubtopology(0)).thenReturn(topology);
         when(topology.sinkTopics()).thenReturn(emptySet());
         
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
-        
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
         
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
-        
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
         when(topology.source("topic")).thenReturn(sourceNode);
         
when(sourceNode.timestampExtractor()).thenReturn(mock(TimestampExtractor.class));
         when(topology.sources()).thenReturn(Collections.singleton(sourceNode));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 110b1446fc1..02e45d313ea 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateS
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils;
 import org.apache.kafka.test.MockCachedKeyValueStore;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -86,6 +88,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -382,6 +385,36 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    @Test
+    public void shouldRegisterPersistentUnloggedStore() {
+        final String unloggedStoreName = "unloggedPersistentStore";
+        final MockKeyValueStore unloggedStore = new MockKeyValueStore(unloggedStoreName, true);
+
+        // Create a state manager with no changelog map entries for the store — it is persistent
+        // but unlogged (no changelog partition). Registration must not throw NullPointerException
+        // from Set.of(null) when changelogPartition is absent.
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            Task.TaskType.ACTIVE,
+            false,
+            logContext,
+            stateDirectory,
+            emptyMap(),
+            emptySet()
+        );
+        contextRegistersStateStore(stateMgr);
+
+        try {
+            stateMgr.registerStateStores(singletonList(unloggedStore), context);
+
+            assertTrue(unloggedStore.initialized);
+            assertThat(stateMgr.store(unloggedStoreName), is(unloggedStore));
+            assertTrue(stateMgr.changelogPartitions().isEmpty());
+        } finally {
+            stateMgr.close();
+        }
+    }
+
     @Test
     public void shouldInitializeOffsetsFromCheckpointFile() throws IOException {
         final long checkpointOffset = 10L;
@@ -394,14 +427,12 @@ public class ProcessorStateManagerTest {
         checkpoint.write(offsets);
 
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
+        contextRegistersStateStore(stateMgr);
 
         try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.registerStore(persistentStoreTwo, 
persistentStoreTwo.stateRestoreCallback, null);
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, 
persistentStoreTwo, nonPersistentStore), context);
+            stateMgr.initializeStoreOffsets(true);
 
-            assertTrue(checkpointFile.exists());
             assertEquals(Set.of(
                 persistentStorePartition,
                 persistentStoreTwoPartition,
@@ -435,12 +466,11 @@ public class ProcessorStateManagerTest {
         checkpoint.write(offsets);
 
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        contextRegistersStateStore(stateMgr);
 
         try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.registerStore(persistentStoreTwo, 
persistentStoreTwo.stateRestoreCallback, null);
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, 
persistentStoreTwo, nonPersistentStore), context);
+            stateMgr.initializeStoreOffsets(true);
 
             assertFalse(checkpointFile.exists());
             assertEquals(Set.of(
@@ -518,22 +548,24 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldFlushCheckpointAndClose() throws IOException {
+    public void shouldCommitAndCloseLegacyStores() throws IOException {
         checkpoint.write(emptyMap());
 
+        final File storeCheckpointFile = new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME + "_" + persistentStore.name());
+
         // set up ack'ed offsets
         final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
-        ackedOffsets.put(persistentStorePartition, 123L);
+        ackedOffsets.put(persistentStorePartition, 25_000L);
         ackedOffsets.put(nonPersistentStorePartition, 456L);
         ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);
 
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
+        contextRegistersStateStore(stateMgr);
         try {
             // make sure the checkpoint file is not written yet
-            assertFalse(checkpointFile.exists());
+            assertFalse(storeCheckpointFile.exists());
 
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, 
nonPersistentStore), context);
         } finally {
             stateMgr.flush();
 
@@ -544,13 +576,14 @@ public class ProcessorStateManagerTest {
             assertThat(persistentStore.getLastCommitCount(), 
Matchers.lessThan(nonPersistentStore.getLastCommitCount()));
 
             stateMgr.updateChangelogOffsets(ackedOffsets);
-            stateMgr.checkpoint();
+            stateMgr.flush();
 
-            assertTrue(checkpointFile.exists());
+            assertTrue(storeCheckpointFile.exists());
 
             // the checkpoint file should contain an offset from the 
persistent store only.
-            final Map<TopicPartition, Long> checkpointedOffsets = 
checkpoint.read();
-            assertThat(checkpointedOffsets, is(singletonMap(new 
TopicPartition(persistentStoreTopicName, 1), 123L)));
+            final OffsetCheckpoint storeCheckpoint = new 
OffsetCheckpoint(storeCheckpointFile);
+            final Map<TopicPartition, Long> checkpointedOffsets = 
storeCheckpoint.read();
+            assertThat(checkpointedOffsets, is(singletonMap(new 
TopicPartition(persistentStoreTopicName, 1), 25_000L)));
 
             stateMgr.close();
 
@@ -565,9 +598,10 @@ public class ProcessorStateManagerTest {
         checkpoint.write(offsets);
 
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
+        contextRegistersStateStore(stateMgr);
         try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+            stateMgr.registerStateStores(singletonList(persistentStore), 
context);
+            stateMgr.initializeStoreOffsets(true);
 
             final StateStoreMetadata storeMetadata = 
stateMgr.storeMetadata(persistentStorePartition);
             assertThat(storeMetadata, notNullValue());
@@ -582,7 +616,7 @@ public class ProcessorStateManagerTest {
                 mkEntry(persistentStorePartition, 220L),
                 mkEntry(irrelevantPartition, 9000L)
             ));
-            stateMgr.checkpoint();
+            stateMgr.flush();
 
             assertThat(stateMgr.storeMetadata(irrelevantPartition), 
equalTo(null));
             assertThat(storeMetadata.offset(), equalTo(220L));
@@ -591,49 +625,6 @@ public class ProcessorStateManagerTest {
         }
     }
 
-    @Test
-    public void shouldWriteCheckpointForPersistentStore() throws IOException {
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-
-        try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
-
-            final StateStoreMetadata storeMetadata = 
stateMgr.storeMetadata(persistentStorePartition);
-            assertThat(storeMetadata, notNullValue());
-
-            stateMgr.restore(storeMetadata, singletonList(consumerRecord), 
OptionalLong.of(2L));
-
-            stateMgr.checkpoint();
-
-            final Map<TopicPartition, Long> read = checkpoint.read();
-            assertThat(read, equalTo(singletonMap(persistentStorePartition, 
100L)));
-        } finally {
-            stateMgr.close();
-        }
-    }
-
-    @Test
-    public void shouldNotWriteCheckpointForNonPersistentStore() throws 
IOException {
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-
-        try {
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
-
-            final StateStoreMetadata storeMetadata = 
stateMgr.storeMetadata(nonPersistentStorePartition);
-            assertThat(storeMetadata, notNullValue());
-
-            
stateMgr.updateChangelogOffsets(singletonMap(nonPersistentStorePartition, 
876L));
-            stateMgr.checkpoint();
-
-            final Map<TopicPartition, Long> read = checkpoint.read();
-            assertThat(read, equalTo(emptyMap()));
-        } finally {
-            stateMgr.close();
-        }
-    }
-
     @Test
     public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() 
throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
@@ -649,7 +640,7 @@ public class ProcessorStateManagerTest {
             stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
 
             
stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 987L));
-            stateMgr.checkpoint();
+            stateMgr.flush();
 
             final Map<TopicPartition, Long> read = checkpoint.read();
             assertThat(read, equalTo(emptyMap()));
@@ -809,18 +800,19 @@ public class ProcessorStateManagerTest {
     @Test
     public void shouldLogAWarningIfCheckpointThrowsAnIOException() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-        stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
+        contextRegistersStateStore(stateMgr);
+        
stateMgr.registerStateStores(Collections.singletonList(persistentStore), 
context);
         stateDirectory.clean();
 
-        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(ProcessorStateManager.class)) {
-            
stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 10L));
-            stateMgr.checkpoint();
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+            
stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 
25_000L));
+            stateMgr.flush();
 
             boolean foundExpectedLogMessage = false;
             for (final LogCaptureAppender.Event event : appender.getEvents()) {
                 if ("WARN".equals(event.getLevel())
                     && 
event.getMessage().startsWith("process-state-manager-test Failed to write 
offset checkpoint file to [")
-                    && event.getMessage().endsWith(".checkpoint]." +
+                    && event.getMessage().endsWith(".checkpoint_" + 
persistentStoreName + "]." +
                         " This may occur if OS cleaned the state.dir in case 
when it located in ${java.io.tmpdir} directory." +
                         " This may also occur due to running multiple 
instances on the same machine using the same state dir." +
                         " Changing the location of state.dir may resolve the 
problem.")
@@ -838,7 +830,6 @@ public class ProcessorStateManagerTest {
     public void shouldThrowIfLoadCheckpointThrows() throws Exception {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
 
-        stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
         final File file = new File(stateMgr.baseDir(), CHECKPOINT_FILE_NAME);
         Files.createFile(file.toPath());
         final FileWriter writer = new FileWriter(file);
@@ -846,7 +837,8 @@ public class ProcessorStateManagerTest {
         writer.close();
 
         try {
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+            
stateMgr.registerStateStores(Collections.singletonList(persistentStore), 
context);
+            stateMgr.initializeStoreOffsets(true);
             fail("should have thrown processor state exception when IO exception happens");
         } catch (final ProcessorStateException e) {
             // pass
@@ -950,7 +942,7 @@ public class ProcessorStateManagerTest {
             stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
 
             final TaskCorruptedException exception = 
assertThrows(TaskCorruptedException.class,
-                () -> stateMgr.initializeStoreOffsetsFromCheckpoint(false));
+                () -> stateMgr.initializeStoreOffsets(false));
 
             assertEquals(
                 Collections.singleton(taskId),
@@ -972,12 +964,12 @@ public class ProcessorStateManagerTest {
         checkpoint.write(offsets);
 
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        contextRegistersStateStore(stateMgr);
 
         try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, 
nonPersistentStore), context);
 
-            stateMgr.initializeStoreOffsetsFromCheckpoint(false);
+            stateMgr.initializeStoreOffsets(false);
         } finally {
             stateMgr.close();
         }
@@ -986,11 +978,11 @@ public class ProcessorStateManagerTest {
     @Test
     public void shouldNotThrowTaskCorruptedExceptionAfterCheckpointing() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        contextRegistersStateStore(stateMgr);
 
         try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
-            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, 
nonPersistentStore), context);
+            stateMgr.initializeStoreOffsets(true);
 
             assertThat(stateMgr.storeMetadata(nonPersistentStorePartition), 
notNullValue());
             assertThat(stateMgr.storeMetadata(persistentStorePartition), 
notNullValue());
@@ -999,18 +991,18 @@ public class ProcessorStateManagerTest {
                 mkEntry(nonPersistentStorePartition, 876L),
                 mkEntry(persistentStorePartition, 666L))
             );
-            stateMgr.checkpoint();
+            stateMgr.flush();
 
             // reset the state and offsets, for example as in a corrupted task
             stateMgr.close();
             assertNull(stateMgr.storeMetadata(nonPersistentStorePartition));
             assertNull(stateMgr.storeMetadata(persistentStorePartition));
 
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
+            contextRegistersStateStore(stateMgr);
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, 
nonPersistentStore), context);
 
             // This should not throw a TaskCorruptedException!
-            stateMgr.initializeStoreOffsetsFromCheckpoint(false);
+            stateMgr.initializeStoreOffsets(false);
             assertThat(stateMgr.storeMetadata(nonPersistentStorePartition), 
notNullValue());
             assertThat(stateMgr.storeMetadata(persistentStorePartition), 
notNullValue());
         } finally {
@@ -1026,7 +1018,7 @@ public class ProcessorStateManagerTest {
             stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
             
stateMgr.markChangelogAsCorrupted(Set.of(persistentStorePartition));
 
-            final ProcessorStateException thrown = 
assertThrows(ProcessorStateException.class, () -> 
stateMgr.initializeStoreOffsetsFromCheckpoint(true));
+            final ProcessorStateException thrown = 
assertThrows(ProcessorStateException.class, () -> 
stateMgr.initializeStoreOffsets(true));
             assertInstanceOf(IllegalStateException.class, thrown.getCause());
         } finally {
             stateMgr.close();
@@ -1040,36 +1032,6 @@ public class ProcessorStateManagerTest {
         stateMgr.close();
     }
 
-    @Test
-    public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
-        final long checkpointOffset = 10L;
-        final Map<TopicPartition, Long> offsets = mkMap(
-                mkEntry(persistentStorePartition, checkpointOffset),
-                mkEntry(nonPersistentStorePartition, checkpointOffset),
-                mkEntry(irrelevantPartition, 999L)
-        );
-        checkpoint.write(offsets);
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
-        stateMgr.deleteCheckPointFileIfEOSEnabled();
-        stateMgr.close();
-        assertFalse(checkpointFile.exists());
-    }
-
-    @Test
-    public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws 
IOException {
-        final long checkpointOffset = 10L;
-        final Map<TopicPartition, Long> offsets = mkMap(
-                mkEntry(persistentStorePartition, checkpointOffset),
-                mkEntry(nonPersistentStorePartition, checkpointOffset),
-                mkEntry(irrelevantPartition, 999L)
-        );
-        checkpoint.write(offsets);
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, false);
-        stateMgr.deleteCheckPointFileIfEOSEnabled();
-        stateMgr.close();
-        assertTrue(checkpointFile.exists());
-    }
-
     @Test
     public void shouldWritePositionCheckpointFile() throws IOException {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
@@ -1088,7 +1050,7 @@ public class ProcessorStateManagerTest {
 
         assertFalse(persistentCheckpoint.getFile().exists());
 
-        stateMgr.checkpoint();
+        stateMgr.flush();
 
         assertTrue(persistentCheckpoint.getFile().exists());
 
@@ -1124,7 +1086,7 @@ public class ProcessorStateManagerTest {
 
         final ProcessorStateException processorStateException = assertThrows(
             ProcessorStateException.class,
-            stateMgr::checkpoint
+            stateMgr::flush
         );
 
         assertThat(
@@ -1209,6 +1171,13 @@ public class ProcessorStateManagerTest {
         return getStateManager(taskType, false);
     }
 
+    private void contextRegistersStateStore(final StateManager stateManager) {
+        Mockito.doAnswer(a -> {
+            stateManager.registerStore(a.getArgument(0), a.getArgument(1), () -> { });
+            return null;
+        }).when(context).register(any(), any());
+    }
+
     private MockKeyValueStore getConverterStore() {
         return new ConverterStore(persistentStoreName, true);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 1ee6594d119..98ec0f1a64b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -110,7 +110,7 @@ public class StateManagerUtilTest {
             topology, stateManager, stateDirectory, processorContext);
 
         inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
-        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        inOrder.verify(stateManager).initializeStoreOffsets(true);
         verifyNoMoreInteractions(stateManager);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8f9d228c273..cc3cd12b918 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -117,7 +117,6 @@ import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -4992,7 +4991,6 @@ public class TaskManagerTest {
         final Path checkpointFilePath = checkpointFile.toPath();
         Files.createFile(checkpointFilePath);
         new OffsetCheckpoint(checkpointFile).write(offsets);
-        lenient().when(stateDirectory.checkpointFileFor(task)).thenReturn(checkpointFile);
         expectDirectoryNotEmpty(task);
     }
 

Reply via email to