This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 823f0cfc8cb KAFKA-20257: GlobalStateManagerImpl delegates offset
tracking to stores (#21739)
823f0cfc8cb is described below
commit 823f0cfc8cb4d6295f9385ce1dca2b172f3af7a8
Author: Nick Telford <[email protected]>
AuthorDate: Mon Mar 16 17:14:46 2026 +0000
KAFKA-20257: GlobalStateManagerImpl delegates offset tracking to stores
(#21739)
As part of KIP-1035, we want to transition away from task-specific
.checkpoint files, and instead delegate offset management to
StateStores.
We now have a `LegacyCheckpointingStateStore` wrapper to encapsulate the
management of offsets for `StateStore` implementations that do not know
how to manage their own offsets (i.e. for which `managesOffsets() ==
false`).
As of KAFKA-20212, `RocksDBStore` now knows how to manage its own
offsets, so it will not be wrapped in a `LegacyCheckpointingStateStore`;
only user-defined persistent stores will use this wrapper.
Corresponding changes to `ProcessorStateManager` have been submitted
independently, as KAFKA-19712.
Until both `ProcessorStateManager` and `GlobalStateManagerImpl` have
been updated, the `StateManager` interface must remain as-is. Therefore,
the `flush` and `checkpoint` methods will not be consolidated until a
later PR, which will clean up the interface and its usage by `Task` and
friends.
Reviewers: Bill Bejeck <[email protected]>
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../GlobalKTableEOSIntegrationTest.java | 2 +-
.../internals/GlobalStateManagerImpl.java | 123 ++++++++++-----------
.../internals/GlobalStateManagerImplTest.java | 123 +++++++++++----------
3 files changed, 124 insertions(+), 124 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 640e438103f..6e8a345c7e9 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -337,7 +337,7 @@ public class GlobalKTableEOSIntegrationTest {
+ File.separator
+ "global");
assertTrue(globalStateDir.mkdirs());
- final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(globalStateDir, ".checkpoint"));
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(globalStateDir, ".checkpoint_" + globalStore));
// set the checkpointed offset to the commit marker of partition-1
// even if `poll()` won't return any data for partition-1, we should
still finish the restore
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f762338b947..adc4e808f8d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -44,13 +45,12 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;
import java.io.File;
-import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -65,7 +65,6 @@ import java.util.function.Supplier;
import static
org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static
org.apache.kafka.streams.processor.internals.RecordDeserializer.handleDeserializationFailure;
-import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
@@ -102,19 +101,20 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
private final Time time;
private final Logger log;
+ private final String logPrefix;
+ private final StateDirectory stateDirectory;
private final File baseDir;
private final long taskTimeoutMs;
private final ProcessorTopology topology;
- private final OffsetCheckpoint checkpointFile;
private final Duration pollMsPlusRequestTimeout;
private final Consumer<byte[], byte[]> globalConsumer;
private final StateRestoreListener stateRestoreListener;
- private final Map<TopicPartition, Long> checkpointFileCache;
+ private final Map<TopicPartition, Long> currentOffsets;
private final Map<String, String> storeToChangelogTopic;
private final Set<String> globalStoreNames = new HashSet<>();
- private final Set<String> globalNonPersistentStoresTopics = new
HashSet<>();
private final FixedOrderMap<String, Optional<StateStore>> globalStores =
new FixedOrderMap<>();
private final Map<String, StateStoreMetadata> storeMetadata = new
HashMap<>();
+ private final boolean eosEnabled;
private InternalProcessorContext<?, ?> globalProcessorContext;
private DeserializationExceptionHandler deserializationExceptionHandler;
private ProcessingExceptionHandler processingExceptionHandler;
@@ -129,20 +129,18 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
final StreamsConfig config) {
this.time = time;
this.topology = topology;
+ this.stateDirectory = stateDirectory;
baseDir = stateDirectory.globalStateDir();
storeToChangelogTopic = topology.storeToChangelogTopic();
- checkpointFile = new OffsetCheckpoint(new File(baseDir,
CHECKPOINT_FILE_NAME));
- checkpointFileCache = new HashMap<>();
+ currentOffsets = new HashMap<>();
// Find non persistent store's topics
for (final StateStore store : topology.globalStateStores()) {
globalStoreNames.add(store.name());
- if (!store.persistent()) {
-
globalNonPersistentStoresTopics.add(changelogFor(store.name()));
- }
}
log = logContext.logger(GlobalStateManagerImpl.class);
+ logPrefix = logContext.logPrefix();
this.globalConsumer = globalConsumer;
this.stateRestoreListener = stateRestoreListener;
@@ -160,6 +158,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
@SuppressWarnings("deprecation")
final boolean globalEnabled =
config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
processingExceptionHandler = globalEnabled ?
config.processingExceptionHandler() : null;
+ eosEnabled = StreamsConfigUtils.eosEnabled(config);
}
@Override
@@ -169,41 +168,40 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
@Override
public Set<String> initialize() {
- try {
- checkpointFileCache.putAll(checkpointFile.read());
- } catch (final IOException e) {
- throw new StreamsException("Failed to read checkpoints for global
state globalStores", e);
- }
-
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
globalProcessorContext.taskId().toString(),
globalProcessorContext.metrics()
);
- final Set<String> changelogTopics = new HashSet<>();
+ final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
for (final StateStore stateStore : topology.globalStateStores()) {
- final String sourceTopic =
storeToChangelogTopic.get(stateStore.name());
- changelogTopics.add(sourceTopic);
- stateStore.init(globalProcessorContext, stateStore);
- }
+ final List<TopicPartition> storePartitions =
topicPartitionsForStore(stateStore);
+ final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
+ stateStore, eosEnabled, new HashSet<>(storePartitions),
stateDirectory, null, logPrefix);
+ maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
- // make sure each topic-partition from checkpointFileCache is
associated with a global state store
- checkpointFileCache.keySet().forEach(tp -> {
- if (!changelogTopics.contains(tp.topic())) {
- log.error(
- "Encountered a topic-partition in the global checkpoint
file not associated with any global" +
- " state store, topic-partition: {}, checkpoint file:
{}. If this topic-partition is no longer valid," +
- " an application reset and state store directory
cleanup will be required.",
- tp.topic(),
- checkpointFile
- );
- throw new StreamsException("Encountered a topic-partition not
associated with any global state store");
+ for (final TopicPartition storePartition : storePartitions) {
+ wrappedStores.put(storePartition, maybeWrappedStore);
}
- });
+ }
+
+ // migrate offsets from legacy checkpoint file into the stores
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix,
stateDirectory, null, wrappedStores);
- // restore or reprocess each registered store using the now-populated
currentOffsets
for (final StateStoreMetadata metadata : storeMetadata.values()) {
+ // load the committed offsets from the store
+ final StateStore store = metadata.stateStore;
+ if (store.persistent()) {
+ for (final TopicPartition partition :
metadata.changelogPartitions) {
+ final Long offset = store.committedOffset(partition);
+ if (offset != null) {
+ currentOffsets.put(partition, offset);
+ }
+ }
+ }
+
+ // restore or reprocess each registered store using the
now-populated currentOffsets
try {
if (metadata.reprocessFactory.isPresent()) {
reprocessState(metadata);
@@ -219,7 +217,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
}
public StateStore globalStore(final String name) {
- return globalStores.getOrDefault(name, Optional.empty()).orElse(null);
+ return
LegacyCheckpointingStateStore.maybeUnwrapStore(globalStores.getOrDefault(name,
Optional.empty()).orElse(null));
}
@Override
@@ -267,9 +265,9 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
);
final Optional<InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?>>
reprocessFactory = topology
- .storeNameToReprocessOnRestore().getOrDefault(store.name(),
Optional.empty());
+ .storeNameToReprocessOnRestore().getOrDefault(store.name(),
Optional.empty());
storeMetadata.put(store.name(), new StateStoreMetadata(
- store, topicPartitions, reprocessFactory, stateRestoreCallback,
converterForStore(store), highWatermarks));
+ store, topicPartitions, reprocessFactory,
stateRestoreCallback, converterForStore(store), highWatermarks));
}
private List<TopicPartition> topicPartitionsForStore(final StateStore
store) {
@@ -310,7 +308,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
globalConsumer.assign(Collections.singletonList(topicPartition));
long offset;
- final Long checkpoint = checkpointFileCache.get(topicPartition);
+ final Long checkpoint = currentOffsets.get(topicPartition);
if (checkpoint != null) {
globalConsumer.seek(topicPartition, checkpoint);
offset = checkpoint;
@@ -441,7 +439,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
stateRestoreListener.onBatchRestored(topicPartition,
storeMetadata.stateStore.name(), offset, batchRestoreCount);
}
stateRestoreListener.onRestoreEnd(topicPartition,
storeMetadata.stateStore.name(), restoreCount);
- checkpointFileCache.put(topicPartition, offset);
+ currentOffsets.put(topicPartition, offset);
}
}
@@ -452,7 +450,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
globalConsumer.assign(Collections.singletonList(topicPartition));
long offset;
- final Long checkpoint = checkpointFileCache.get(topicPartition);
+ final Long checkpoint = currentOffsets.get(topicPartition);
if (checkpoint != null) {
globalConsumer.seek(topicPartition, checkpoint);
offset = checkpoint;
@@ -497,7 +495,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
restoreCount += restoreRecords.size();
}
stateRestoreListener.onRestoreEnd(topicPartition,
storeMetadata.stateStore.name(), restoreCount);
- checkpointFileCache.put(topicPartition, offset);
+ currentOffsets.put(topicPartition, offset);
}
}
@@ -561,7 +559,17 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
final StateStore store = entry.getValue().get();
try {
log.trace("Committing global store={}", store.name());
- store.commit(Map.of());
+ // construct per-store Map of offsets to commit
+ final List<TopicPartition> storePartitions =
storeMetadata.get(store.name()).changelogPartitions;
+ final Map<TopicPartition, Long> storeOffsets = new
HashMap<>(storePartitions.size());
+
+ // only add offsets for persistent stores
+ if (store.persistent()) {
+ for (final TopicPartition storePartition :
storePartitions) {
+ storeOffsets.put(storePartition,
currentOffsets.get(storePartition));
+ }
+ }
+ store.commit(storeOffsets);
} catch (final RuntimeException e) {
throw new ProcessorStateException(
String.format("Failed to commit global state store
%s", store.name()),
@@ -574,6 +582,10 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
}
}
+ @Override
+ public void checkpoint() {
+ }
+
@Override
public void close() {
if (globalStores.isEmpty()) {
@@ -606,28 +618,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
@Override
public void updateChangelogOffsets(final Map<TopicPartition, Long>
offsets) {
- checkpointFileCache.putAll(offsets);
- }
-
- @Override
- public void checkpoint() {
- final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
-
- // Skip non persistent store
- for (final Map.Entry<TopicPartition, Long> topicPartitionOffset :
checkpointFileCache.entrySet()) {
- final String topic = topicPartitionOffset.getKey().topic();
- if (!globalNonPersistentStoresTopics.contains(topic)) {
- filteredOffsets.put(topicPartitionOffset.getKey(),
topicPartitionOffset.getValue());
- }
- }
-
- try {
- checkpointFile.write(filteredOffsets);
- } catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to {} for global
stores." +
- " This may occur if OS cleaned the state.dir in case when it
is located in the (default) ${java.io.tmpdir}/kafka-streams directory." +
- " Changing the location of state.dir may resolve the problem",
checkpointFile, e);
- }
+ currentOffsets.putAll(offsets);
}
@Override
@@ -637,7 +628,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
@Override
public Map<TopicPartition, Long> changelogOffsets() {
- return Collections.unmodifiableMap(checkpointFileCache);
+ return Collections.unmodifiableMap(currentOffsets);
}
public final String changelogFor(final String storeName) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 5b814b7f636..6de572c5a32 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockStateRestoreListener;
@@ -175,70 +176,58 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldReadCheckpointOffsets() throws IOException {
- final Map<TopicPartition, Long> expected = writeCheckpoint();
+ writeCheckpoint();
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+ processorContext.setStateManger(stateManager);
stateManager.initialize();
final Map<TopicPartition, Long> offsets =
stateManager.changelogOffsets();
- assertEquals(expected, offsets);
+ assertEquals(mkMap(
+ mkEntry(t1, 1L),
+ mkEntry(t2, 0L),
+ mkEntry(t3, 0L),
+ mkEntry(t4, 0L),
+ mkEntry(t5, 0L)
+ ), offsets);
}
@Test
public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws
IOException {
- final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1,
25L);
+ final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1,
25_000L);
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+ processorContext.setStateManger(stateManager);
+ stateManager.setGlobalProcessorContext(processorContext);
stateManager.initialize();
stateManager.updateChangelogOffsets(offsets);
+ final File storeCheckpointFile = new
File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME +
"_" + storeName1);
+
// set readonly to the CHECKPOINT_FILE_NAME.tmp file because we will
write data to the .tmp file first
// and then swap to CHECKPOINT_FILE_NAME by replacing it
- final File file = new File(stateDirectory.globalStateDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+ final File file = new File(stateDirectory.globalStateDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName1 + ".tmp");
Files.createFile(file.toPath());
file.setWritable(false);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
- stateManager.checkpoint();
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+ stateManager.flush();
assertThat(appender.getMessages(), hasItem(containsString(
- "Failed to write offset checkpoint file to " +
checkpointFile.getPath() + " for global stores")));
+ "Failed to write offset checkpoint file to [" +
storeCheckpointFile.getPath() + "]. " +
+ "This may occur if OS cleaned the state.dir in case when it
located in ${java.io.tmpdir} directory. " +
+ "This may also occur due to running multiple instances on the
same machine using the same state dir. " +
+ "Changing the location of state.dir may resolve the
problem.")));
}
}
- @Test
- public void shouldThrowStreamsExceptionForOldTopicPartitions() throws
IOException {
- final HashMap<TopicPartition, Long> expectedOffsets = new HashMap<>();
- expectedOffsets.put(t1, 1L);
- expectedOffsets.put(t2, 1L);
- expectedOffsets.put(t3, 1L);
- expectedOffsets.put(t4, 1L);
-
- // add an old topic (a topic not associated with any global state
store)
- final HashMap<TopicPartition, Long> startOffsets = new
HashMap<>(expectedOffsets);
- final TopicPartition tOld = new TopicPartition("oldTopic", 1);
- startOffsets.put(tOld, 1L);
-
- // start with a checkpoint file will all topic-partitions: expected
and old (not
- // associated with any global state store).
- final OffsetCheckpoint checkpoint = new
OffsetCheckpoint(checkpointFile);
- checkpoint.write(startOffsets);
-
- // initialize will throw exception
- final StreamsException e = assertThrows(StreamsException.class, () ->
stateManager.initialize());
- assertThat(e.getMessage(), equalTo("Encountered a topic-partition not
associated with any global state store"));
- }
-
- @Test
- public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
- writeCheckpoint();
- stateManager.initialize();
- assertTrue(checkpointFile.exists());
- }
-
@Test
public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets()
throws IOException {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
writeCorruptCheckpoint();
assertThrows(StreamsException.class, stateManager::initialize);
}
@Test
public void shouldInitializeStateStores() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
assertTrue(store1.initialized);
assertTrue(store2.initialized);
@@ -246,12 +235,14 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldReturnInitializedStoreNames() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
final Set<String> storeNames = stateManager.initialize();
assertEquals(Set.of(storeName1, storeName2, storeName3, storeName4,
storeName5), storeNames);
}
@Test
public void
shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
try {
@@ -264,6 +255,7 @@ public class GlobalStateManagerImplTest {
@Test
public void
shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
initializeConsumer(2, 0, t1);
stateManager.registerStore(store1, stateRestoreCallback, null);
@@ -277,8 +269,8 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
- stateManager.initialize();
try {
+ stateManager.initialize();
stateManager.registerStore(store1, stateRestoreCallback, null);
fail("Should have raised a StreamsException as there are no
partition for the store");
} catch (final StreamsException e) {
@@ -402,7 +394,8 @@ public class GlobalStateManagerImplTest {
@Test
- public void shouldFlushStateStores() {
+ public void shouldCommitStateStores() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
// register the stores
initializeConsumer(1, 0, t1);
@@ -417,6 +410,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldThrowProcessorStateStoreExceptionIfStoreCommitFailed() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
// register the stores
initializeConsumer(1, 0, t1);
@@ -431,6 +425,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldCloseStateStores() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
// register the stores
initializeConsumer(1, 0, t1);
@@ -445,6 +440,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
initializeConsumer(1, 0, t1);
stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) {
@@ -459,6 +455,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
try {
stateManager.registerStore(store1, null, null);
@@ -470,6 +467,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldNotCloseStoresIfCloseAlreadyCalled() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
initializeConsumer(1, 0, t1);
stateManager.registerStore(new NoOpReadOnlyStore<>("t1-store") {
@@ -488,8 +486,8 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldAttemptToCloseAllStoresEvenWhenSomeException() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
- initializeConsumer(1, 0, t1);
final NoOpReadOnlyStore<Object, Object> store = new
NoOpReadOnlyStore<>("t1-store") {
@Override
public void close() {
@@ -497,6 +495,7 @@ public class GlobalStateManagerImplTest {
throw new RuntimeException("KABOOM!");
}
};
+ initializeConsumer(1, 0, t1);
stateManager.registerStore(store, stateRestoreCallback, null);
initializeConsumer(1, 0, t2);
@@ -513,19 +512,28 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldCheckpointOffsets() throws IOException {
- final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1,
25L);
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+ stateManager.setGlobalProcessorContext(processorContext);
+ processorContext.setStateManger(stateManager);
+ final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1,
25_000L);
stateManager.initialize();
stateManager.updateChangelogOffsets(offsets);
- stateManager.checkpoint();
+ stateManager.flush();
- final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
- assertThat(result, equalTo(offsets));
- assertThat(stateManager.changelogOffsets(), equalTo(offsets));
+ assertThat(readOffsetsCheckpoint(storeName1), equalTo(offsets));
+ assertThat(stateManager.changelogOffsets(), equalTo(mkMap(
+ mkEntry(t1, 25_000L),
+ mkEntry(t2, 0L),
+ mkEntry(t3, 0L),
+ mkEntry(t4, 0L),
+ mkEntry(t5, 0L)
+ )));
}
@Test
public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
stateManager.initialize();
initializeConsumer(10, 0, t1);
stateManager.registerStore(store1, stateRestoreCallback, null);
@@ -534,7 +542,7 @@ public class GlobalStateManagerImplTest {
final Map<TopicPartition, Long> initialCheckpoint =
stateManager.changelogOffsets();
stateManager.updateChangelogOffsets(Collections.singletonMap(t1,
101L));
- stateManager.checkpoint();
+ stateManager.flush();
final Map<TopicPartition, Long> updatedCheckpoint =
stateManager.changelogOffsets();
assertThat(updatedCheckpoint.get(t2),
equalTo(initialCheckpoint.get(t2)));
@@ -566,10 +574,12 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
initializeConsumer(0, 0, t2, t3, t4, t5);
- initializeConsumer(10, 0, t1);
processorContext.setStateManger(stateManager);
+ stateManager.setGlobalProcessorContext(processorContext);
+
+ initializeConsumer(10, 0, t1);
stateManager.initialize();
- stateManager.checkpoint();
+ stateManager.flush();
stateManager.close();
final Map<TopicPartition, Long> checkpointMap =
stateManager.changelogOffsets();
@@ -582,26 +592,23 @@ public class GlobalStateManagerImplTest {
mkEntry(t5, 0L)
)));
- // checkpoint file only contains persistent store offsets
- assertThat(readOffsetsCheckpoint(), equalTo(mkMap(
- mkEntry(t1, 10L),
- mkEntry(t2, 0L)
- )));
+ assertThat(readOffsetsCheckpoint(storeName1),
equalTo(mkMap(mkEntry(t1, 10L))));
+ assertThat(readOffsetsCheckpoint(storeName2),
equalTo(mkMap(mkEntry(t2, 0L))));
}
@Test
public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws
IOException {
- initializeConsumer(0, 0, t1, t3, t4, t5);
+ initializeConsumer(0, 0, t1, t2, t4, t5);
initializeConsumer(10, 0, t3);
stateManager.initialize();
stateManager.close();
- assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
+ assertThat(readOffsetsCheckpoint(storeName3),
equalTo(Collections.emptyMap()));
}
- private Map<TopicPartition, Long> readOffsetsCheckpoint() throws
IOException {
+ private Map<TopicPartition, Long> readOffsetsCheckpoint(final String
storeName) throws IOException {
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new
File(stateManager.baseDir(),
-
StateManagerUtil.CHECKPOINT_FILE_NAME));
+ StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName));
return offsetCheckpoint.read();
}
@@ -1168,6 +1175,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldFailOnDeserializationErrorsWhenReprocessing() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
setUpReprocessing();
initializeConsumer(0, 0, t1, t2, t3, t4);
initializeConsumer(2, 0, t5);
@@ -1180,6 +1188,7 @@ public class GlobalStateManagerImplTest {
public void shouldSkipOnDeserializationErrorsWhenReprocessing() {
stateManager.setDeserializationExceptionHandler(new
LogAndContinueExceptionHandler());
setUpReprocessing();
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
initializeConsumer(2, 0, t5);
stateManager.initialize();