This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5740a26525a KAFKA-19712: ProcessorStateManager delegates offset
tracking to stores (#21738)
5740a26525a is described below
commit 5740a26525a27df9eacd847a6d4ed6eb23fda0dc
Author: Nick Telford <[email protected]>
AuthorDate: Mon Mar 16 17:15:55 2026 +0000
KAFKA-19712: ProcessorStateManager delegates offset tracking to stores
(#21738)
As part of KIP-1035, we want to transition away from task-specific
`.checkpoint` files, and instead delegate offset management to
`StateStore`s.
We now have a `LegacyCheckpointingStateStore` wrapper that encapsulates
offset management for `StateStore` implementations that do not manage
their own offsets (i.e. those for which `managesOffsets()` returns
`false`).
As of KAFKA-20212, `RocksDBStore` now knows how to manage its own
offsets, so it will not be wrapped in a `LegacyCheckpointingStateStore`;
only user-defined persistent stores will use this wrapper.
Corresponding changes to `GlobalStateManagerImpl` will be submitted
independently, as KAFKA-20257.
Until both `ProcessorStateManager` and `GlobalStateManagerImpl` have
been updated, the `StateManager` interface must remain as-is. Therefore,
the `flush` and `checkpoint` methods will not be consolidated until a
later PR, which will clean up the interface and its usage by `Task` and
related classes. In the meantime, `ProcessorStateManager.checkpoint()`
is a no-op, and callers that previously invoked it call `flush()`
instead.
Reviewers: Bill Bejeck <[email protected]>
---------
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 2 +-
.../processor/internals/ProcessorStateManager.java | 238 +++++++++------------
.../processor/internals/StateDirectory.java | 13 +-
.../processor/internals/StateManagerUtil.java | 2 +-
.../streams/processor/internals/StreamTask.java | 9 -
.../processor/internals/ActiveTaskCreatorTest.java | 2 -
.../internals/ProcessorStateManagerTest.java | 201 ++++++++---------
.../processor/internals/StateManagerUtilTest.java | 2 +-
.../processor/internals/TaskManagerTest.java | 2 -
9 files changed, 193 insertions(+), 278 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 2d7f2b2e1fa..ca09905fdab 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -927,7 +927,7 @@ public class EosIntegrationTest {
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
task00.toString(),
- ".checkpoint"
+ ".checkpoint_" + stateStoreName
).toFile();
assertTrue(checkpointFile.exists());
final Map<TopicPartition, Long> checkpoints = new
OffsetCheckpoint(checkpointFile).read();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 86725805ce5..6a7eb8c25cf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.CachedStateStore;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
@@ -51,10 +51,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.String.format;
-import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
import static
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
-import static
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
/**
* ProcessorStateManager is the source of truth for the current offset for
each state store,
@@ -154,7 +152,12 @@ public class ProcessorStateManager implements StateManager
{
}
StateStore store() {
- return this.stateStore;
+ return
LegacyCheckpointingStateStore.maybeUnwrapStore(this.stateStore);
+ }
+
+ void markCorrupted() {
+ this.corrupted = true;
+ LegacyCheckpointingStateStore.maybeMarkCorrupted(this.stateStore);
}
@Override
@@ -178,7 +181,6 @@ public class ProcessorStateManager implements StateManager {
private final StateDirectory stateDirectory;
private final File baseDir;
- private final OffsetCheckpoint checkpointFile;
private TaskType taskType;
private Logger log;
@@ -211,7 +213,6 @@ public class ProcessorStateManager implements StateManager {
this.sourcePartitions = sourcePartitions;
this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
- this.checkpointFile = new
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
this.stateDirectory = stateDirectory;
log.debug("Created state store manager for task {}", taskId);
@@ -233,12 +234,26 @@ public class ProcessorStateManager implements
StateManager {
void registerStateStores(final List<StateStore> allStores, final
InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
+ final Map<TopicPartition, StateStore> storesToMigrate = new
HashMap<>(stores.size());
for (final StateStore store : allStores) {
if (!stores.containsKey(store.name())) {
- store.init(processorContext, store);
+ if (isLoggingEnabled(store.name())) {
+ final TopicPartition changelogPartition =
getStorePartition(store.name());
+ final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
+ store, eosEnabled, Set.of(changelogPartition),
stateDirectory, taskId, logPrefix);
+ maybeWrappedStore.init(processorContext,
maybeWrappedStore);
+ storesToMigrate.put(changelogPartition, maybeWrappedStore);
+ } else {
+ final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
+ store, eosEnabled, Set.of(), stateDirectory,
taskId, logPrefix);
+ maybeWrappedStore.init(processorContext,
maybeWrappedStore);
+ }
}
log.trace("Registered state store {}", store.name());
}
+
+ // migrate offsets from the legacy checkpoint file into the stores
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix,
stateDirectory, taskId, storesToMigrate);
}
void registerGlobalStateStores(final List<StateStore> stateStores) {
@@ -254,68 +269,52 @@ public class ProcessorStateManager implements
StateManager {
}
// package-private for test only
- void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
- try {
- final Map<TopicPartition, Long> loadedCheckpoints =
checkpointFile.read();
-
- log.trace("Loaded offsets from the checkpoint file: {}",
loadedCheckpoints);
-
- for (final StateStoreMetadata store : stores.values()) {
- if (store.corrupted) {
- log.error("Tried to initialize store offsets for corrupted
store {}", store);
- throw new IllegalStateException("Should not initialize
offsets for a corrupted task");
- }
+ void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+ for (final StateStoreMetadata store : stores.values()) {
+ if (store.corrupted) {
+ log.error("Tried to initialize store offsets for corrupted
store {}", store);
+ throw new ProcessorStateException(
+ "Error initializing offsets for store '" + store + "'",
+ new IllegalStateException("Should not initialize
offsets for a corrupted task")
+ );
+ }
- if (store.changelogPartition == null) {
- log.info("State store {} is not logged and hence would not
be restored", store.stateStore.name());
- } else if (!store.stateStore.persistent()) {
- log.info("Initializing to the starting offset for
changelog {} of in-memory state store {}",
- store.changelogPartition,
store.stateStore.name());
- } else if (store.offset() == null) {
- if
(loadedCheckpoints.containsKey(store.changelogPartition)) {
- final Long offset =
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
- store.setOffset(offset);
-
- log.info("State store {} initialized from checkpoint
with offset {} at changelog {}",
- store.stateStore.name(), store.offset,
store.changelogPartition);
- } else {
- // with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
- // and hence we are uncertain that the current local
state only contains committed data;
- // in that case we need to treat it as a
task-corrupted exception
- if (eosEnabled && !storeDirIsEmpty) {
- log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
+ if (store.changelogPartition == null) {
+ log.info("State store {} is not logged and hence would not be
restored", store.stateStore.name());
+ } else if (!store.stateStore.persistent()) {
+ log.info("Initializing to the starting offset for changelog {}
of in-memory state store {}",
+ store.changelogPartition, store.stateStore.name());
+ } else if (store.offset() == null) {
+ final Long offset =
store.stateStore.committedOffset(store.changelogPartition);
+
+ if (offset != null) {
+ store.setOffset(offset);
+ log.info("State store {} initialized from checkpoint with
offset {} at changelog {}",
+ store.stateStore.name(), store.offset,
store.changelogPartition);
+ } else {
+ // with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
+ // and hence we are uncertain that the current local state
only contains committed data;
+ // in that case we need to treat it as a task-corrupted
exception
+ if (eosEnabled && !storeDirIsEmpty) {
+ log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
"since under EOS it has the risk of getting
uncommitted data in stores we have to " +
"treat it as a task corruption error and wipe
out the local state of task {} " +
"before re-bootstrapping",
store.stateStore.name(), taskId);
- throw new
TaskCorruptedException(Collections.singleton(taskId));
- } else {
- log.info("State store {} did not find checkpoint
offset, hence would " +
- "default to the starting offset at changelog
{}",
+ throw new
TaskCorruptedException(Collections.singleton(taskId));
+ } else {
+ log.info("State store {} did not find checkpoint
offset, hence would " +
+ "default to the starting offset at
changelog {}",
store.stateStore.name(),
store.changelogPartition);
- }
}
- } else {
- loadedCheckpoints.remove(store.changelogPartition);
- log.debug("Skipping re-initialization of offset from
checkpoint for recycled store {}",
- store.stateStore.name());
}
}
+ }
+ try {
stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
-
- if (!loadedCheckpoints.isEmpty()) {
- log.warn("Some loaded checkpoint offsets cannot find their
corresponding state stores: {}", loadedCheckpoints);
- }
-
- if (eosEnabled) {
- checkpointFile.delete();
- }
- } catch (final TaskCorruptedException e) {
- throw e;
- } catch (final IOException | RuntimeException e) {
- // both IOException or runtime exception like number parsing can
throw
- throw new ProcessorStateException(format("%sError loading and
deleting checkpoint file when creating the state manager",
+ } catch (final RuntimeException e) {
+ throw new ProcessorStateException(format("%sError updating state
directory offsets when creating the state manager",
logPrefix), e);
}
}
@@ -333,7 +332,7 @@ public class ProcessorStateManager implements StateManager {
// TODO (KAFKA-12887): we should not trigger user's exception handler
for illegal-argument but always
// fail-crash; in this case we would not need to immediately close the
state store before throwing
- if (CHECKPOINT_FILE_NAME.equals(storeName)) {
+ if
(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME.startsWith(storeName)) {
store.close();
throw new IllegalArgumentException(format("%sIllegal store name:
%s, which collides with the pre-defined " +
"checkpoint file name", logPrefix, storeName));
@@ -368,7 +367,7 @@ public class ProcessorStateManager implements StateManager {
@Override
public StateStore store(final String name) {
if (stores.containsKey(name)) {
- return stores.get(name).stateStore;
+ return stores.get(name).store();
} else {
return null;
}
@@ -382,7 +381,7 @@ public class ProcessorStateManager implements StateManager {
final Collection<TopicPartition> partitionsToMarkAsCorrupted = new
LinkedList<>(partitions);
for (final StateStoreMetadata storeMetadata : stores.values()) {
if
(partitionsToMarkAsCorrupted.contains(storeMetadata.changelogPartition)) {
- storeMetadata.corrupted = true;
+ storeMetadata.markCorrupted();
partitionsToMarkAsCorrupted.remove(storeMetadata.changelogPartition);
}
}
@@ -488,26 +487,44 @@ public class ProcessorStateManager implements
StateManager {
if (!stores.isEmpty()) {
log.debug("Committing all stores registered in the state manager:
{}", stores);
for (final StateStoreMetadata metadata : stores.values()) {
- final StateStore store = metadata.stateStore;
- log.trace("Committing store {}", store.name());
- try {
- store.commit(Map.of());
- } catch (final RuntimeException exception) {
- if (firstException == null) {
- // do NOT wrap the error if it is actually caused by
Streams itself
- // In case of FailedProcessingException Do not keep
the failed processing exception in the stack trace
- if (exception instanceof FailedProcessingException)
- firstException = new ProcessorStateException(
- format("%sFailed to commit state store %s",
logPrefix, store.name()),
- exception.getCause());
- else if (exception instanceof StreamsException)
- firstException = exception;
- else
- firstException = new ProcessorStateException(
- format("%sFailed to commit state store %s",
logPrefix, store.name()), exception);
- log.error("Failed to commit state store {}: ",
store.name(), firstException);
- } else {
- log.error("Failed to commit state store {}: ",
store.name(), exception);
+ if (!metadata.corrupted) {
+ final StateStore store = metadata.stateStore;
+ log.trace("Committing store {}", store.name());
+ try {
+ if (metadata.changelogPartition == null ||
metadata.offset == null || !store.persistent()) {
+ store.commit(Map.of());
+ } else {
+ store.commit(Map.of(metadata.changelogPartition,
metadata.offset));
+ }
+
+ if (metadata.commitCallback != null) {
+ try {
+ metadata.commitCallback.onCommit();
+ } catch (final IOException e) {
+ throw new ProcessorStateException(
+ format("%sException caught while
trying to checkpoint store, " +
+ "changelog partition %s",
logPrefix, metadata.changelogPartition),
+ e
+ );
+ }
+ }
+ } catch (final RuntimeException exception) {
+ if (firstException == null) {
+ // do NOT wrap the error if it is actually caused
by Streams itself
+ // In case of FailedProcessingException Do not
keep the failed processing exception in the stack trace
+ if (exception instanceof FailedProcessingException)
+ firstException = new ProcessorStateException(
+ format("%sFailed to commit state store
%s", logPrefix, store.name()),
+ exception.getCause());
+ else if (exception instanceof StreamsException)
+ firstException = exception;
+ else
+ firstException = new ProcessorStateException(
+ format("%sFailed to commit state store
%s", logPrefix, store.name()), exception);
+ log.error("Failed to commit state store {}: ",
store.name(), firstException);
+ } else {
+ log.error("Failed to commit state store {}: ",
store.name(), exception);
+ }
}
}
}
@@ -518,6 +535,10 @@ public class ProcessorStateManager implements StateManager
{
}
}
+ @Override
+ public void checkpoint() {
+ }
+
public void flushCache() {
RuntimeException firstException = null;
// attempting to flush the stores
@@ -658,45 +679,6 @@ public class ProcessorStateManager implements StateManager
{
stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
}
- @Override
- public void checkpoint() {
- // checkpoint those stores that are only logged and persistent to the
checkpoint file
- final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
- for (final StateStoreMetadata storeMetadata : stores.values()) {
- if (storeMetadata.commitCallback != null &&
!storeMetadata.corrupted) {
- try {
- storeMetadata.commitCallback.onCommit();
- } catch (final IOException e) {
- throw new ProcessorStateException(
- format("%sException caught while trying to
checkpoint store, " +
- "changelog partition %s", logPrefix,
storeMetadata.changelogPartition),
- e
- );
- }
- }
-
- // store is logged, persistent, not corrupted, and has a valid
current offset
- if (storeMetadata.changelogPartition != null &&
- storeMetadata.stateStore.persistent() &&
- !storeMetadata.corrupted) {
-
- final long checkpointableOffset =
checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
- checkpointingOffsets.put(storeMetadata.changelogPartition,
checkpointableOffset);
- }
- }
-
- log.debug("Writing checkpoint: {} for task {}", checkpointingOffsets,
taskId);
- try {
- checkpointFile.write(checkpointingOffsets);
- } catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to [{}]." +
- " This may occur if OS cleaned the state.dir in case when it
located in ${java.io.tmpdir} directory." +
- " This may also occur due to running multiple instances on the
same machine using the same state dir." +
- " Changing the location of state.dir may resolve the problem.",
- checkpointFile, e);
- }
- }
-
private TopicPartition getStorePartition(final String storeName) {
// NOTE we assume the partition of the topic can always be inferred
from the task id;
// if user ever use a custom partition grouper (deprecated in KIP-528)
this would break and
@@ -723,16 +705,6 @@ public class ProcessorStateManager implements StateManager
{
return found.isEmpty() ? null : found.get(0);
}
- // Pass in a sentinel value to checkpoint when the changelog offset is not
yet initialized/known
- private long checkpointableOffsetFromChangelogOffset(final Long offset) {
- return offset != null ? offset : OFFSET_UNKNOWN;
- }
-
- // Convert the written offsets in the checkpoint file back to the
changelog offset
- private Long changelogOffsetFromCheckpointedOffset(final long offset) {
- return offset != OFFSET_UNKNOWN ? offset : null;
- }
-
public TopicPartition registeredChangelogPartitionFor(final String
storeName) {
final StateStoreMetadata storeMetadata = stores.get(storeName);
if (storeMetadata == null) {
@@ -753,10 +725,4 @@ public class ProcessorStateManager implements StateManager
{
public String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
-
- public void deleteCheckPointFileIfEOSEnabled() throws IOException {
- if (eosEnabled) {
- checkpointFile.delete();
- }
- }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index f66f177c0ef..906678cd350 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -77,7 +78,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
/**
@@ -263,7 +263,7 @@ public class StateDirectory implements AutoCloseable {
try {
// We only handle TaskCorruptedException at this
point. Any other exception is considered fatal.
StateManagerUtil.registerStateStores(log,
threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
- temporaryStateManager.checkpoint();
+ temporaryStateManager.flush();
} catch (final TaskCorruptedException tce) {
// At this point, we only log a warning and continue
with the startup store initialization.
// The task-corrupted exception will be handled in the
first Task assignment phase.
@@ -426,13 +426,6 @@ public class StateDirectory implements AutoCloseable {
return "__" + topologyName + "__";
}
- /**
- * @return The File handle for the checkpoint in the given task's directory
- */
- File checkpointFileFor(final TaskId taskId) {
- return new File(getOrCreateDirectoryForTask(taskId),
StateManagerUtil.CHECKPOINT_FILE_NAME);
- }
-
/**
* Decide if the directory of the task is empty or not
*/
@@ -444,7 +437,7 @@ public class StateDirectory implements AutoCloseable {
private boolean taskDirIsEmpty(final File taskDir) {
final File[] storeDirs = taskDir.listFiles(pathname ->
- !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+
!pathname.getName().startsWith(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME));
boolean taskDirEmpty = true;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 3d86b5ed15b..6a02fba3cde 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -147,7 +147,7 @@ final class StateManagerUtil {
// We should only load checkpoint AFTER the corresponding state
directory lock has been acquired and
// the state stores have been registered; we should not try to load at
the state manager construction time.
// See https://issues.apache.org/jira/browse/KAFKA-8574
- stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
+ stateMgr.initializeStoreOffsets(storeDirsEmpty);
log.debug("Initialized state stores");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 85c322f2b5f..6547d8c41eb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -51,7 +51,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
-import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
@@ -404,14 +403,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// just transit the state without any logical changes:
suspended and restoring states
// are not actually any different for inner modules
- // Deleting checkpoint file before transition to RESTORING
state (KAFKA-10362)
- try {
- stateMgr.deleteCheckPointFileIfEOSEnabled();
- log.debug("Deleted check point file upon resuming with EOS
enabled");
- } catch (final IOException ioe) {
- log.error("Encountered error while deleting the checkpoint
file due to this exception", ioe);
- }
-
transitionTo(State.RESTORING);
log.info("Resumed to restoring state");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index bc9449cebce..11f857820da 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -257,9 +257,7 @@ public class ActiveTaskCreatorTest {
when(builder.buildSubtopology(0)).thenReturn(topology);
when(topology.sinkTopics()).thenReturn(emptySet());
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
-
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
-
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
when(topology.source("topic")).thenReturn(sourceNode);
when(sourceNode.timestampExtractor()).thenReturn(mock(TimestampExtractor.class));
when(topology.sources()).thenReturn(Collections.singleton(sourceNode));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 110b1446fc1..02e45d313ea 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateS
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.test.MockCachedKeyValueStore;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@@ -86,6 +88,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -382,6 +385,36 @@ public class ProcessorStateManagerTest {
}
}
+ @Test
+ public void shouldRegisterPersistentUnloggedStore() {
+ final String unloggedStoreName = "unloggedPersistentStore";
+ final MockKeyValueStore unloggedStore = new
MockKeyValueStore(unloggedStoreName, true);
+
+ // Create a state manager with no changelog map entries for the store
— it is persistent
+ // but unlogged (no changelog partition). Registration must not throw
NullPointerException
+ // from Set.of(null) when changelogPartition is absent.
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ Task.TaskType.ACTIVE,
+ false,
+ logContext,
+ stateDirectory,
+ emptyMap(),
+ emptySet()
+ );
+ contextRegistersStateStore(stateMgr);
+
+ try {
+ stateMgr.registerStateStores(singletonList(unloggedStore),
context);
+
+ assertTrue(unloggedStore.initialized);
+ assertThat(stateMgr.store(unloggedStoreName), is(unloggedStore));
+ assertTrue(stateMgr.changelogPartitions().isEmpty());
+ } finally {
+ stateMgr.close();
+ }
+ }
+
@Test
public void shouldInitializeOffsetsFromCheckpointFile() throws IOException
{
final long checkpointOffset = 10L;
@@ -394,14 +427,12 @@ public class ProcessorStateManagerTest {
checkpoint.write(offsets);
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
+ contextRegistersStateStore(stateMgr);
try {
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.registerStore(persistentStoreTwo,
persistentStoreTwo.stateRestoreCallback, null);
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
persistentStoreTwo, nonPersistentStore), context);
+ stateMgr.initializeStoreOffsets(true);
- assertTrue(checkpointFile.exists());
assertEquals(Set.of(
persistentStorePartition,
persistentStoreTwoPartition,
@@ -435,12 +466,11 @@ public class ProcessorStateManagerTest {
checkpoint.write(offsets);
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ contextRegistersStateStore(stateMgr);
try {
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.registerStore(persistentStoreTwo,
persistentStoreTwo.stateRestoreCallback, null);
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
persistentStoreTwo, nonPersistentStore), context);
+ stateMgr.initializeStoreOffsets(true);
assertFalse(checkpointFile.exists());
assertEquals(Set.of(
@@ -518,22 +548,24 @@ public class ProcessorStateManagerTest {
}
@Test
- public void shouldFlushCheckpointAndClose() throws IOException {
+ public void shouldCommitAndCloseLegacyStores() throws IOException {
checkpoint.write(emptyMap());
+ final File storeCheckpointFile = new
File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME +
"_" + persistentStore.name());
+
// set up ack'ed offsets
final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
- ackedOffsets.put(persistentStorePartition, 123L);
+ ackedOffsets.put(persistentStorePartition, 25_000L);
ackedOffsets.put(nonPersistentStorePartition, 456L);
ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
+ contextRegistersStateStore(stateMgr);
try {
// make sure the checkpoint file is not written yet
- assertFalse(checkpointFile.exists());
+ assertFalse(storeCheckpointFile.exists());
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
} finally {
stateMgr.flush();
@@ -544,13 +576,14 @@ public class ProcessorStateManagerTest {
assertThat(persistentStore.getLastCommitCount(),
Matchers.lessThan(nonPersistentStore.getLastCommitCount()));
stateMgr.updateChangelogOffsets(ackedOffsets);
- stateMgr.checkpoint();
+ stateMgr.flush();
- assertTrue(checkpointFile.exists());
+ assertTrue(storeCheckpointFile.exists());
// the checkpoint file should contain an offset from the
persistent store only.
- final Map<TopicPartition, Long> checkpointedOffsets =
checkpoint.read();
- assertThat(checkpointedOffsets, is(singletonMap(new
TopicPartition(persistentStoreTopicName, 1), 123L)));
+ final OffsetCheckpoint storeCheckpoint = new
OffsetCheckpoint(storeCheckpointFile);
+ final Map<TopicPartition, Long> checkpointedOffsets =
storeCheckpoint.read();
+ assertThat(checkpointedOffsets, is(singletonMap(new
TopicPartition(persistentStoreTopicName, 1), 25_000L)));
stateMgr.close();
@@ -565,9 +598,10 @@ public class ProcessorStateManagerTest {
checkpoint.write(offsets);
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
+ contextRegistersStateStore(stateMgr);
try {
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+ stateMgr.registerStateStores(singletonList(persistentStore),
context);
+ stateMgr.initializeStoreOffsets(true);
final StateStoreMetadata storeMetadata =
stateMgr.storeMetadata(persistentStorePartition);
assertThat(storeMetadata, notNullValue());
@@ -582,7 +616,7 @@ public class ProcessorStateManagerTest {
mkEntry(persistentStorePartition, 220L),
mkEntry(irrelevantPartition, 9000L)
));
- stateMgr.checkpoint();
+ stateMgr.flush();
assertThat(stateMgr.storeMetadata(irrelevantPartition),
equalTo(null));
assertThat(storeMetadata.offset(), equalTo(220L));
@@ -591,49 +625,6 @@ public class ProcessorStateManagerTest {
}
}
- @Test
- public void shouldWriteCheckpointForPersistentStore() throws IOException {
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
-
- try {
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
-
- final StateStoreMetadata storeMetadata =
stateMgr.storeMetadata(persistentStorePartition);
- assertThat(storeMetadata, notNullValue());
-
- stateMgr.restore(storeMetadata, singletonList(consumerRecord),
OptionalLong.of(2L));
-
- stateMgr.checkpoint();
-
- final Map<TopicPartition, Long> read = checkpoint.read();
- assertThat(read, equalTo(singletonMap(persistentStorePartition,
100L)));
- } finally {
- stateMgr.close();
- }
- }
-
- @Test
- public void shouldNotWriteCheckpointForNonPersistentStore() throws
IOException {
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
-
- try {
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
-
- final StateStoreMetadata storeMetadata =
stateMgr.storeMetadata(nonPersistentStorePartition);
- assertThat(storeMetadata, notNullValue());
-
-
stateMgr.updateChangelogOffsets(singletonMap(nonPersistentStorePartition,
876L));
- stateMgr.checkpoint();
-
- final Map<TopicPartition, Long> read = checkpoint.read();
- assertThat(read, equalTo(emptyMap()));
- } finally {
- stateMgr.close();
- }
- }
-
@Test
public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic()
throws IOException {
final ProcessorStateManager stateMgr = new ProcessorStateManager(
@@ -649,7 +640,7 @@ public class ProcessorStateManagerTest {
stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 987L));
- stateMgr.checkpoint();
+ stateMgr.flush();
final Map<TopicPartition, Long> read = checkpoint.read();
assertThat(read, equalTo(emptyMap()));
@@ -809,18 +800,19 @@ public class ProcessorStateManagerTest {
@Test
public void shouldLogAWarningIfCheckpointThrowsAnIOException() {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
+ contextRegistersStateStore(stateMgr);
+
stateMgr.registerStateStores(Collections.singletonList(persistentStore),
context);
stateDirectory.clean();
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(ProcessorStateManager.class)) {
-
stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 10L));
- stateMgr.checkpoint();
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) {
+
stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition,
25_000L));
+ stateMgr.flush();
boolean foundExpectedLogMessage = false;
for (final LogCaptureAppender.Event event : appender.getEvents()) {
if ("WARN".equals(event.getLevel())
&&
event.getMessage().startsWith("process-state-manager-test Failed to write
offset checkpoint file to [")
- && event.getMessage().endsWith(".checkpoint]." +
+ && event.getMessage().endsWith(".checkpoint_" +
persistentStoreName + "]." +
" This may occur if OS cleaned the state.dir in case
when it located in ${java.io.tmpdir} directory." +
" This may also occur due to running multiple
instances on the same machine using the same state dir." +
" Changing the location of state.dir may resolve the
problem.")
@@ -838,7 +830,6 @@ public class ProcessorStateManagerTest {
public void shouldThrowIfLoadCheckpointThrows() throws Exception {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
final File file = new File(stateMgr.baseDir(), CHECKPOINT_FILE_NAME);
Files.createFile(file.toPath());
final FileWriter writer = new FileWriter(file);
@@ -846,7 +837,8 @@ public class ProcessorStateManagerTest {
writer.close();
try {
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+
stateMgr.registerStateStores(Collections.singletonList(persistentStore),
context);
+ stateMgr.initializeStoreOffsets(true);
fail("should have thrown processor state exception when IO
exception happens");
} catch (final ProcessorStateException e) {
// pass
@@ -950,7 +942,7 @@ public class ProcessorStateManagerTest {
stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
final TaskCorruptedException exception =
assertThrows(TaskCorruptedException.class,
- () -> stateMgr.initializeStoreOffsetsFromCheckpoint(false));
+ () -> stateMgr.initializeStoreOffsets(false));
assertEquals(
Collections.singleton(taskId),
@@ -972,12 +964,12 @@ public class ProcessorStateManagerTest {
checkpoint.write(offsets);
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ contextRegistersStateStore(stateMgr);
try {
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
- stateMgr.initializeStoreOffsetsFromCheckpoint(false);
+ stateMgr.initializeStoreOffsets(false);
} finally {
stateMgr.close();
}
@@ -986,11 +978,11 @@ public class ProcessorStateManagerTest {
@Test
public void shouldNotThrowTaskCorruptedExceptionAfterCheckpointing() {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ contextRegistersStateStore(stateMgr);
try {
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
- stateMgr.initializeStoreOffsetsFromCheckpoint(true);
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
+ stateMgr.initializeStoreOffsets(true);
assertThat(stateMgr.storeMetadata(nonPersistentStorePartition),
notNullValue());
assertThat(stateMgr.storeMetadata(persistentStorePartition),
notNullValue());
@@ -999,18 +991,18 @@ public class ProcessorStateManagerTest {
mkEntry(nonPersistentStorePartition, 876L),
mkEntry(persistentStorePartition, 666L))
);
- stateMgr.checkpoint();
+ stateMgr.flush();
// reset the state and offsets, for example as in a corrupted task
stateMgr.close();
assertNull(stateMgr.storeMetadata(nonPersistentStorePartition));
assertNull(stateMgr.storeMetadata(persistentStorePartition));
- stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
- stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback, null);
+ contextRegistersStateStore(stateMgr);
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
// This should not throw a TaskCorruptedException!
- stateMgr.initializeStoreOffsetsFromCheckpoint(false);
+ stateMgr.initializeStoreOffsets(false);
assertThat(stateMgr.storeMetadata(nonPersistentStorePartition),
notNullValue());
assertThat(stateMgr.storeMetadata(persistentStorePartition),
notNullValue());
} finally {
@@ -1026,7 +1018,7 @@ public class ProcessorStateManagerTest {
stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
stateMgr.markChangelogAsCorrupted(Set.of(persistentStorePartition));
- final ProcessorStateException thrown =
assertThrows(ProcessorStateException.class, () ->
stateMgr.initializeStoreOffsetsFromCheckpoint(true));
+ final ProcessorStateException thrown =
assertThrows(ProcessorStateException.class, () ->
stateMgr.initializeStoreOffsets(true));
assertInstanceOf(IllegalStateException.class, thrown.getCause());
} finally {
stateMgr.close();
@@ -1040,36 +1032,6 @@ public class ProcessorStateManagerTest {
stateMgr.close();
}
- @Test
- public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
- final long checkpointOffset = 10L;
- final Map<TopicPartition, Long> offsets = mkMap(
- mkEntry(persistentStorePartition, checkpointOffset),
- mkEntry(nonPersistentStorePartition, checkpointOffset),
- mkEntry(irrelevantPartition, 999L)
- );
- checkpoint.write(offsets);
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
- stateMgr.deleteCheckPointFileIfEOSEnabled();
- stateMgr.close();
- assertFalse(checkpointFile.exists());
- }
-
- @Test
- public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws
IOException {
- final long checkpointOffset = 10L;
- final Map<TopicPartition, Long> offsets = mkMap(
- mkEntry(persistentStorePartition, checkpointOffset),
- mkEntry(nonPersistentStorePartition, checkpointOffset),
- mkEntry(irrelevantPartition, 999L)
- );
- checkpoint.write(offsets);
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, false);
- stateMgr.deleteCheckPointFileIfEOSEnabled();
- stateMgr.close();
- assertTrue(checkpointFile.exists());
- }
-
@Test
public void shouldWritePositionCheckpointFile() throws IOException {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
@@ -1088,7 +1050,7 @@ public class ProcessorStateManagerTest {
assertFalse(persistentCheckpoint.getFile().exists());
- stateMgr.checkpoint();
+ stateMgr.flush();
assertTrue(persistentCheckpoint.getFile().exists());
@@ -1124,7 +1086,7 @@ public class ProcessorStateManagerTest {
final ProcessorStateException processorStateException = assertThrows(
ProcessorStateException.class,
- stateMgr::checkpoint
+ stateMgr::flush
);
assertThat(
@@ -1209,6 +1171,13 @@ public class ProcessorStateManagerTest {
return getStateManager(taskType, false);
}
+ private void contextRegistersStateStore(final StateManager stateManager) {
+ Mockito.doAnswer(a -> {
+ stateManager.registerStore(a.getArgument(0), a.getArgument(1), ()
-> { });
+ return null;
+ }).when(context).register(any(), any());
+ }
+
private MockKeyValueStore getConverterStore() {
return new ConverterStore(persistentStoreName, true);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 1ee6594d119..98ec0f1a64b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -110,7 +110,7 @@ public class StateManagerUtilTest {
topology, stateManager, stateDirectory, processorContext);
inOrder.verify(stateManager).registerStateStores(stateStores,
processorContext);
-
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ inOrder.verify(stateManager).initializeStoreOffsets(true);
verifyNoMoreInteractions(stateManager);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8f9d228c273..cc3cd12b918 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -117,7 +117,6 @@ import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -4992,7 +4991,6 @@ public class TaskManagerTest {
final Path checkpointFilePath = checkpointFile.toPath();
Files.createFile(checkpointFilePath);
new OffsetCheckpoint(checkpointFile).write(offsets);
-
lenient().when(stateDirectory.checkpointFileFor(task)).thenReturn(checkpointFile);
expectDirectoryNotEmpty(task);
}