This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 599d55cdb72 KAFKA-19710: Write legacy .checkpoint on close for downgrade support (#21814)
599d55cdb72 is described below

commit 599d55cdb7298eeddc81f89af626fc9110bc194b
Author: Nick Telford <[email protected]>
AuthorDate: Thu Mar 19 03:02:27 2026 +0000

    KAFKA-19710: Write legacy .checkpoint on close for downgrade support (#21814)
    
    Kafka Streams 4.3 moved offset management from per-task `.checkpoint`
    files into RocksDB column families (KAFKA-17411/KAFKA-19712). An upgrade
    path exists (`migrateLegacyOffsets()`) that moves offsets from old
    `.checkpoint` files into the new system, but there is no downgrade
    path: if a user rolls back to a pre-4.3 version, the `.checkpoint`
    files that version expects no longer exist, so it cannot find its
    offsets.
    
    This change adds downgrade support: when `upgrade.from` is set to a
    version older than 4.3, a consolidated per-task `.checkpoint` file is
    written during close so that an older Kafka Streams version can find its
    offsets. When `upgrade.from` is unset or refers to 4.3 or later, nothing
    is written. Null offsets are written as `OFFSET_UNKNOWN`, matching the
    legacy checkpoint format. The downgrade checkpoint is written from
    `ProcessorStateManager.close()` for regular tasks, where offsets of
    changelogs marked as corrupted are left out, and from
    `GlobalStateManagerImpl.close()` for global stores.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../processor/internals/ActiveTaskCreator.java     |  7 +-
 .../internals/GlobalStateManagerImpl.java          |  6 ++
 .../processor/internals/ProcessorStateManager.java | 32 ++++++++-
 .../processor/internals/StandbyTaskCreator.java    |  7 +-
 .../internals/LegacyCheckpointingStateStore.java   | 47 ++++++++++++-
 .../internals/GlobalStateManagerImplTest.java      | 55 +++++++++++++++
 .../internals/ProcessorStateManagerTest.java       | 77 +++++++++++++++++---
 .../LegacyCheckpointingStateStoreTest.java         | 82 ++++++++++++++++++++++
 8 files changed, 298 insertions(+), 15 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index ac8ec93a9c3..1d5b7fbf7ed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -135,6 +136,9 @@ class ActiveTaskCreator {
                                               final Map<TaskId, 
Set<TopicPartition>> tasksToBeCreated) {
         final List<StreamTask> createdTasks = new ArrayList<>();
 
+        final String upgradeFromStr = 
applicationConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        final UpgradeFromValues upgradeFrom = upgradeFromStr != null ? 
UpgradeFromValues.fromString(upgradeFromStr) : null;
+
         for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions 
: tasksToBeCreated.entrySet()) {
             final TaskId taskId = newTaskAndPartitions.getKey();
             final LogContext logContext = getLogContext(taskId);
@@ -148,7 +152,8 @@ class ActiveTaskCreator {
                 logContext,
                 stateDirectory,
                 topology.storeToChangelogTopic(),
-                partitions);
+                partitions,
+                upgradeFrom);
 
             final InternalProcessorContext<Object, Object> context = new 
ProcessorContextImpl(
                 taskId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 5b08d9ae53e..30146fbb7be 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -117,6 +118,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     private final FixedOrderMap<String, Optional<StateStore>> globalStores = 
new FixedOrderMap<>();
     private final Map<String, StateStoreMetadata> storeMetadata = new 
HashMap<>();
     private final boolean eosEnabled;
+    private final UpgradeFromValues upgradeFrom;
     private InternalProcessorContext<?, ?> globalProcessorContext;
     private DeserializationExceptionHandler deserializationExceptionHandler;
     private ProcessingExceptionHandler processingExceptionHandler;
@@ -161,6 +163,8 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         final boolean globalEnabled = 
config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
         processingExceptionHandler = globalEnabled ? 
config.processingExceptionHandler() : null;
         eosEnabled = StreamsConfigUtils.eosEnabled(config);
+        final String upgradeFromStr = 
config.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        upgradeFrom = upgradeFromStr != null ? 
UpgradeFromValues.fromString(upgradeFromStr) : null;
     }
 
     @Override
@@ -625,6 +629,8 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             }
         }
 
+        LegacyCheckpointingStateStore.maybeDowngradeOffsets(logPrefix, 
upgradeFrom, stateDirectory, null, currentOffsets);
+
         if (closeFailed.length() > 0) {
             throw new ProcessorStateException("Exceptions caught during close 
of 1 or more global state globalStores\n" + closeFailed);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 645a56a5463..db033320795 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -181,6 +182,7 @@ public class ProcessorStateManager implements StateManager {
 
     private final StateDirectory stateDirectory;
     private final File baseDir;
+    private final UpgradeFromValues upgradeFrom;
 
     private TaskType taskType;
     private Logger log;
@@ -203,7 +205,8 @@ public class ProcessorStateManager implements StateManager {
                                  final LogContext logContext,
                                  final StateDirectory stateDirectory,
                                  final Map<String, String> 
storeToChangelogTopic,
-                                 final Collection<TopicPartition> 
sourcePartitions) throws ProcessorStateException {
+                                 final Collection<TopicPartition> 
sourcePartitions,
+                                 final UpgradeFromValues upgradeFrom) throws 
ProcessorStateException {
         this.storeToChangelogTopic = storeToChangelogTopic;
         this.log = logContext.logger(ProcessorStateManager.class);
         this.logPrefix = logContext.logPrefix();
@@ -211,6 +214,7 @@ public class ProcessorStateManager implements StateManager {
         this.taskType = taskType;
         this.eosEnabled = eosEnabled;
         this.sourcePartitions = sourcePartitions;
+        this.upgradeFrom = upgradeFrom;
 
         this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
         this.stateDirectory = stateDirectory;
@@ -218,6 +222,21 @@ public class ProcessorStateManager implements StateManager 
{
         log.debug("Created state store manager for task {}", taskId);
     }
 
+    /**
+     * Convenience constructor that defaults {@code upgradeFrom} to {@code 
null}.
+     *
+     * @throws ProcessorStateException if the task directory does not exist 
and could not be created
+     */
+    public ProcessorStateManager(final TaskId taskId,
+                                 final TaskType taskType,
+                                 final boolean eosEnabled,
+                                 final LogContext logContext,
+                                 final StateDirectory stateDirectory,
+                                 final Map<String, String> 
storeToChangelogTopic,
+                                 final Collection<TopicPartition> 
sourcePartitions) throws ProcessorStateException {
+        this(taskId, taskType, eosEnabled, logContext, stateDirectory, 
storeToChangelogTopic, sourcePartitions, null);
+    }
+
     /**
      * Special constructor used by {@link StateDirectory} to partially 
initialize startup tasks for local state, before
      * they're assigned to a thread. When the task is assigned to a thread, 
the initialization of this StateManager is
@@ -588,12 +607,14 @@ public class ProcessorStateManager implements 
StateManager {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
 
+        final Map<TopicPartition, Long> allOffsets = new HashMap<>();
         RuntimeException firstException = null;
         // attempting to close the stores, just in case they
         // are not closed by a ProcessorNode yet
         if (!stores.isEmpty()) {
             for (final Map.Entry<String, StateStoreMetadata> entry : 
stores.entrySet()) {
-                final StateStore store = entry.getValue().stateStore;
+                final StateStoreMetadata metadata = entry.getValue();
+                final StateStore store = metadata.stateStore;
                 log.trace("Closing store {}", store.name());
                 try {
                     store.close();
@@ -615,11 +636,18 @@ public class ProcessorStateManager implements 
StateManager {
                         log.error("Failed to close state store {}: ", 
store.name(), exception);
                     }
                 }
+
+                // collect offsets for potential downgrade checkpoint
+                if (metadata.changelogPartition != null && 
!metadata.corrupted) {
+                    allOffsets.put(metadata.changelogPartition, 
metadata.offset);
+                }
             }
 
             stores.clear();
         }
 
+        LegacyCheckpointingStateStore.maybeDowngradeOffsets(logPrefix, 
upgradeFrom, stateDirectory, taskId, allOffsets);
+
         if (firstException != null) {
             throw firstException;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index f04aec38f46..139efbd63de 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -68,6 +69,9 @@ class StandbyTaskCreator {
     Collection<StandbyTask> createTasks(final Map<TaskId, Set<TopicPartition>> 
tasksToBeCreated) {
         final List<StandbyTask> createdTasks = new ArrayList<>();
 
+        final String upgradeFromStr = 
applicationConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        final UpgradeFromValues upgradeFrom = upgradeFromStr != null ? 
UpgradeFromValues.fromString(upgradeFromStr) : null;
+
         for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions 
: tasksToBeCreated.entrySet()) {
             final TaskId taskId = newTaskAndPartitions.getKey();
             final Set<TopicPartition> partitions = 
newTaskAndPartitions.getValue();
@@ -81,7 +85,8 @@ class StandbyTaskCreator {
                     getLogContext(taskId),
                     stateDirectory,
                     topology.storeToChangelogTopic(),
-                    partitions);
+                    partitions,
+                    upgradeFrom);
 
                 final InternalProcessorContext<?, ?> context = new 
ProcessorContextImpl(
                     taskId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
index bc059f03ffd..fbf99c200a1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
@@ -43,7 +44,6 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
 
     private final boolean eosEnabled;
     private final Set<TopicPartition> changelogPartitions;
-    private final StateDirectory stateDirectory;
     private final TaskId taskId;
     private final OffsetCheckpoint checkpointFile;
     private final String logPrefix;
@@ -77,6 +77,45 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
                 : store;
     }
 
+    /**
+     * Writes a consolidated per-task {@code .checkpoint} file for downgrade 
support.
+     *
+     * When {@code upgradeFrom} is set to a version older than 4.3, this 
method writes the offsets into the legacy
+     * per-task checkpoint file so that an older Kafka Streams version can 
find its offsets after a downgrade.
+     *
+     * This is a no-op if {@code upgradeFrom} is {@code null} or refers to 
version 4.3 or later.
+     *
+     * @param logPrefix Log prefix to use for log messages.
+     * @param upgradeFrom The configured {@code upgrade.from} value, or {@code 
null} if not set.
+     * @param stateDirectory The singleton {@link StateDirectory} used for 
looking up state directories.
+     * @param taskId Either the task ID for regular stores, or {@code null} 
for global stores.
+     * @param offsets The offsets to write to the checkpoint file. Entries 
with {@code null} values are written as
+     *                {@link OffsetCheckpoint#OFFSET_UNKNOWN}.
+     */
+    public static void maybeDowngradeOffsets(final String logPrefix,
+                                             final UpgradeFromValues 
upgradeFrom,
+                                             final StateDirectory 
stateDirectory,
+                                             final TaskId taskId,
+                                             final Map<TopicPartition, Long> 
offsets) {
+        if (upgradeFrom == null || upgradeFrom.ordinal() > 
UpgradeFromValues.UPGRADE_FROM_42.ordinal()) {
+            return;
+        }
+
+        final Map<TopicPartition, Long> checkpointableOffsets = new 
HashMap<>();
+        for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) 
{
+            checkpointableOffsets.put(entry.getKey(), 
checkpointableOffsetFromChangelogOffset(entry.getValue()));
+        }
+
+        final File legacyCheckpointFile = checkpointFileFor(stateDirectory, 
taskId, null);
+        final OffsetCheckpoint checkpoint = new 
OffsetCheckpoint(legacyCheckpointFile);
+        try {
+            log.debug("{}Writing downgrade checkpoint file for task {} with 
offsets {}", logPrefix, taskId, checkpointableOffsets);
+            checkpoint.write(checkpointableOffsets);
+        } catch (final IOException e) {
+            log.warn("{}Failed to write downgrade checkpoint file for task 
{}", logPrefix, taskId, e);
+        }
+    }
+
     public static void maybeMarkCorrupted(final StateStore store) {
         if (store instanceof LegacyCheckpointingStateStore<?, ?, ?>) {
             ((LegacyCheckpointingStateStore<?, ?, ?>) store).markAsCorrupted();
@@ -161,7 +200,6 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
         super(wrapped);
         this.eosEnabled = eosEnabled;
         this.changelogPartitions = changelogPartitions;
-        this.stateDirectory = stateDirectory;
         this.taskId = taskId;
         this.checkpointFile = new 
OffsetCheckpoint(checkpointFileFor(stateDirectory, taskId, this));
         this.logPrefix = logPrefix;
@@ -304,6 +342,11 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
         return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
     }
 
+    // Convert a changelog offset to the value written in the checkpoint file
+    private static long checkpointableOffsetFromChangelogOffset(final Long 
offset) {
+        return offset != null ? offset : OFFSET_UNKNOWN;
+    }
+
     // Convert the written offsets in the checkpoint file back to the 
changelog offset
     private static Long changelogOffsetFromCheckpointedOffset(final long 
offset) {
         return offset != OFFSET_UNKNOWN ? offset : null;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 700c42d7c9b..419fdbf1562 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
@@ -1197,6 +1198,60 @@ public class GlobalStateManagerImplTest {
         assertEquals(0, stateRestoreCallback.restored.size());
     }
 
+    @Test
+    public void shouldWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsPre43() 
throws IOException {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, 
UpgradeFromValues.UPGRADE_FROM_42.toString());
+        final StreamsConfig downgradeConfig = new StreamsConfig(props);
+        final StateDirectory downgradeStateDir = new 
StateDirectory(downgradeConfig, time, true, false);
+        final GlobalStateManagerImpl downgradeManager = new 
GlobalStateManagerImpl(
+            new LogContext("test"),
+            time,
+            topology,
+            consumer,
+            downgradeStateDir,
+            stateRestoreListener,
+            downgradeConfig
+        );
+
+        final InternalMockProcessorContext downgradeContext =
+            new 
InternalMockProcessorContext(downgradeStateDir.globalStateDir(), 
downgradeConfig);
+        downgradeManager.setGlobalProcessorContext(downgradeContext);
+        downgradeContext.setStateManger(downgradeManager);
+
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+        downgradeManager.initialize();
+
+        // simulate some offsets being tracked
+        downgradeManager.updateChangelogOffsets(Collections.singletonMap(t1, 
500L));
+
+        downgradeManager.close();
+
+        // verify the legacy global checkpoint was written
+        final File legacyGlobalFile = new 
File(downgradeStateDir.globalStateDir(),
+            LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+        assertTrue(legacyGlobalFile.exists());
+        final Map<TopicPartition, Long> written = new 
OffsetCheckpoint(legacyGlobalFile).read();
+        assertEquals(500L, written.get(t1));
+    }
+
+    @Test
+    public void 
shouldNotWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsNull() {
+        initializeConsumer(0, 0, t1, t2, t3, t4, t5);
+        processorContext.setStateManger(stateManager);
+        stateManager.initialize();
+
+        stateManager.updateChangelogOffsets(Collections.singletonMap(t1, 
500L));
+        stateManager.close();
+
+        final File legacyGlobalFile = new File(stateDirectory.globalStateDir(),
+            LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+        assertFalse(legacyGlobalFile.exists());
+    }
+
     private void writeCorruptCheckpoint() throws IOException {
         final File checkpointFile = new File(stateManager.baseDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME);
         try (final OutputStream stream = 
Files.newOutputStream(checkpointFile.toPath())) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ad2f621e38b..76a10cf192a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -465,7 +466,7 @@ public class ProcessorStateManagerTest {
         );
         checkpoint.write(offsets);
 
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
         contextRegistersStateStore(stateMgr);
 
         try {
@@ -934,7 +935,7 @@ public class ProcessorStateManagerTest {
         );
         checkpoint.write(offsets);
 
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
 
         try {
             stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
@@ -963,7 +964,7 @@ public class ProcessorStateManagerTest {
         );
         checkpoint.write(offsets);
 
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
         contextRegistersStateStore(stateMgr);
 
         try {
@@ -977,7 +978,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldNotThrowTaskCorruptedExceptionAfterCheckpointing() {
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
         contextRegistersStateStore(stateMgr);
 
         try {
@@ -1012,7 +1013,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void 
shouldThrowIllegalStateIfInitializingOffsetsForCorruptedTasks() {
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
 
         try {
             stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
@@ -1027,7 +1028,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldBeAbleToCloseWithoutRegisteringAnyStores() {
-        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true);
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
 
         stateMgr.close();
     }
@@ -1147,12 +1148,69 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    @Test
+    public void shouldWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsPre43() 
throws IOException {
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, false, UpgradeFromValues.UPGRADE_FROM_42);
+
+        contextRegistersStateStore(stateMgr);
+        persistentStore.init(context, persistentStore);
+        stateMgr.initializeStoreOffsets(false);
+
+        // update the offset for the persistent store
+        stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 
100L));
+
+        stateMgr.close();
+
+        // verify the legacy per-task checkpoint was written
+        final File legacyFile = new 
File(stateDirectory.getOrCreateDirectoryForTask(taskId), 
LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+        assertTrue(legacyFile.exists());
+        final Map<TopicPartition, Long> written = new 
OffsetCheckpoint(legacyFile).read();
+        assertEquals(100L, written.get(persistentStorePartition));
+    }
+
+    @Test
+    public void 
shouldNotWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsNull() throws 
IOException {
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
+
+        contextRegistersStateStore(stateMgr);
+        persistentStore.init(context, persistentStore);
+        stateMgr.initializeStoreOffsets(false);
+
+        stateMgr.updateChangelogOffsets(singletonMap(persistentStorePartition, 
100L));
+        stateMgr.close();
+
+        final File legacyFile = new 
File(stateDirectory.getOrCreateDirectoryForTask(taskId), 
LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+        assertFalse(legacyFile.exists());
+    }
+
+    @Test
+    public void shouldExcludeCorruptedStoresFromDowngradeCheckpoint() throws 
IOException {
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, false, UpgradeFromValues.UPGRADE_FROM_42);
+
+        contextRegistersStateStore(stateMgr);
+        persistentStore.init(context, persistentStore);
+        persistentStoreTwo.init(context, persistentStoreTwo);
+        stateMgr.initializeStoreOffsets(false);
 
+        stateMgr.updateChangelogOffsets(mkMap(
+            mkEntry(persistentStorePartition, 100L),
+            mkEntry(persistentStoreTwoPartition, 200L)
+        ));
 
+        // mark the first store as corrupted
+        
stateMgr.markChangelogAsCorrupted(singletonList(persistentStorePartition));
 
+        stateMgr.close();
 
+        final File legacyFile = new 
File(stateDirectory.getOrCreateDirectoryForTask(taskId), 
LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME);
+        assertTrue(legacyFile.exists());
+        final Map<TopicPartition, Long> written = new 
OffsetCheckpoint(legacyFile).read();
+        // only the non-corrupted store should be in the checkpoint
+        assertFalse(written.containsKey(persistentStorePartition));
+        assertEquals(200L, written.get(persistentStoreTwoPartition));
+    }
 
-    private ProcessorStateManager getStateManager(final Task.TaskType 
taskType, final boolean eosEnabled) {
+    private ProcessorStateManager getStateManager(final Task.TaskType 
taskType, final boolean eosEnabled, final UpgradeFromValues upgradeFrom) {
         return new ProcessorStateManager(
             taskId,
             taskType,
@@ -1164,11 +1222,12 @@ public class ProcessorStateManagerTest {
                 mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
                 mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
             ),
-            emptySet());
+            emptySet(),
+            upgradeFrom);
     }
 
     private ProcessorStateManager getStateManager(final Task.TaskType 
taskType) {
-        return getStateManager(taskType, false);
+        return getStateManager(taskType, false, null);
     }
 
     private void contextRegistersStateStore(final StateManager stateManager) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
index 9b80801b706..16bc126de10 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.internals.UpgradeFromValues;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
@@ -651,6 +652,87 @@ public class LegacyCheckpointingStateStoreTest {
                 LOG_PREFIX, stateDirectory, taskId, 
Collections.singletonMap(partition, throwingStore)));
     }
 
+    // =====================================================================
+    // maybeDowngradeOffsets()
+    // =====================================================================
+
+    @Test
+    public void shouldWriteDowngradeCheckpointWhenUpgradeFromIsPre43() throws 
IOException {
+        final Map<TopicPartition, Long> offsets = 
Collections.singletonMap(partition, 100L);
+
+        LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+            LOG_PREFIX, UpgradeFromValues.UPGRADE_FROM_42, stateDirectory, 
taskId, offsets);
+
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        assertTrue(legacyFile.exists());
+        final Map<TopicPartition, Long> written = new 
OffsetCheckpoint(legacyFile).read();
+        assertEquals(100L, written.get(partition));
+    }
+
+    @Test
+    public void shouldBeNoOpWhenUpgradeFromIsNull() {
+        LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+            LOG_PREFIX, null, stateDirectory, taskId, 
Collections.singletonMap(partition, 100L));
+
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        assertFalse(legacyFile.exists());
+    }
+
+    @Test
+    public void shouldWriteNullOffsetsAsOffsetUnknownInDowngradeCheckpoint() 
throws IOException {
+        final TopicPartition otherPartition = new 
TopicPartition("other-topic", 0);
+        final Map<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(partition, 100L);
+        offsets.put(otherPartition, null);
+
+        LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+            LOG_PREFIX, UpgradeFromValues.UPGRADE_FROM_42, stateDirectory, 
taskId, offsets);
+
+        final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+        final Map<TopicPartition, Long> written = new 
OffsetCheckpoint(legacyFile).read();
+        assertEquals(2, written.size());
+        assertEquals(100L, written.get(partition));
+        assertEquals(OffsetCheckpoint.OFFSET_UNKNOWN, 
written.get(otherPartition));
+    }
+
+    @Test
+    public void shouldWriteDowngradeCheckpointForGlobalStore() throws 
IOException {
+        // ensure global state dir exists
+        stateDirectory.globalStateDir().mkdirs();
+
+        final Map<TopicPartition, Long> offsets = 
Collections.singletonMap(partition, 200L);
+
+        LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+            LOG_PREFIX, UpgradeFromValues.UPGRADE_FROM_40, stateDirectory, 
null, offsets);
+
+        final File legacyGlobalFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, null, null);
+        assertTrue(legacyGlobalFile.exists());
+        final Map<TopicPartition, Long> written = new 
OffsetCheckpoint(legacyGlobalFile).read();
+        assertEquals(200L, written.get(partition));
+    }
+
+    @Test
+    public void shouldWriteDowngradeCheckpointForAllPre43Versions() throws 
IOException {
+        for (final UpgradeFromValues version : UpgradeFromValues.values()) {
+            // clean up any existing checkpoint from previous iteration
+            final File legacyFile = 
LegacyCheckpointingStateStore.checkpointFileFor(stateDirectory, taskId, null);
+            if (legacyFile.exists()) {
+                legacyFile.delete();
+            }
+
+            LegacyCheckpointingStateStore.maybeDowngradeOffsets(
+                LOG_PREFIX, version, stateDirectory, taskId, 
Collections.singletonMap(partition, 100L));
+
+            if (version.ordinal() <= 
UpgradeFromValues.UPGRADE_FROM_42.ordinal()) {
+                assertTrue(legacyFile.exists(),
+                    "Expected downgrade checkpoint for " + version);
+            } else {
+                assertFalse(legacyFile.exists(),
+                    "Expected no downgrade checkpoint for " + version);
+            }
+        }
+    }
+
     // =====================================================================
     // Helpers
     // =====================================================================


Reply via email to