This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 823f0cfc8cb KAFKA-20257: GlobalStateManagerImpl delegates offset 
tracking to stores (#21739)
823f0cfc8cb is described below

commit 823f0cfc8cb4d6295f9385ce1dca2b172f3af7a8
Author: Nick Telford <[email protected]>
AuthorDate: Mon Mar 16 17:14:46 2026 +0000

    KAFKA-20257: GlobalStateManagerImpl delegates offset tracking to stores (#21739)
    
    As part of KIP-1035, we want to transition away from task-specific
    .checkpoint files, and instead delegate offset management to
    StateStores.
    
    We now have a `LegacyCheckpointingStateStore` wrapper to encapsulate the
    management of offsets for `StateStore` implementations that do not know
    how to manage their own offsets (i.e. those for which
    `managesOffsets() == false`).
    
    As of KAFKA-20212, `RocksDBStore` now knows how to manage its own
    offsets, so it will not be wrapped in a `LegacyCheckpointingStateStore`;
    only user-defined persistent stores will use this wrapper.
    
    Corresponding changes to `ProcessorStateManager` have been submitted
    independently, as KAFKA-19712.
    
    Until both `ProcessorStateManager` and `GlobalStateManagerImpl` have
    been updated, the `StateManager` interface must remain as-is. Therefore,
    the `flush` and `checkpoint` methods will not be consolidated until a
    later PR, which will clean up the interface and its usage by `Task` and
    friends.
    
    Reviewers: Bill Bejeck <[email protected]>
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../GlobalKTableEOSIntegrationTest.java            |   2 +-
 .../internals/GlobalStateManagerImpl.java          | 123 ++++++++++-----------
 .../internals/GlobalStateManagerImplTest.java      | 123 +++++++++++----------
 3 files changed, 124 insertions(+), 124 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 640e438103f..6e8a345c7e9 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -337,7 +337,7 @@ public class GlobalKTableEOSIntegrationTest {
                 + File.separator
                 + "global");
         assertTrue(globalStateDir.mkdirs());
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(globalStateDir, ".checkpoint"));
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(globalStateDir, ".checkpoint_" + globalStore));
 
         // set the checkpointed offset to the commit marker of partition-1
         // even if `poll()` won't return any data for partition-1, we should 
still finish the restore
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f762338b947..adc4e808f8d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -44,13 +45,12 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 
 import org.slf4j.Logger;
 
 import java.io.File;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -65,7 +65,6 @@ import java.util.function.Supplier;
 
 import static 
org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
 import static 
org.apache.kafka.streams.processor.internals.RecordDeserializer.handleDeserializationFailure;
-import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
@@ -102,19 +101,20 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
     private final Time time;
     private final Logger log;
+    private final String logPrefix;
+    private final StateDirectory stateDirectory;
     private final File baseDir;
     private final long taskTimeoutMs;
     private final ProcessorTopology topology;
-    private final OffsetCheckpoint checkpointFile;
     private final Duration pollMsPlusRequestTimeout;
     private final Consumer<byte[], byte[]> globalConsumer;
     private final StateRestoreListener stateRestoreListener;
-    private final Map<TopicPartition, Long> checkpointFileCache;
+    private final Map<TopicPartition, Long> currentOffsets;
     private final Map<String, String> storeToChangelogTopic;
     private final Set<String> globalStoreNames = new HashSet<>();
-    private final Set<String> globalNonPersistentStoresTopics = new 
HashSet<>();
     private final FixedOrderMap<String, Optional<StateStore>> globalStores = 
new FixedOrderMap<>();
     private final Map<String, StateStoreMetadata> storeMetadata = new 
HashMap<>();
+    private final boolean eosEnabled;
     private InternalProcessorContext<?, ?> globalProcessorContext;
     private DeserializationExceptionHandler deserializationExceptionHandler;
     private ProcessingExceptionHandler processingExceptionHandler;
@@ -129,20 +129,18 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                                   final StreamsConfig config) {
         this.time = time;
         this.topology = topology;
+        this.stateDirectory = stateDirectory;
         baseDir = stateDirectory.globalStateDir();
         storeToChangelogTopic = topology.storeToChangelogTopic();
-        checkpointFile = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-        checkpointFileCache = new HashMap<>();
+        currentOffsets = new HashMap<>();
 
         // Find non persistent store's topics
         for (final StateStore store : topology.globalStateStores()) {
             globalStoreNames.add(store.name());
-            if (!store.persistent()) {
-                
globalNonPersistentStoresTopics.add(changelogFor(store.name()));
-            }
         }
 
         log = logContext.logger(GlobalStateManagerImpl.class);
+        logPrefix = logContext.logPrefix();
         this.globalConsumer = globalConsumer;
         this.stateRestoreListener = stateRestoreListener;
 
@@ -160,6 +158,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         @SuppressWarnings("deprecation")
         final boolean globalEnabled = 
config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
         processingExceptionHandler = globalEnabled ? 
config.processingExceptionHandler() : null;
+        eosEnabled = StreamsConfigUtils.eosEnabled(config);
     }
 
     @Override
@@ -169,41 +168,40 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
     @Override
     public Set<String> initialize() {
-        try {
-            checkpointFileCache.putAll(checkpointFile.read());
-        } catch (final IOException e) {
-            throw new StreamsException("Failed to read checkpoints for global 
state globalStores", e);
-        }
-
         droppedRecordsSensor = droppedRecordsSensor(
             Thread.currentThread().getName(),
             globalProcessorContext.taskId().toString(),
             globalProcessorContext.metrics()
         );
 
-        final Set<String> changelogTopics = new HashSet<>();
+        final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
         for (final StateStore stateStore : topology.globalStateStores()) {
-            final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
-            changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
-        }
+            final List<TopicPartition> storePartitions = 
topicPartitionsForStore(stateStore);
+            final StateStore maybeWrappedStore = 
LegacyCheckpointingStateStore.maybeWrapStore(
+                    stateStore, eosEnabled, new HashSet<>(storePartitions), 
stateDirectory, null, logPrefix);
+            maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
 
-        // make sure each topic-partition from checkpointFileCache is 
associated with a global state store
-        checkpointFileCache.keySet().forEach(tp -> {
-            if (!changelogTopics.contains(tp.topic())) {
-                log.error(
-                    "Encountered a topic-partition in the global checkpoint 
file not associated with any global" +
-                        " state store, topic-partition: {}, checkpoint file: 
{}. If this topic-partition is no longer valid," +
-                        " an application reset and state store directory 
cleanup will be required.",
-                    tp.topic(),
-                    checkpointFile
-                );
-                throw new StreamsException("Encountered a topic-partition not 
associated with any global state store");
+            for (final TopicPartition storePartition : storePartitions) {
+                wrappedStores.put(storePartition, maybeWrappedStore);
             }
-        });
+        }
+
+        // migrate offsets from legacy checkpoint file into the stores
+        LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, 
stateDirectory, null, wrappedStores);
 
-        // restore or reprocess each registered store using the now-populated 
currentOffsets
         for (final StateStoreMetadata metadata : storeMetadata.values()) {
+            // load the committed offsets from the store
+            final StateStore store = metadata.stateStore;
+            if (store.persistent()) {
+                for (final TopicPartition partition : 
metadata.changelogPartitions) {
+                    final Long offset = store.committedOffset(partition);
+                    if (offset != null) {
+                        currentOffsets.put(partition, offset);
+                    }
+                }
+            }
+
+            // restore or reprocess each registered store using the 
now-populated currentOffsets
             try {
                 if (metadata.reprocessFactory.isPresent()) {
                     reprocessState(metadata);
@@ -219,7 +217,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     }
 
     public StateStore globalStore(final String name) {
-        return globalStores.getOrDefault(name, Optional.empty()).orElse(null);
+        return 
LegacyCheckpointingStateStore.maybeUnwrapStore(globalStores.getOrDefault(name, 
Optional.empty()).orElse(null));
     }
 
     @Override
@@ -267,9 +265,9 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         );
 
         final Optional<InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?>> 
reprocessFactory = topology
-            .storeNameToReprocessOnRestore().getOrDefault(store.name(), 
Optional.empty());
+                .storeNameToReprocessOnRestore().getOrDefault(store.name(), 
Optional.empty());
         storeMetadata.put(store.name(), new StateStoreMetadata(
-            store, topicPartitions, reprocessFactory, stateRestoreCallback, 
converterForStore(store), highWatermarks));
+                store, topicPartitions, reprocessFactory, 
stateRestoreCallback, converterForStore(store), highWatermarks));
     }
 
     private List<TopicPartition> topicPartitionsForStore(final StateStore 
store) {
@@ -310,7 +308,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
             globalConsumer.assign(Collections.singletonList(topicPartition));
             long offset;
-            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            final Long checkpoint = currentOffsets.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
                 offset = checkpoint;
@@ -441,7 +439,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                 stateRestoreListener.onBatchRestored(topicPartition, 
storeMetadata.stateStore.name(), offset, batchRestoreCount);
             }
             stateRestoreListener.onRestoreEnd(topicPartition, 
storeMetadata.stateStore.name(), restoreCount);
-            checkpointFileCache.put(topicPartition, offset);
+            currentOffsets.put(topicPartition, offset);
 
         }
     }
@@ -452,7 +450,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
             globalConsumer.assign(Collections.singletonList(topicPartition));
             long offset;
-            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            final Long checkpoint = currentOffsets.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
                 offset = checkpoint;
@@ -497,7 +495,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                 restoreCount += restoreRecords.size();
             }
             stateRestoreListener.onRestoreEnd(topicPartition, 
storeMetadata.stateStore.name(), restoreCount);
-            checkpointFileCache.put(topicPartition, offset);
+            currentOffsets.put(topicPartition, offset);
         }
     }
 
@@ -561,7 +559,17 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                 final StateStore store = entry.getValue().get();
                 try {
                     log.trace("Committing global store={}", store.name());
-                    store.commit(Map.of());
+                    // construct per-store Map of offsets to commit
+                    final List<TopicPartition> storePartitions = 
storeMetadata.get(store.name()).changelogPartitions;
+                    final Map<TopicPartition, Long> storeOffsets = new 
HashMap<>(storePartitions.size());
+
+                    // only add offsets for persistent stores
+                    if (store.persistent()) {
+                        for (final TopicPartition storePartition : 
storePartitions) {
+                            storeOffsets.put(storePartition, 
currentOffsets.get(storePartition));
+                        }
+                    }
+                    store.commit(storeOffsets);
                 } catch (final RuntimeException e) {
                     throw new ProcessorStateException(
                         String.format("Failed to commit global state store 
%s", store.name()),
@@ -574,6 +582,10 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         }
     }
 
+    @Override
+    public void checkpoint() {
+    }
+
     @Override
     public void close() {
         if (globalStores.isEmpty()) {
@@ -606,28 +618,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
     @Override
     public void updateChangelogOffsets(final Map<TopicPartition, Long> 
offsets) {
-        checkpointFileCache.putAll(offsets);
-    }
-
-    @Override
-    public void checkpoint() {
-        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
-
-        // Skip non persistent store
-        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointFileCache.entrySet()) {
-            final String topic = topicPartitionOffset.getKey().topic();
-            if (!globalNonPersistentStoresTopics.contains(topic)) {
-                filteredOffsets.put(topicPartitionOffset.getKey(), 
topicPartitionOffset.getValue());
-            }
-        }
-
-        try {
-            checkpointFile.write(filteredOffsets);
-        } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to {} for global 
stores." +
-                " This may occur if OS cleaned the state.dir in case when it 
is located in the (default) ${java.io.tmpdir}/kafka-streams directory." +
-                " Changing the location of state.dir may resolve the problem", 
checkpointFile, e);
-        }
+        currentOffsets.putAll(offsets);
     }
 
     @Override
@@ -637,7 +628,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
     @Override
     public Map<TopicPartition, Long> changelogOffsets() {
-        return Collections.unmodifiableMap(checkpointFileCache);
+        return Collections.unmodifiableMap(currentOffsets);
     }
 
     public final String changelogFor(final String storeName) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 5b814b7f636..6de572c5a32 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -175,70 +176,58 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldReadCheckpointOffsets() throws IOException {
-        final Map<TopicPartition, Long> expected = writeCheckpoint();
+        writeCheckpoint();
 
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+        processorContext.setStateManger(stateManager);
         stateManager.initialize();
         final Map<TopicPartition, Long> offsets = 
stateManager.changelogOffsets();
-        assertEquals(expected, offsets);
+        assertEquals(mkMap(
+                mkEntry(t1, 1L),
+                mkEntry(t2, 0L),
+                mkEntry(t3, 0L),
+                mkEntry(t4, 0L),
+                mkEntry(t5, 0L)
+        ), offsets);
     }
 
     @Test
     public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
-        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 
25L);
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 
25_000L);
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
         stateManager.initialize();
         stateManager.updateChangelogOffsets(offsets);
 
+        final File storeCheckpointFile = new 
File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + 
"_" + storeName1);
+
         // set readonly to the CHECKPOINT_FILE_NAME.tmp file because we will 
write data to the .tmp file first
         // and then swap to CHECKPOINT_FILE_NAME by replacing it
-        final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+        final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName1 + ".tmp");
         Files.createFile(file.toPath());
         file.setWritable(false);
 
-        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
-            stateManager.checkpoint();
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+            stateManager.flush();
             assertThat(appender.getMessages(), hasItem(containsString(
-                "Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));
+                "Failed to write offset checkpoint file to [" + 
storeCheckpointFile.getPath() + "]. " +
+                "This may occur if OS cleaned the state.dir in case when it 
located in ${java.io.tmpdir} directory. " +
+                "This may also occur due to running multiple instances on the 
same machine using the same state dir. " +
+                "Changing the location of state.dir may resolve the 
problem.")));
         }
     }
 
-    @Test
-    public void shouldThrowStreamsExceptionForOldTopicPartitions() throws 
IOException {
-        final HashMap<TopicPartition, Long> expectedOffsets = new HashMap<>();
-        expectedOffsets.put(t1, 1L);
-        expectedOffsets.put(t2, 1L);
-        expectedOffsets.put(t3, 1L);
-        expectedOffsets.put(t4, 1L);
-
-        // add an old topic (a topic not associated with any global state 
store)
-        final HashMap<TopicPartition, Long> startOffsets = new 
HashMap<>(expectedOffsets);
-        final TopicPartition tOld = new TopicPartition("oldTopic", 1);
-        startOffsets.put(tOld, 1L);
-
-        // start with a checkpoint file will all topic-partitions: expected 
and old (not
-        // associated with any global state store).
-        final OffsetCheckpoint checkpoint = new 
OffsetCheckpoint(checkpointFile);
-        checkpoint.write(startOffsets);
-
-        // initialize will throw exception
-        final StreamsException e = assertThrows(StreamsException.class, () -> 
stateManager.initialize());
-        assertThat(e.getMessage(), equalTo("Encountered a topic-partition not 
associated with any global state store"));
-    }
-
-    @Test
-    public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
-        writeCheckpoint();
-        stateManager.initialize();
-        assertTrue(checkpointFile.exists());
-    }
-
     @Test
     public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() 
throws IOException {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         writeCorruptCheckpoint();
         assertThrows(StreamsException.class, stateManager::initialize);
     }
 
     @Test
     public void shouldInitializeStateStores() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         assertTrue(store1.initialized);
         assertTrue(store2.initialized);
@@ -246,12 +235,14 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldReturnInitializedStoreNames() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         final Set<String> storeNames = stateManager.initialize();
         assertEquals(Set.of(storeName1, storeName2, storeName3, storeName4, 
storeName5), storeNames);
     }
 
     @Test
     public void 
shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
 
         try {
@@ -264,6 +255,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void 
shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         initializeConsumer(2, 0, t1);
         stateManager.registerStore(store1, stateRestoreCallback, null);
@@ -277,8 +269,8 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
-        stateManager.initialize();
         try {
+            stateManager.initialize();
             stateManager.registerStore(store1, stateRestoreCallback, null);
             fail("Should have raised a StreamsException as there are no 
partition for the store");
         } catch (final StreamsException e) {
@@ -402,7 +394,8 @@ public class GlobalStateManagerImplTest {
 
 
     @Test
-    public void shouldFlushStateStores() {
+    public void shouldCommitStateStores() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
@@ -417,6 +410,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowProcessorStateStoreExceptionIfStoreCommitFailed() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
@@ -431,6 +425,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldCloseStateStores() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
@@ -445,6 +440,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
         stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) {
@@ -459,6 +455,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         try {
             stateManager.registerStore(store1, null, null);
@@ -470,6 +467,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldNotCloseStoresIfCloseAlreadyCalled() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
         stateManager.registerStore(new NoOpReadOnlyStore<>("t1-store") {
@@ -488,8 +486,8 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldAttemptToCloseAllStoresEvenWhenSomeException() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
-        initializeConsumer(1, 0, t1);
         final NoOpReadOnlyStore<Object, Object> store = new 
NoOpReadOnlyStore<>("t1-store") {
             @Override
             public void close() {
@@ -497,6 +495,7 @@ public class GlobalStateManagerImplTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
+        initializeConsumer(1, 0, t1);
         stateManager.registerStore(store, stateRestoreCallback, null);
 
         initializeConsumer(1, 0, t2);
@@ -513,19 +512,28 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldCheckpointOffsets() throws IOException {
-        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 
25L);
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+        stateManager.setGlobalProcessorContext(processorContext);
+        processorContext.setStateManger(stateManager);
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 
25_000L);
         stateManager.initialize();
 
         stateManager.updateChangelogOffsets(offsets);
-        stateManager.checkpoint();
+        stateManager.flush();
 
-        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
-        assertThat(result, equalTo(offsets));
-        assertThat(stateManager.changelogOffsets(), equalTo(offsets));
+        assertThat(readOffsetsCheckpoint(storeName1), equalTo(offsets));
+        assertThat(stateManager.changelogOffsets(), equalTo(mkMap(
+                mkEntry(t1, 25_000L),
+                mkEntry(t2, 0L),
+                mkEntry(t3, 0L),
+                mkEntry(t4, 0L),
+                mkEntry(t5, 0L)
+        )));
     }
 
     @Test
     public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         stateManager.initialize();
         initializeConsumer(10, 0, t1);
         stateManager.registerStore(store1, stateRestoreCallback, null);
@@ -534,7 +542,7 @@ public class GlobalStateManagerImplTest {
 
         final Map<TopicPartition, Long> initialCheckpoint = 
stateManager.changelogOffsets();
         stateManager.updateChangelogOffsets(Collections.singletonMap(t1, 
101L));
-        stateManager.checkpoint();
+        stateManager.flush();
 
         final Map<TopicPartition, Long> updatedCheckpoint = 
stateManager.changelogOffsets();
         assertThat(updatedCheckpoint.get(t2), 
equalTo(initialCheckpoint.get(t2)));
@@ -566,10 +574,12 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
         initializeConsumer(0, 0, t2, t3, t4, t5);
-        initializeConsumer(10, 0, t1);
         processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        initializeConsumer(10, 0, t1);
         stateManager.initialize();
-        stateManager.checkpoint();
+        stateManager.flush();
         stateManager.close();
 
         final Map<TopicPartition, Long> checkpointMap = 
stateManager.changelogOffsets();
@@ -582,26 +592,23 @@ public class GlobalStateManagerImplTest {
                 mkEntry(t5, 0L)
         )));
 
-        // checkpoint file only contains persistent store offsets
-        assertThat(readOffsetsCheckpoint(), equalTo(mkMap(
-                mkEntry(t1, 10L),
-                mkEntry(t2, 0L)
-        )));
+        assertThat(readOffsetsCheckpoint(storeName1), 
equalTo(mkMap(mkEntry(t1, 10L))));
+        assertThat(readOffsetsCheckpoint(storeName2), 
equalTo(mkMap(mkEntry(t2, 0L))));
     }
 
     @Test
     public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws 
IOException {
-        initializeConsumer(0, 0, t1, t3, t4, t5);
+        initializeConsumer(0, 0, t1, t2, t4, t5);
         initializeConsumer(10, 0, t3);
         stateManager.initialize();
         stateManager.close();
 
-        assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
+        assertThat(readOffsetsCheckpoint(storeName3), 
equalTo(Collections.emptyMap()));
     }
 
-    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {
+    private Map<TopicPartition, Long> readOffsetsCheckpoint(final String 
storeName) throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
-                                                                               
 StateManagerUtil.CHECKPOINT_FILE_NAME));
+                StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName));
         return offsetCheckpoint.read();
     }
 
@@ -1168,6 +1175,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldFailOnDeserializationErrorsWhenReprocessing() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         setUpReprocessing();
         initializeConsumer(0, 0, t1, t2, t3, t4);
         initializeConsumer(2, 0, t5);
@@ -1180,6 +1188,7 @@ public class GlobalStateManagerImplTest {
     public void shouldSkipOnDeserializationErrorsWhenReprocessing() {
         stateManager.setDeserializationExceptionHandler(new 
LogAndContinueExceptionHandler());
         setUpReprocessing();
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
         initializeConsumer(2, 0, t5);
 
         stateManager.initialize();

Reply via email to