This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 599d55cdb72 KAFKA-19710: Write legacy .checkpoint on close for
downgrade support (#21814)
599d55cdb72 is described below
commit 599d55cdb7298eeddc81f89af626fc9110bc194b
Author: Nick Telford <[email protected]>
AuthorDate: Thu Mar 19 03:02:27 2026 +0000
KAFKA-19710: Write legacy .checkpoint on close for downgrade support
(#21814)
Kafka Streams 4.3 moved offset management from per-task `.checkpoint`
files into RocksDB column families (KAFKA-17411/KAFKA-19712). An upgrade
path exists (`migrateLegacyOffsets()`) to migrate offsets from old
`.checkpoint` files into the new system, but there is no downgrade path:
if a user rolls back to a pre-4.3 version, the old `.checkpoint` files
no longer exist, so the older version cannot find its offsets.
This change adds downgrade support: when `upgrade.from` is set to a
version older than 4.3, a consolidated per-task `.checkpoint` file is
written during close so that an older Kafka Streams version can find its
offsets. The conversion uses `OFFSET_UNKNOWN` for null offsets, matching
the legacy checkpoint format. The downgrade checkpoint is written from
`ProcessorStateManager.close()` for regular tasks and
`GlobalStateManagerImpl.close()` for global stores.
Reviewers: Bill Bejeck <[email protected]>
---
.../processor/internals/ActiveTaskCreator.java | 7 +-
.../internals/GlobalStateManagerImpl.java | 6 ++
.../processor/internals/ProcessorStateManager.java | 32 ++++++++-
.../processor/internals/StandbyTaskCreator.java | 7 +-
.../internals/LegacyCheckpointingStateStore.java | 47 ++++++++++++-
.../internals/GlobalStateManagerImplTest.java | 55 +++++++++++++++
.../internals/ProcessorStateManagerTest.java | 77 +++++++++++++++++---
.../LegacyCheckpointingStateStoreTest.java | 82 ++++++++++++++++++++++
8 files changed, 298 insertions(+), 15 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index ac8ec93a9c3..1d5b7fbf7ed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -135,6 +136,9 @@ class ActiveTaskCreator {
final Map<TaskId,
Set<TopicPartition>> tasksToBeCreated) {
final List<StreamTask> createdTasks = new ArrayList<>();
+ final String upgradeFromStr =
applicationConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+ final UpgradeFromValues upgradeFrom = upgradeFromStr != null ?
UpgradeFromValues.fromString(upgradeFromStr) : null;
+
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions
: tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
final LogContext logContext = getLogContext(taskId);
@@ -148,7 +152,8 @@ class ActiveTaskCreator {
logContext,
stateDirectory,
topology.storeToChangelogTopic(),
- partitions);
+ partitions,
+ upgradeFrom);
final InternalProcessorContext<Object, Object> context = new
ProcessorContextImpl(
taskId,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 5b08d9ae53e..30146fbb7be 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -117,6 +118,7 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
private final FixedOrderMap<String, Optional<StateStore>> globalStores =
new FixedOrderMap<>();
private final Map<String, StateStoreMetadata> storeMetadata = new
HashMap<>();
private final boolean eosEnabled;
+ private final UpgradeFromValues upgradeFrom;
private InternalProcessorContext<?, ?> globalProcessorContext;
private DeserializationExceptionHandler deserializationExceptionHandler;
private ProcessingExceptionHandler processingExceptionHandler;
@@ -161,6 +163,8 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
final boolean globalEnabled =
config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
processingExceptionHandler = globalEnabled ?
config.processingExceptionHandler() : null;
eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final String upgradeFromStr =
config.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+ upgradeFrom = upgradeFromStr != null ?
UpgradeFromValues.fromString(upgradeFromStr) : null;
}
@Override
@@ -625,6 +629,8 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
}
}
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(logPrefix,
upgradeFrom, stateDirectory, null, currentOffsets);
+
if (closeFailed.length() > 0) {
throw new ProcessorStateException("Exceptions caught during close
of 1 or more global state globalStores\n" + closeFailed);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 645a56a5463..db033320795 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -181,6 +182,7 @@ public class ProcessorStateManager implements StateManager {
private final StateDirectory stateDirectory;
private final File baseDir;
+ private final UpgradeFromValues upgradeFrom;
private TaskType taskType;
private Logger log;
@@ -203,7 +205,8 @@ public class ProcessorStateManager implements StateManager {
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String>
storeToChangelogTopic,
- final Collection<TopicPartition>
sourcePartitions) throws ProcessorStateException {
+ final Collection<TopicPartition>
sourcePartitions,
+ final UpgradeFromValues upgradeFrom) throws
ProcessorStateException {
this.storeToChangelogTopic = storeToChangelogTopic;
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
@@ -211,6 +214,7 @@ public class ProcessorStateManager implements StateManager {
this.taskType = taskType;
this.eosEnabled = eosEnabled;
this.sourcePartitions = sourcePartitions;
+ this.upgradeFrom = upgradeFrom;
this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
this.stateDirectory = stateDirectory;
@@ -218,6 +222,21 @@ public class ProcessorStateManager implements StateManager
{
log.debug("Created state store manager for task {}", taskId);
}
+ /**
+ * Convenience constructor that defaults {@code upgradeFrom} to {@code
null}.
+ *
+ * @throws ProcessorStateException if the task directory does not exist
and could not be created
+ */
+ public ProcessorStateManager(final TaskId taskId,
+ final TaskType taskType,
+ final boolean eosEnabled,
+ final LogContext logContext,
+ final StateDirectory stateDirectory,
+ final Map<String, String>
storeToChangelogTopic,
+ final Collection<TopicPartition>
sourcePartitions) throws ProcessorStateException {
+ this(taskId, taskType, eosEnabled, logContext, stateDirectory,
storeToChangelogTopic, sourcePartitions, null);
+ }
+
/**
* Special constructor used by {@link StateDirectory} to partially
initialize startup tasks for local state, before
* they're assigned to a thread. When the task is assigned to a thread,
the initialization of this StateManager is
@@ -588,12 +607,14 @@ public class ProcessorStateManager implements
StateManager {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state
stores: {}", stores);
+ final Map<TopicPartition, Long> allOffsets = new HashMap<>();
RuntimeException firstException = null;
// attempting to close the stores, just in case they
// are not closed by a ProcessorNode yet
if (!stores.isEmpty()) {
for (final Map.Entry<String, StateStoreMetadata> entry :
stores.entrySet()) {
- final StateStore store = entry.getValue().stateStore;
+ final StateStoreMetadata metadata = entry.getValue();
+ final StateStore store = metadata.stateStore;
log.trace("Closing store {}", store.name());
try {
store.close();
@@ -615,11 +636,18 @@ public class ProcessorStateManager implements
StateManager {
log.error("Failed to close state store {}: ",
store.name(), exception);
}
}
+
+ // collect offsets for potential downgrade checkpoint
+ if (metadata.changelogPartition != null &&
!metadata.corrupted) {
+ allOffsets.put(metadata.changelogPartition,
metadata.offset);
+ }
}
stores.clear();
}
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(logPrefix,
upgradeFrom, stateDirectory, taskId, allOffsets);
+
if (firstException != null) {
throw firstException;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index f04aec38f46..139efbd63de 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -68,6 +69,9 @@ class StandbyTaskCreator {
Collection<StandbyTask> createTasks(final Map<TaskId, Set<TopicPartition>>
tasksToBeCreated) {
final List<StandbyTask> createdTasks = new ArrayList<>();
+ final String upgradeFromStr =
applicationConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+ final UpgradeFromValues upgradeFrom = upgradeFromStr != null ?
UpgradeFromValues.fromString(upgradeFromStr) : null;
+
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions
: tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
final Set<TopicPartition> partitions =
newTaskAndPartitions.getValue();
@@ -81,7 +85,8 @@ class StandbyTaskCreator {
getLogContext(taskId),
stateDirectory,
topology.storeToChangelogTopic(),
- partitions);
+ partitions,
+ upgradeFrom);
final InternalProcessorContext<?, ?> context = new
ProcessorContextImpl(
taskId,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
index bc059f03ffd..fbf99c200a1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
@@ -43,7 +44,6 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
private final boolean eosEnabled;
private final Set<TopicPartition> changelogPartitions;
- private final StateDirectory stateDirectory;
private final TaskId taskId;
private final OffsetCheckpoint checkpointFile;
private final String logPrefix;
@@ -77,6 +77,45 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
: store;
}
+ /**
+ * Writes a consolidated per-task {@code .checkpoint} file for downgrade
support.
+ *
+ * When {@code upgradeFrom} is set to a version older than 4.3, this
method writes the offsets into the legacy
+ * per-task checkpoint file so that an older Kafka Streams version can
find its offsets after a downgrade.
+ *
+ * This is a no-op if {@code upgradeFrom} is {@code null} or refers to
version 4.3 or later.
+ *
+ * @param logPrefix Log prefix to use for log messages.
+ * @param upgradeFrom The configured {@code upgrade.from} value, or {@code
null} if not set.
+ * @param stateDirectory The singleton {@link StateDirectory} used for
looking up state directories.
+ * @param taskId Either the task ID for regular stores, or {@code null}
for global stores.
+ * @param offsets The offsets to write to the checkpoint file. Entries
with {@code null} values are written as
+ * {@link OffsetCheckpoint#OFFSET_UNKNOWN}.
+ */
+ public static void maybeDowngradeOffsets(final String logPrefix,
+ final UpgradeFromValues
upgradeFrom,
+ final StateDirectory
stateDirectory,
+ final TaskId taskId,
+ final Map<TopicPartition, Long>
offsets) {
+ if (upgradeFrom == null || upgradeFrom.ordinal() >
UpgradeFromValues.UPGRADE_FROM_42.ordinal()) {
+ return;
+ }
+
+ final Map<TopicPartition, Long> checkpointableOffsets = new
HashMap<>();
+ for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
{
+ checkpointableOffsets.put(entry.getKey(),
checkpointableOffsetFromChangelogOffset(entry.getValue()));
+ }
+
+ final File legacyCheckpointFile = checkpointFileFor(stateDirectory,
taskId, null);
+ final OffsetCheckpoint checkpoint = new
OffsetCheckpoint(legacyCheckpointFile);
+ try {
+ log.debug("{}Writing downgrade checkpoint file for task {} with
offsets {}", logPrefix, taskId, checkpointableOffsets);
+ checkpoint.write(checkpointableOffsets);
+ } catch (final IOException e) {
+ log.warn("{}Failed to write downgrade checkpoint file for task
{}", logPrefix, taskId, e);
+ }
+ }
+
public static void maybeMarkCorrupted(final StateStore store) {
if (store instanceof LegacyCheckpointingStateStore<?, ?, ?>) {
((LegacyCheckpointingStateStore<?, ?, ?>) store).markAsCorrupted();
@@ -161,7 +200,6 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
super(wrapped);
this.eosEnabled = eosEnabled;
this.changelogPartitions = changelogPartitions;
- this.stateDirectory = stateDirectory;
this.taskId = taskId;
this.checkpointFile = new
OffsetCheckpoint(checkpointFileFor(stateDirectory, taskId, this));
this.logPrefix = logPrefix;
@@ -304,6 +342,11 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
}
+ // Convert a changelog offset to the value written in the checkpoint file
+ private static long checkpointableOffsetFromChangelogOffset(final Long
offset) {
+ return offset != null ? offset : OFFSET_UNKNOWN;
+ }
+
// Convert the written offsets in the checkpoint file back to the
changelog offset
private static Long changelogOffsetFromCheckpointedOffset(final long
offset) {
return offset != OFFSET_UNKNOWN ? offset : null;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 700c42d7c9b..419fdbf1562 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
@@ -1197,6 +1198,60 @@ public class GlobalStateManagerImplTest {
assertEquals(0, stateRestoreCallback.restored.size());
}
+ @Test
+ public void shouldWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsPre43()
throws IOException {
+ final Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+ props.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+ props.put(StreamsConfig.UPGRADE_FROM_CONFIG,
UpgradeFromValues.UPGRADE_FROM_42.toString());
+ final StreamsConfig downgradeConfig = new StreamsConfig(props);
+ final StateDirectory downgradeStateDir = new
StateDirectory(downgradeConfig, time, true, false);
+ final GlobalStateManagerImpl downgradeManager = new
GlobalStateManagerImpl(
+ new LogContext("test"),
+ time,
+ topology,
+ consumer,
+ downgradeStateDir,
+ stateRestoreListener,
+ downgradeConfig
+ );
+
+ final InternalMockProcessorContext downgradeContext =
+ new
InternalMockProcessorContext(downgradeStateDir.globalStateDir(),
downgradeConfig);
+ downgradeManager.setGlobalProcessorContext(downgradeContext);
+ downgradeContext.setStateManger(downgradeManager);
+
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+ downgradeManager.initialize();
+
+ // simulate some offsets being tracked
+ downgradeManager.updateChangelogOffsets(Collections.singletonMap(t1,
500L));
+
+ downgradeManager.close();
+
+ // verify the legacy global checkpoint was written
+ final File legacyGlobalFile = new
File(downgradeStateDir.globalStateDir(),
+ LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+ assertTrue(legacyGlobalFile.exists());
+ final Map<TopicPartition, Long> written = new
OffsetCheckpoint(legacyGlobalFile).read();
+ assertEquals(500L, written.get(t1));
+ }
+
+ @Test
+ public void
shouldNotWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsNull() {
+ initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+ processorContext.setStateManger(stateManager);
+ stateManager.initialize();
+
+ stateManager.updateChangelogOffsets(Collections.singletonMap(t1,
500L));
+ stateManager.close();
+
+ final File legacyGlobalFile = new File(stateDirectory.globalStateDir(),
+ LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+ assertFalse(legacyGlobalFile.exists());
+ }
+
private void writeCorruptCheckpoint() throws IOException {
final File checkpointFile = new File(stateManager.baseDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME);
try (final OutputStream stream =
Files.newOutputStream(checkpointFile.toPath())) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ad2f621e38b..76a10cf192a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -465,7 +466,7 @@ public class ProcessorStateManagerTest {
);
checkpoint.write(offsets);
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
contextRegistersStateStore(stateMgr);
try {
@@ -934,7 +935,7 @@ public class ProcessorStateManagerTest {
);
checkpoint.write(offsets);
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
try {
stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
@@ -963,7 +964,7 @@ public class ProcessorStateManagerTest {
);
checkpoint.write(offsets);
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
contextRegistersStateStore(stateMgr);
try {
@@ -977,7 +978,7 @@ public class ProcessorStateManagerTest {
@Test
public void shouldNotThrowTaskCorruptedExceptionAfterCheckpointing() {
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
contextRegistersStateStore(stateMgr);
try {
@@ -1012,7 +1013,7 @@ public class ProcessorStateManagerTest {
@Test
public void
shouldThrowIllegalStateIfInitializingOffsetsForCorruptedTasks() {
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
try {
stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
@@ -1027,7 +1028,7 @@ public class ProcessorStateManagerTest {
@Test
public void shouldBeAbleToCloseWithoutRegisteringAnyStores() {
- final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true);
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, true, null);
stateMgr.close();
}
@@ -1147,12 +1148,69 @@ public class ProcessorStateManagerTest {
}
}
+ @Test
+ public void shouldWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsPre43()
throws IOException {
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, false, UpgradeFromValues.UPGRADE_FROM_42);
+
+ contextRegistersStateStore(stateMgr);
+ persistentStore.init(context, persistentStore);
+ stateMgr.initializeStoreOffsets(false);
+
+ // update the offset for the persistent store
+ stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition,
100L));
+
+ stateMgr.close();
+
+ // verify the legacy per-task checkpoint was written
+ final File legacyFile = new
File(stateDirectory.getOrCreateDirectoryForTask(taskId),
LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+ assertTrue(legacyFile.exists());
+ final Map<TopicPartition, Long> written = new
OffsetCheckpoint(legacyFile).read();
+ assertEquals(100L, written.get(persistentStorePartition));
+ }
+
+ @Test
+ public void
shouldNotWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsNull() throws
IOException {
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
+
+ contextRegistersStateStore(stateMgr);
+ persistentStore.init(context, persistentStore);
+ stateMgr.initializeStoreOffsets(false);
+
+ stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition,
100L));
+ stateMgr.close();
+
+ final File legacyFile = new
File(stateDirectory.getOrCreateDirectoryForTask(taskId),
LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+ assertFalse(legacyFile.exists());
+ }
+
+ @Test
+ public void shouldExcludeCorruptedStoresFromDowngradeCheckpoint() throws
IOException {
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE, false, UpgradeFromValues.UPGRADE_FROM_42);
+
+ contextRegistersStateStore(stateMgr);
+ persistentStore.init(context, persistentStore);
+ persistentStoreTwo.init(context, persistentStoreTwo);
+ stateMgr.initializeStoreOffsets(false);
+ stateMgr.updateChangelogOffsets(mkMap(
+ mkEntry(persistentStorePartition, 100L),
+ mkEntry(persistentStoreTwoPartition, 200L)
+ ));
+ // mark the first store as corrupted
+
stateMgr.markChangelogAsCorrupted(singletonList(persistentStorePartition));
+ stateMgr.close();
+ final File legacyFile = new
File(stateDirectory.getOrCreateDirectoryForTask(taskId),
LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+ assertTrue(legacyFile.exists());
+ final Map<TopicPartition, Long> written = new
OffsetCheckpoint(legacyFile).read();
+ // only the non-corrupted store should be in the checkpoint
+ assertFalse(written.containsKey(persistentStorePartition));
+ assertEquals(200L, written.get(persistentStoreTwoPartition));
+ }
- private ProcessorStateManager getStateManager(final Task.TaskType
taskType, final boolean eosEnabled) {
+ private ProcessorStateManager getStateManager(final Task.TaskType
taskType, final boolean eosEnabled, final UpgradeFromValues upgradeFrom) {
return new ProcessorStateManager(
taskId,
taskType,
@@ -1164,11 +1222,12 @@ public class ProcessorStateManagerTest {
mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
),
- emptySet());
+ emptySet(),
+ upgradeFrom);
}
private ProcessorStateManager getStateManager(final Task.TaskType
taskType) {
- return getStateManager(taskType, false);
+ return getStateManager(taskType, false, null);
}
private void contextRegistersStateStore(final StateManager stateManager) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
index 9b80801b706..16bc126de10 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
@@ -651,6 +652,87 @@ public class LegacyCheckpointingStateStoreTest {
LOG_PREFIX, stateDirectory, taskId,
Collections.singletonMap(partition, throwingStore)));
}
+ // =====================================================================
+ // maybeDowngradeOffsets()
+ // =====================================================================
+
+ @Test
+ public void shouldWriteDowngradeCheckpointWhenUpgradeFromIsPre43() throws
IOException {
+ final Map<TopicPartition, Long> offsets =
Collections.singletonMap(partition, 100L);
+
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+ LOG_PREFIX, UpgradeFromValues.UPGRADE_FROM_42, stateDirectory,
taskId, offsets);
+
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ assertTrue(legacyFile.exists());
+ final Map<TopicPartition, Long> written = new
OffsetCheckpoint(legacyFile).read();
+ assertEquals(100L, written.get(partition));
+ }
+
+ @Test
+ public void shouldBeNoOpWhenUpgradeFromIsNull() {
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+ LOG_PREFIX, null, stateDirectory, taskId,
Collections.singletonMap(partition, 100L));
+
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ assertFalse(legacyFile.exists());
+ }
+
+ @Test
+ public void shouldWriteNullOffsetsAsOffsetUnknownInDowngradeCheckpoint()
throws IOException {
+ final TopicPartition otherPartition = new
TopicPartition("other-topic", 0);
+ final Map<TopicPartition, Long> offsets = new HashMap<>();
+ offsets.put(partition, 100L);
+ offsets.put(otherPartition, null);
+
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+ LOG_PREFIX, UpgradeFromValues.UPGRADE_FROM_42, stateDirectory,
taskId, offsets);
+
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ final Map<TopicPartition, Long> written = new
OffsetCheckpoint(legacyFile).read();
+ assertEquals(2, written.size());
+ assertEquals(100L, written.get(partition));
+ assertEquals(OffsetCheckpoint.OFFSET_UNKNOWN,
written.get(otherPartition));
+ }
+
+ @Test
+ public void shouldWriteDowngradeCheckpointForGlobalStore() throws
IOException {
+ // ensure global state dir exists
+ stateDirectory.globalStateDir().mkdirs();
+
+ final Map<TopicPartition, Long> offsets =
Collections.singletonMap(partition, 200L);
+
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+ LOG_PREFIX, UpgradeFromValues.UPGRADE_FROM_40, stateDirectory,
null, offsets);
+
+ final File legacyGlobalFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, null);
+ assertTrue(legacyGlobalFile.exists());
+ final Map<TopicPartition, Long> written = new
OffsetCheckpoint(legacyGlobalFile).read();
+ assertEquals(200L, written.get(partition));
+ }
+
+ @Test
+ public void shouldWriteDowngradeCheckpointForAllPre43Versions() throws
IOException {
+ for (final UpgradeFromValues version : UpgradeFromValues.values()) {
+ // clean up any existing checkpoint from previous iteration
+ final File legacyFile =
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+ if (legacyFile.exists()) {
+ legacyFile.delete();
+ }
+
+ LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+ LOG_PREFIX, version, stateDirectory, taskId,
Collections.singletonMap(partition, 100L));
+
+ if (version.ordinal() <=
UpgradeFromValues.UPGRADE_FROM_42.ordinal()) {
+ assertTrue(legacyFile.exists(),
+ "Expected downgrade checkpoint for " + version);
+ } else {
+ assertFalse(legacyFile.exists(),
+ "Expected no downgrade checkpoint for " + version);
+ }
+ }
+ }
+
// =====================================================================
// Helpers
// =====================================================================