This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6c83a29a5e3 KAFKA-20489: Add enable.transactional.statestores (#22141)
6c83a29a5e3 is described below

commit 6c83a29a5e39dda753166042eb9ad5ea658c3fa2
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 15 22:19:18 2026 +0100

    KAFKA-20489: Add enable.transactional.statestores (#22141)
    
    Introduces the `enable.transactional.statestores` config (default
    false). When enabled, uncommitted writes are held in an in-memory buffer
    per store and are not flushed to the underlying base store until the
    Kafka transaction commits, making staged writes invisible to IQ reads at
    the committed isolation level until the containing commit completes.
    
    The config is threaded through TopologyConfig, TaskConfig, task
    creators, and ProcessorStateManager. When transactional stores are
    active, the EOS state wipe on unclean shutdown is suppressed unless a
    store has been marked as corrupted: since uncommitted data never
    reaches the base store, there is otherwise nothing to wipe; corruption
    is handled explicitly via markChangelogAsCorrupted.
    
    StreamTask.postCommit is extended to flush the pending write buffer on
    every commit interval, not only on task revocation or close. Under EOS
    the normal commit-interval path previously skipped maybeCheckpoint, the
    only path that calls stateMgr.commit() to flush each store's buffer.
    Without this change, READ_COMMITTED IQ readers see no new data mid-run,
    and the uncommitted buffer grows unbounded between task lifecycle events.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 13 +++++
 .../org/apache/kafka/streams/TopologyConfig.java   | 12 ++++-
 .../processor/internals/ActiveTaskCreator.java     |  1 +
 .../processor/internals/ProcessorStateManager.java | 14 +++--
 .../streams/processor/internals/StandbyTask.java   |  3 ++
 .../processor/internals/StandbyTaskCreator.java    |  1 +
 .../processor/internals/StateManagerUtil.java      |  8 ++-
 .../streams/processor/internals/StreamTask.java    |  5 +-
 .../streams/processor/internals/TaskManager.java   |  1 +
 .../apache/kafka/streams/StreamsConfigTest.java    | 13 +++++
 .../internals/ProcessorStateManagerTest.java       | 62 ++++++++++++++++++++++
 .../processor/internals/StandbyTaskTest.java       | 23 ++++++++
 .../processor/internals/StateManagerUtilTest.java  | 47 +++++++++++++---
 .../processor/internals/StreamTaskTest.java        | 45 +++++++++++++++-
 .../processor/internals/TaskManagerTest.java       | 10 ++--
 .../StreamThreadStateStoreProviderTest.java        |  1 +
 .../apache/kafka/streams/TopologyTestDriver.java   |  1 +
 17 files changed, 240 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a1c06e15221..b56b97407ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -575,6 +575,14 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code enable.metrics.push} */
     @SuppressWarnings("WeakerAccess")
     public static  final String ENABLE_METRICS_PUSH_CONFIG = 
CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
+
+    /** {@code enable.transactional.statestores} */
+    public static final String TRANSACTIONAL_STATE_STORES_CONFIG = 
"enable.transactional.statestores";
+    private static final String TRANSACTIONAL_STATE_STORES_DOC = "Whether to 
enable transactional state stores. " +
+            "When enabled, state stores will buffer writes in a transaction 
buffer (if supported by the state store implementation), " +
+            "before committing them when the corresponding Kafka changelog 
transaction has committed. \n" +
+            "Under EOS, state stores will no longer be wiped on-error and 
rebuilt from scratch. " +
+            "In the event of an error (under either EOS or ALOS), only the 
writes since the last successful commit will be lost and replayed through the 
topology.";
     @Deprecated
     public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable 
pushing of internal client metrics for (main, restore, and global) consumers, 
producers, and admin clients." +
         " The cluster must have a client metrics subscription which 
corresponds to a client.";
@@ -1117,6 +1125,11 @@ public class StreamsConfig extends AbstractConfig {
                     true,
                     Importance.LOW,
                     ENABLE_METRICS_PUSH_DOC)
+            .define(TRANSACTIONAL_STATE_STORES_CONFIG,
+                    Type.BOOLEAN,
+                    false,
+                    Importance.LOW,
+                    TRANSACTIONAL_STATE_STORES_DOC)
             .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
                     Type.INT,
                     null,
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index fd76f07686a..45c0653c84b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -195,6 +195,7 @@ public final class TopologyConfig extends AbstractConfig {
     public final Supplier<ProcessingExceptionHandler> 
processingExceptionHandlerSupplier;
 
     public final boolean ensureExplicitInternalResourceNaming;
+    public final boolean transactionalStateStoresEnabled;
 
     public TopologyConfig(final StreamsConfig configs) {
         this(null, configs, mkObjectProperties(configs.originals()));
@@ -306,6 +307,9 @@ public final class TopologyConfig extends AbstractConfig {
         }
 
         ensureExplicitInternalResourceNaming = 
globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
+        this.transactionalStateStoresEnabled = Boolean.parseBoolean(
+            String.valueOf(globalAppConfigs.originals()
+                .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, 
"false")));
     }
 
     @Deprecated
@@ -349,7 +353,8 @@ public final class TopologyConfig extends AbstractConfig {
             timestampExtractorSupplier.get(),
             deserializationExceptionHandlerSupplier.get(),
             processingExceptionHandlerSupplier.get(),
-            eosEnabled
+            eosEnabled,
+            transactionalStateStoresEnabled
         );
     }
 
@@ -361,6 +366,7 @@ public final class TopologyConfig extends AbstractConfig {
         public final DeserializationExceptionHandler 
deserializationExceptionHandler;
         public final ProcessingExceptionHandler processingExceptionHandler;
         public final boolean eosEnabled;
+        public final boolean transactionalStateStoresEnabled;
 
         private TaskConfig(final long maxTaskIdleMs,
                            final long taskTimeoutMs,
@@ -368,7 +374,8 @@ public final class TopologyConfig extends AbstractConfig {
                            final TimestampExtractor timestampExtractor,
                            final DeserializationExceptionHandler 
deserializationExceptionHandler,
                            final ProcessingExceptionHandler 
processingExceptionHandler,
-                           final boolean eosEnabled) {
+                           final boolean eosEnabled,
+                           final boolean transactionalStateStoresEnabled) {
             this.maxTaskIdleMs = maxTaskIdleMs;
             this.taskTimeoutMs = taskTimeoutMs;
             this.maxBufferedSize = maxBufferedSize;
@@ -376,6 +383,7 @@ public final class TopologyConfig extends AbstractConfig {
             this.deserializationExceptionHandler = 
deserializationExceptionHandler;
             this.processingExceptionHandler = processingExceptionHandler;
             this.eosEnabled = eosEnabled;
+            this.transactionalStateStoresEnabled = 
transactionalStateStoresEnabled;
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 96590df0793..5e1a69d29e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -149,6 +149,7 @@ class ActiveTaskCreator {
                 taskId,
                 Task.TaskType.ACTIVE,
                 eosEnabled(applicationConfig),
+                
applicationConfig.getBoolean(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG),
                 logContext,
                 stateDirectory,
                 topology.storeToChangelogTopic(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f25d5268751..d6a5a2fea3c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -196,6 +196,7 @@ public class ProcessorStateManager implements StateManager {
 
     private final TaskId taskId;
     private final boolean eosEnabled;
+    private final boolean transactionalStateStoresEnabled;
     private final Collection<TopicPartition> sourcePartitions;
     private final Map<String, String> storeToChangelogTopic;
 
@@ -225,6 +226,7 @@ public class ProcessorStateManager implements StateManager {
     public ProcessorStateManager(final TaskId taskId,
                                  final TaskType taskType,
                                  final boolean eosEnabled,
+                                 final boolean transactionalStateStoresEnabled,
                                  final LogContext logContext,
                                  final StateDirectory stateDirectory,
                                  final Map<String, String> 
storeToChangelogTopic,
@@ -236,6 +238,7 @@ public class ProcessorStateManager implements StateManager {
         this.taskId = taskId;
         this.taskType = taskType;
         this.eosEnabled = eosEnabled;
+        this.transactionalStateStoresEnabled = transactionalStateStoresEnabled;
         this.sourcePartitions = sourcePartitions;
         this.upgradeFrom = upgradeFrom;
 
@@ -253,11 +256,12 @@ public class ProcessorStateManager implements 
StateManager {
     public ProcessorStateManager(final TaskId taskId,
                                  final TaskType taskType,
                                  final boolean eosEnabled,
+                                 final boolean transactionalStateStoresEnabled,
                                  final LogContext logContext,
                                  final StateDirectory stateDirectory,
                                  final Map<String, String> 
storeToChangelogTopic,
                                  final Collection<TopicPartition> 
sourcePartitions) throws ProcessorStateException {
-        this(taskId, taskType, eosEnabled, logContext, stateDirectory, 
storeToChangelogTopic, sourcePartitions, null);
+        this(taskId, taskType, eosEnabled, transactionalStateStoresEnabled, 
logContext, stateDirectory, storeToChangelogTopic, sourcePartitions, null);
     }
 
     /**
@@ -271,7 +275,7 @@ public class ProcessorStateManager implements StateManager {
                                                                final 
StateDirectory stateDirectory,
                                                                final 
Map<String, String> storeToChangelogTopic,
                                                                final 
Set<TopicPartition> sourcePartitions) {
-        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, 
logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
+        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, 
false, logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
     }
 
     void registerStateStores(final List<StateStore> allStores, final 
InternalProcessorContext<?, ?> processorContext) {
@@ -337,7 +341,7 @@ public class ProcessorStateManager implements StateManager {
                     // with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
                     // and hence we are uncertain that the current local state 
only contains committed data;
                     // in that case we need to treat it as a task-corrupted 
exception
-                    if (eosEnabled && !storeDirIsEmpty) {
+                    if (eosEnabled && !storeDirIsEmpty && 
!transactionalStateStoresEnabled) {
                         log.warn("State store {} did not find checkpoint 
offsets while stores are not empty, " +
                                 "since under EOS it has the risk of getting 
uncommitted data in stores we have to " +
                                 "treat it as a task corruption error and wipe 
out the local state of task {} " +
@@ -419,6 +423,10 @@ public class ProcessorStateManager implements StateManager 
{
         return Collections.unmodifiableSet(changelogOffsets().keySet());
     }
 
+    boolean hasCorruptedStores() {
+        return stores.values().stream().anyMatch(m -> m.corrupted);
+    }
+
     void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) 
{
         final Collection<TopicPartition> partitionsToMarkAsCorrupted = new 
LinkedList<>(partitions);
         for (final StateStoreMetadata storeMetadata : stores.values()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 46015532ba5..1b8f23dae98 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -45,6 +45,7 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
  */
 public class StandbyTask extends AbstractTask implements Task {
     private final boolean eosEnabled;
+    private final boolean transactionalStateStoresEnabled;
     private final Sensor closeTaskSensor;
     private final Sensor updateSensor;
     private final StreamsMetricsImpl streamsMetrics;
@@ -86,6 +87,7 @@ public class StandbyTask extends AbstractTask implements Task 
{
         closeTaskSensor = 
ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
         updateSensor = 
TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), 
streamsMetrics);
         this.eosEnabled = config.eosEnabled;
+        this.transactionalStateStoresEnabled = 
config.transactionalStateStoresEnabled;
     }
 
     @Override
@@ -271,6 +273,7 @@ public class StandbyTask extends AbstractTask implements 
Task {
                         logPrefix,
                         clean,
                         eosEnabled,
+                        transactionalStateStoresEnabled,
                         stateMgr,
                         stateDirectory,
                         TaskType.STANDBY
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index e990d588fdd..9083ff054e2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -82,6 +82,7 @@ class StandbyTaskCreator {
                     taskId,
                     Task.TaskType.STANDBY,
                     eosEnabled(applicationConfig),
+                    
applicationConfig.getBoolean(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG),
                     getLogContext(taskId),
                     stateDirectory,
                     topology.storeToChangelogTopic(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 6a02fba3cde..42c0da1c519 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -158,11 +158,15 @@ final class StateManagerUtil {
                                   final String logPrefix,
                                   final boolean closeClean,
                                   final boolean eosEnabled,
+                                  final boolean 
transactionalStateStoresEnabled,
                                   final ProcessorStateManager stateMgr,
                                   final StateDirectory stateDirectory,
                                   final TaskType taskType) {
-        // if EOS is enabled, wipe out the whole state store for unclean close 
since it is now invalid
-        final boolean wipeStateStore = !closeClean && eosEnabled;
+        // if EOS is enabled, wipe out the whole state store for unclean close 
since it is now invalid.
+        // With transactional state stores, uncommitted data is never written 
to the base store,
+        // so wiping is only needed when stores have been marked as corrupted 
(e.g. InvalidOffsetException).
+        final boolean wipeStateStore = !closeClean && eosEnabled
+            && (!transactionalStateStoresEnabled || 
stateMgr.hasCorruptedStores());
 
         final TaskId id = stateMgr.taskId();
         log.trace("Closing state manager for {} task {}", taskType, id);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 495098f2a06..3b65e677674 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -80,6 +80,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     // there's still an optimization that requires this info to be
     // leaked into this class, which is to checkpoint after committing if EOS 
is not enabled.
     private final boolean eosEnabled;
+    private final boolean transactionalStateStoresEnabled;
 
     private final int maxBufferedSize;
     private final AbstractPartitionGroup partitionGroup;
@@ -155,6 +156,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         this.time = time;
         this.recordCollector = recordCollector;
         this.eosEnabled = config.eosEnabled;
+        this.transactionalStateStoresEnabled = 
config.transactionalStateStoresEnabled;
 
         final String threadId = Thread.currentThread().getName();
         this.streamsMetrics = streamsMetrics;
@@ -526,7 +528,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 break;
 
             case RUNNING:
-                if (enforceCheckpoint || !eosEnabled) {
+                if (enforceCheckpoint || !eosEnabled || 
transactionalStateStoresEnabled) {
                     maybeCheckpoint();
                 }
                 log.debug("Finalized commit for {} task with eos {} enforce 
checkpoint {}", state(), eosEnabled, enforceCheckpoint);
@@ -670,6 +672,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                         logPrefix,
                         clean,
                         eosEnabled,
+                        transactionalStateStoresEnabled,
                         stateMgr,
                         stateDirectory,
                         TaskType.ACTIVE
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e8a3442ec13..337e70492c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -280,6 +280,7 @@ public class TaskManager {
 
                     // we need to enforce a checkpoint that removes the 
corrupted partitions
                     if (markAsCorrupted) {
+                        
task.markChangelogAsCorrupted(task.changelogPartitions());
                         task.postCommit(true);
                     }
                 } catch (final RuntimeException swallow) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index fa54bb6c78c..c8f358d9fe6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -83,6 +83,7 @@ import static 
org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFF
 import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
@@ -1913,6 +1914,18 @@ public class StreamsConfigTest {
         }
     }
 
+    @Test
+    public void shouldDisableTransactionalStateStoresByDefault() {
+        
assertFalse(streamsConfig.getBoolean(TRANSACTIONAL_STATE_STORES_CONFIG));
+    }
+
+    @Test
+    public void shouldEnableTransactionalStateStoresWhenConfigured() {
+        props.put(TRANSACTIONAL_STATE_STORES_CONFIG, true);
+        streamsConfig = new StreamsConfig(props);
+        
assertTrue(streamsConfig.getBoolean(TRANSACTIONAL_STATE_STORES_CONFIG));
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean 
isKey) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 63e7e7c2e23..3e697138ad4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -203,6 +203,7 @@ public class ProcessorStateManagerTest {
             taskId,
             Task.TaskType.STANDBY,
             false,
+            false,
             logContext,
             stateDirectory,
             mkMap(
@@ -223,6 +224,7 @@ public class ProcessorStateManagerTest {
             taskId,
             Task.TaskType.STANDBY,
             false,
+            false,
             logContext,
             stateDirectory,
             mkMap(
@@ -398,6 +400,7 @@ public class ProcessorStateManagerTest {
             taskId,
             Task.TaskType.ACTIVE,
             false,
+            false,
             logContext,
             stateDirectory,
             emptyMap(),
@@ -683,6 +686,7 @@ public class ProcessorStateManagerTest {
             taskId,
             Task.TaskType.STANDBY,
             false,
+            false,
             logContext,
             stateDirectory,
             emptyMap(),
@@ -1261,11 +1265,69 @@ public class ProcessorStateManagerTest {
         assertEquals(200L, written.get(persistentStoreTwoPartition));
     }
 
+    @Test
+    public void shouldReportHasCorruptedStores() throws IOException {
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, null);
+        try {
+            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
+            assertFalse(stateMgr.hasCorruptedStores());
+            
stateMgr.markChangelogAsCorrupted(Collections.singleton(persistentStorePartition));
+            assertTrue(stateMgr.hasCorruptedStores());
+        } finally {
+            stateMgr.close();
+        }
+    }
+
+    @Test
+    public void 
shouldNotThrowTaskCorruptedWithoutCheckpointAndNonEmptyDirWhenTransactional() 
throws IOException {
+        // With transactional state stores + EOS, a missing checkpoint on a 
non-empty store dir should NOT
+        // be treated as corruption — uncommitted data is never written to the 
base store.
+        final long checkpointOffset = 10L;
+
+        final Map<TopicPartition, Long> offsets = mkMap(
+            mkEntry(persistentStorePartition, checkpointOffset),
+            mkEntry(nonPersistentStorePartition, checkpointOffset),
+            mkEntry(irrelevantPartition, 999L)
+        );
+        checkpoint.write(offsets);
+
+        final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE, true, true, null);
+
+        try {
+            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
+            stateMgr.registerStore(persistentStoreTwo, 
persistentStoreTwo.stateRestoreCallback, null);
+            stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
+
+            // should not throw TaskCorruptedException
+            stateMgr.initializeStoreOffsets(false);
+        } finally {
+            stateMgr.close();
+        }
+    }
+
+    private ProcessorStateManager getStateManager(final Task.TaskType 
taskType, final boolean eosEnabled, final boolean 
transactionalStateStoresEnabled, final UpgradeFromValues upgradeFrom) {
+        return new ProcessorStateManager(
+            taskId,
+            taskType,
+            eosEnabled,
+            transactionalStateStoresEnabled,
+            logContext,
+            stateDirectory,
+            mkMap(
+                mkEntry(persistentStoreName, persistentStoreTopicName),
+                mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
+                mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
+            ),
+            emptySet(),
+            upgradeFrom);
+    }
+
     private ProcessorStateManager getStateManager(final Task.TaskType 
taskType, final boolean eosEnabled, final UpgradeFromValues upgradeFrom) {
         return new ProcessorStateManager(
             taskId,
             taskType,
             eosEnabled,
+            false,
             logContext,
             stateDirectory,
             mkMap(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8cd65e7aa2d..84d1f218cfc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -390,6 +390,29 @@ public class StandbyTaskTest {
         assertEquals(Task.State.CLOSED, task.state());
     }
 
+    @Test
+    public void 
shouldNotWipeStateDirOnDirtyCloseWithEosAndTransactionalStateStores() {
+        doNothing().when(stateManager).close();
+        when(stateManager.hasCorruptedStores()).thenReturn(false);
+
+        config = new StreamsConfig(mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2),
+            mkEntry(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, "true")
+        )));
+
+        task = createStandbyTask();
+
+        task.suspend();
+        task.closeDirty();
+
+        assertEquals(Task.State.CLOSED, task.state());
+        // With transactional state stores, the state dir should NOT be wiped 
on dirty close
+        // unless stores are specifically marked as corrupted.
+        verify(stateManager, never()).baseDir();
+    }
+
     @Test
     public void shouldPrepareRecycleSuspendedTask() {
         task = createStandbyTask();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 415f6dc52a1..65b779247ee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -121,7 +121,7 @@ public class StateManagerUtilTest {
         when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
-            "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
+            "logPrefix:", true, false, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
         inOrder.verify(stateManager).close();
         inOrder.verify(stateDirectory).unlock(taskId);
@@ -136,7 +136,7 @@ public class StateManagerUtilTest {
 
         final ProcessorStateException thrown = assertThrows(
             ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE));
+                "logPrefix:", true, false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
@@ -154,7 +154,7 @@ public class StateManagerUtilTest {
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
-                logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
+                logger, "logPrefix:", false, false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
         verify(stateDirectory).unlock(taskId);
     }
@@ -168,7 +168,7 @@ public class StateManagerUtilTest {
         
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
-            "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
+            "logPrefix:", false, true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
         inOrder.verify(stateManager).close();
         inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
@@ -187,7 +187,7 @@ public class StateManagerUtilTest {
 
         try (MockedStatic<Utils> ignored = mockStatic(Utils.class)) {
             assertThrows(ProcessorStateException.class, () ->
-                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, false, stateManager, stateDirectory, TaskType.ACTIVE));
         }
 
         verify(stateDirectory).unlock(taskId);
@@ -206,7 +206,7 @@ public class StateManagerUtilTest {
 
             final ProcessorStateException thrown = assertThrows(
                     ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+                            "logPrefix:", false, true, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
             assertEquals(IOException.class, thrown.getCause().getClass());
         }
@@ -224,7 +224,7 @@ public class StateManagerUtilTest {
         when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-                logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, false, stateManager, stateDirectory, TaskType.ACTIVE);
 
         inOrder.verify(stateManager).taskId();
         inOrder.verify(stateDirectory).lock(taskId);
@@ -241,7 +241,7 @@ public class StateManagerUtilTest {
         when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-                logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", false, true, false, stateManager, stateDirectory, TaskType.ACTIVE);
 
         inOrder.verify(stateManager).taskId();
         inOrder.verify(stateDirectory).lock(taskId);
@@ -250,4 +250,35 @@ public class StateManagerUtilTest {
         verify(stateDirectory, never()).unlock(taskId);
         verifyNoMoreInteractions(stateManager, stateDirectory);
     }
+
+    @Test
+    public void testCloseStateManagerTransactionalDoesNotWipeWhenNoCorruptedStores() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.hasCorruptedStores()).thenReturn(false);
+
+        StateManagerUtil.closeStateManager(logger,
+            "logPrefix:", false, true, true, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verify(stateDirectory, never()).removeTaskOffsets(taskId);
+    }
+
+    @Test
+    public void testCloseStateManagerTransactionalWipesWhenStoresAreCorrupted() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.hasCorruptedStores()).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
+
+        StateManagerUtil.closeStateManager(logger,
+            "logPrefix:", false, true, true, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
+        inOrder.verify(stateDirectory).unlock(taskId);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f466493d52e..48639fad16a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -307,6 +307,16 @@ public class StreamTaskTest {
         final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler,
         final Class<? extends ProcessingExceptionHandler> processingExceptionHandler,
         final Class<? extends TimestampExtractor> timestampExtractor) {
+        return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, processingExceptionHandler, timestampExtractor, false);
+    }
+
+    private static StreamsConfig createConfig(
+        final String eosConfig,
+        final String enforcedProcessingValue,
+        final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler,
+        final Class<? extends ProcessingExceptionHandler> processingExceptionHandler,
+        final Class<? extends TimestampExtractor> timestampExtractor,
+        final boolean transactionalStateStores) {
         final String canonicalPath;
         try {
             canonicalPath = BASE_DIR.getCanonicalPath();
@@ -324,7 +334,8 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
             mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler.getName()),
             mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler.getName()),
-            mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timestampExtractor.getName())
+            mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timestampExtractor.getName()),
+            mkEntry(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, String.valueOf(transactionalStateStores))
         )));
     }
 
@@ -2910,6 +2921,38 @@ public class StreamTaskTest {
         verify(recordCollector, never()).offsets();
     }
 
+    @Test
+    public void shouldNotCheckpointOnPostCommitInRunningStateWithEosAndNotEnforced() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        // completeRestoration does not call commit() under EOS (verified by shouldNotCommitAfterRestorationWhenExactlyOnceEnabled)
+        task.completeRestoration(noOpResetter -> { });
+        task.postCommit(false);
+        // total commit() invocations should remain 0
+        verify(processorStateManager, never()).commit();
+    }
+
+    @Test
+    public void shouldCheckpointOnPostCommitInRunningStateWithEosAndTransactionalStateStores() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(
+            createConfig(EXACTLY_ONCE_V2, "100", LogAndFailExceptionHandler.class, LogAndFailProcessingExceptionHandler.class, FailOnInvalidTimestamp.class, true),
+            true,
+            processorStateManager
+        );
+        task.initializeIfNeeded();
+        // completeRestoration does not call commit() under EOS
+        task.completeRestoration(noOpResetter -> { });
+        task.postCommit(false);
+        // transactionalStateStoresEnabled=true triggers maybeCheckpoint() on postCommit even under EOS
+        verify(processorStateManager).commit();
+    }
+
     @Test
     public void punctuateShouldNotHandleFailProcessingExceptionAndThrowStreamsException() {
         when(stateManager.taskId()).thenReturn(taskId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index baee5286cae..74dd97050ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2471,6 +2471,7 @@ public class TaskManagerTest {
         when(corruptedActive.prepareCommit(false)).thenReturn(emptyMap());
         when(corruptedActive.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
         doNothing().when(corruptedActive).suspend();
+        doNothing().when(corruptedActive).markChangelogAsCorrupted(taskId00ChangelogPartitions);
         doNothing().when(corruptedActive).postCommit(true);
         doNothing().when(corruptedActive).closeDirty();
         doNothing().when(corruptedActive).revive();
@@ -2481,21 +2482,24 @@ public class TaskManagerTest {
 
         taskManager.handleCorruption(singleton(taskId00));
 
-        // 1. verify corrupted task was closed dirty and revived
+        // 1. verify corrupted task was closed dirty and revived; markChangelogAsCorrupted precedes postCommit
         final InOrder corruptedOrder = inOrder(corruptedActive, tasks);
         corruptedOrder.verify(corruptedActive).prepareCommit(false);
         corruptedOrder.verify(corruptedActive).suspend();
+        corruptedOrder.verify(corruptedActive).markChangelogAsCorrupted(taskId00ChangelogPartitions);
         corruptedOrder.verify(corruptedActive).postCommit(true);
         corruptedOrder.verify(corruptedActive).closeDirty();
         corruptedOrder.verify(tasks).removeTask(corruptedActive);
         corruptedOrder.verify(corruptedActive).revive();
         corruptedOrder.verify(tasks).addPendingTasksToInit(Set.of(corruptedActive));
 
-        // 2. verify uncorrupted task attempted commit, failed with timeout, then was closed dirty and revived
+        // 2. verify uncorrupted task attempted commit, failed with timeout; EOS converts TimeoutException to
+        //    TaskCorruptedException so it also ends up in the corrupted path (markAsCorrupted=true)
         final InOrder uncorruptedOrder = inOrder(uncorruptedActive, producer, tasks);
         uncorruptedOrder.verify(uncorruptedActive).prepareCommit(true);
-        uncorruptedOrder.verify(producer).commitTransaction(offsets, groupMetadata); // tries to commit, throws TimeoutException
+        uncorruptedOrder.verify(producer).commitTransaction(offsets, groupMetadata); // throws TimeoutException → TaskCorruptedException
         uncorruptedOrder.verify(uncorruptedActive).suspend();
+        uncorruptedOrder.verify(uncorruptedActive).markChangelogAsCorrupted(taskId01ChangelogPartitions);
         uncorruptedOrder.verify(uncorruptedActive).postCommit(true);
         uncorruptedOrder.verify(uncorruptedActive).closeDirty();
         uncorruptedOrder.verify(tasks).removeTask(uncorruptedActive);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 1eaf63d19c8..6bbe268b34f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -655,6 +655,7 @@ public class StreamThreadStateStoreProviderTest {
             taskId,
             Task.TaskType.ACTIVE,
             StreamsConfigUtils.eosEnabled(streamsConfig),
+            false,
             logContext,
             stateDirectory,
             topology.storeToChangelogTopic(),
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 687825b1d10..2738458062a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -486,6 +486,7 @@ public class TopologyTestDriver implements Closeable {
                 TASK_ID,
                 Task.TaskType.ACTIVE,
                 StreamsConfig.EXACTLY_ONCE_V2.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+                streamsConfig.getBoolean(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG),
                 logContext,
                 stateDirectory,
                 processorTopology.storeToChangelogTopic(),


Reply via email to