This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2da54180407 KAFKA-16876: TaskManager.handleRevocation doesn't handle 
errors thrown from task.prepareCommit (#22282)
2da54180407 is described below

commit 2da541804077dee04025fae47070ffa5d1b029b3
Author: gabriellafu <[email protected]>
AuthorDate: Tue May 26 14:48:28 2026 -0400

    KAFKA-16876: TaskManager.handleRevocation doesn't handle errors thrown from 
task.prepareCommit (#22282)
    
    1. Fix TaskManager.handleRevocation to always suspend revoked tasks,
    even when prepareCommit throws (e.g. TaskMigratedException from
    producer.send during cache flush). Previously the exception propagated
    uncaught, skipping the suspend loop entirely. This left tasks in RUNNING
    state, which caused a downstream IllegalStateException when
    handleAssignment tried to close them.
    2. Wrap prepare/commit/postCommit in try-finally so the suspend loop and
    task unlock are guaranteed to execute regardless of where an exception
    occurs.
    3. Preserve all exceptions via addSuppressed instead of silently
    dropping later exceptions. The first exception remains the primary
    thrown exception for backward compatibility, but subsequent exceptions
    (e.g. the IllegalStateException from closing an unsuspended task) are
    now attached as suppressed exceptions instead of lost.
    4. Update all call sites in shutdown(), tryCloseCleanActiveTasks(), and
    handleRevocation to use maybeSetFirstException consistently. This is a
    behavior change: secondary failures during shutdown and task close are
    now visible as suppressed exceptions instead of being lost.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   | 187 ++++++++++++---------
 .../processor/internals/TaskManagerTest.java       |  50 ++++++
 2 files changed, 153 insertions(+), 84 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 337e70492c0..a25e0e32f88 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -430,8 +430,8 @@ public class TaskManager {
                     if (exception instanceof TaskMigratedException) {
                         lastTaskMigrated = (TaskMigratedException) exception;
                     } else if (exception instanceof TaskCorruptedException) {
-                        log.warn("Encounter corrupted task " + taskId + ", 
will group it with other corrupted tasks " +
-                            "and handle together", exception);
+                        log.warn("Encounter corrupted task {}, will group it 
with other corrupted tasks " +
+                            "and handle together", taskId, exception);
                         aggregatedCorruptedTaskIds.add(taskId);
                     } else {
                         ((StreamsException) exception).setTaskId(taskId);
@@ -1035,105 +1035,120 @@ public class TaskManager {
         final Set<TaskId> lockedTaskIds = 
activeRunningTaskIterable().stream().map(Task::id).collect(Collectors.toSet());
         maybeLockTasks(lockedTaskIds);
 
+        // After locking, everything must be inside try-finally to guarantee 
suspend and unlock.
+        final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        final Set<Task> tasksToSkipPostCommit = new 
TreeSet<>(Comparator.comparing(Task::id));
         boolean revokedTasksNeedCommit = false;
-        for (final StreamTask task : activeRunningTaskIterable()) {
-            if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
-                // when the task input partitions are included in the revoked 
list,
-                // this is an active task and should be revoked
+        boolean prepareCommitSucceeded = false;
+        try {
+            for (final StreamTask task : activeRunningTaskIterable()) {
+                if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
+                    // when the task input partitions are included in the 
revoked list,
+                    // this is an active task and should be revoked
 
-                revokedActiveTasks.add(task);
-                remainingRevokedPartitions.removeAll(task.inputPartitions());
+                    revokedActiveTasks.add(task);
+                    
remainingRevokedPartitions.removeAll(task.inputPartitions());
 
-                revokedTasksNeedCommit |= task.commitNeeded();
-            } else if (task.commitNeeded()) {
-                commitNeededActiveTasks.add(task);
+                    revokedTasksNeedCommit |= task.commitNeeded();
+                } else if (task.commitNeeded()) {
+                    commitNeededActiveTasks.add(task);
+                }
             }
-        }
 
-        revokeTasksInStateUpdater(remainingRevokedPartitions);
+            revokeTasksInStateUpdater(remainingRevokedPartitions);
 
-        if (!remainingRevokedPartitions.isEmpty()) {
-            log.debug("The following revoked partitions {} are missing from 
the current task partitions. It could "
-                          + "potentially be due to race condition of consumer 
detecting the heartbeat failure, or the tasks " +
-                         "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
-        }
-
-        if (revokedTasksNeedCommit) {
-            prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
-            // if we need to commit any revoking task then we just commit all 
of those needed committing together
-            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
-        }
-
-        // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
-        // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
-        // as such we just need to skip those dirty tasks in the checkpoint
-        final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
-        try {
-            if (revokedTasksNeedCommit) {
-                // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
-                // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to 
make sure we don't skip the
-                // offset commit because we are in a rebalance
-                
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+            if (!remainingRevokedPartitions.isEmpty()) {
+                log.debug("The following revoked partitions {} are missing 
from the current task partitions. It could "
+                              + "potentially be due to race condition of 
consumer detecting the heartbeat failure, or the tasks " +
+                             "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
             }
-        } catch (final TaskCorruptedException e) {
-            log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
-                     e.corruptedTasks());
-
-            // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
-            closeDirtyAndRevive(dirtyTasks, true);
-        } catch (final TimeoutException e) {
-            log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
-
-            // If we hit a TimeoutException it must be ALOS, just close dirty 
and revive without wiping the state
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-            closeDirtyAndRevive(dirtyTasks, false);
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-        }
 
-        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is
-        // going to be closed we would checkpoint by then
-        for (final Task task : revokedActiveTasks) {
-            if (!dirtyTasks.contains(task)) {
+            if (revokedTasksNeedCommit) {
                 try {
-                    task.postCommit(true);
+                    prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
+                    // if we need to commit any revoking task then we just 
commit all of those needed committing together
+                    prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
+                    prepareCommitSucceeded = true;
                 } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + 
task.id(), e);
-                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                    log.error("Exception caught while preparing to commit 
revoked tasks {} and commit-needed tasks {}", revokedActiveTasks, 
commitNeededActiveTasks, e);
+                    maybeSetFirstException(false, e, firstException);
+                    tasksToSkipPostCommit.addAll(revokedActiveTasks);
+                    tasksToSkipPostCommit.addAll(commitNeededActiveTasks);
                 }
             }
-        }
 
-        if (revokedTasksNeedCommit) {
-            for (final Task task : commitNeededActiveTasks) {
-                if (!dirtyTasks.contains(task)) {
+            try {
+                if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+                    // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
+                    // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() 
to make sure we don't skip the
+                    // offset commit because we are in a rebalance
+                    
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+                }
+            } catch (final TaskCorruptedException e) {
+                log.warn("Some tasks were corrupted when trying to commit 
offsets, these will be cleaned and revived: {}",
+                         e.corruptedTasks());
+
+                // If we hit a TaskCorruptedException it must be EOS, just 
handle the cleanup for those corrupted tasks right here
+                dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+                closeDirtyAndRevive(dirtyTasks, true);
+            } catch (final TimeoutException e) {
+                log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
+
+                // If we hit a TimeoutException it must be ALOS, just close 
dirty and revive without wiping the state
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+                closeDirtyAndRevive(dirtyTasks, false);
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing those revoked 
tasks {}", revokedActiveTasks, e);
+                maybeSetFirstException(false, e, firstException);
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+            }
+
+            // we enforce checkpointing upon suspending a task: if it is 
resumed later we just proceed normally, if it is
+            // going to be closed we would checkpoint by then
+            for (final Task task : revokedActiveTasks) {
+                if (!dirtyTasks.contains(task) && 
!tasksToSkipPostCommit.contains(task)) {
                     try {
-                        // for non-revoking active tasks, we should not 
enforce checkpoint
-                        // since if it is EOS enabled, no checkpoint should be 
written while
-                        // the task is in RUNNING tate
-                        task.postCommit(false);
+                        task.postCommit(true);
                     } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
+                        log.error("Exception caught while post-committing task 
{}", task.id(), e);
                         maybeSetFirstException(false, 
maybeWrapTaskException(e, task.id()), firstException);
                     }
                 }
             }
-        }
 
-        for (final Task task : revokedActiveTasks) {
+            if (revokedTasksNeedCommit) {
+                for (final Task task : commitNeededActiveTasks) {
+                    if (!dirtyTasks.contains(task) && 
!tasksToSkipPostCommit.contains(task)) {
+                        try {
+                            // for non-revoking active tasks, we should not 
enforce checkpoint
+                            // since if it is EOS enabled, no checkpoint 
should be written while
+                            // the task is in RUNNING state
+                            task.postCommit(false);
+                        } catch (final RuntimeException e) {
+                            log.error("Exception caught while post-committing 
task {}", task.id(), e);
+                            maybeSetFirstException(false, 
maybeWrapTaskException(e, task.id()), firstException);
+                        }
+                    }
+                }
+            }
+        } finally {
+            for (final Task task : revokedActiveTasks) {
+                try {
+                    task.suspend();
+                } catch (final RuntimeException e) {
+                    log.error("Caught the following exception while trying to 
suspend revoked task {}", task.id(), e);
+                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                }
+            }
+
             try {
-                task.suspend();
+                maybeUnlockTasks(lockedTaskIds);
             } catch (final RuntimeException e) {
-                log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
-                maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                log.error("Exception caught while unlocking tasks {}", 
lockedTaskIds, e);
+                maybeSetFirstException(false, e, firstException);
             }
         }
 
-        maybeUnlockTasks(lockedTaskIds);
-
         if (firstException.get() != null) {
             throw firstException.get();
         }
@@ -1385,14 +1400,14 @@ public class TaskManager {
         executeAndMaybeSwallow(
             clean,
             () -> closeAndCleanUpTasks(activeTasks, standbyTasks, clean),
-            e -> firstException.compareAndSet(null, e),
+            e -> maybeSetFirstException(false, e, firstException),
             e -> log.warn("Ignoring an exception while unlocking remaining 
task directories.", e)
         );
 
         executeAndMaybeSwallow(
             clean,
             activeTaskCreator::close,
-            e -> firstException.compareAndSet(null, e),
+            e -> maybeSetFirstException(false, e, firstException),
             e -> log.warn("Ignoring an exception while closing thread 
producer.", e)
         );
 
@@ -1403,7 +1418,7 @@ public class TaskManager {
         executeAndMaybeSwallow(
             clean,
             this::releaseLockedUnassignedTaskDirectories,
-            e -> firstException.compareAndSet(null, e),
+            e -> maybeSetFirstException(false, e, firstException),
             e -> log.warn("Ignoring an exception while unlocking remaining 
task directories.", e)
         );
 
@@ -1512,10 +1527,10 @@ public class TaskManager {
                 tasksToCloseDirty.add(task);
             } catch (final StreamsException e) {
                 e.setTaskId(task.id());
-                firstException.compareAndSet(null, e);
+                maybeSetFirstException(false, e, firstException);
                 tasksToCloseDirty.add(task);
             } catch (final RuntimeException e) {
-                firstException.compareAndSet(null, new StreamsException(e, 
task.id()));
+                maybeSetFirstException(false, new StreamsException(e, 
task.id()), firstException);
                 tasksToCloseDirty.add(task);
             }
         }
@@ -1528,7 +1543,7 @@ public class TaskManager {
             try {
                 
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
-                log.error("Exception caught while committing tasks " + 
consumedOffsetsAndMetadataPerTask.keySet(), e);
+                log.error("Exception caught while committing tasks {}", 
consumedOffsetsAndMetadataPerTask.keySet(), e);
                 // TODO: should record the task ids when handling this 
exception
                 maybeSetFirstException(false, e, firstException);
 
@@ -1552,7 +1567,7 @@ public class TaskManager {
                 try {
                     task.postCommit(true);
                 } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + 
task.id(), e);
+                    log.error("Exception caught while post-committing task 
{}", task.id(), e);
                     maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
                     tasksToCloseDirty.add(task);
                     tasksToCloseClean.remove(task);
@@ -2049,7 +2064,11 @@ public class TaskManager {
                                         final RuntimeException exception,
                                         final 
AtomicReference<RuntimeException> firstException) {
         if (!ignoreTaskMigrated || !(exception instanceof 
TaskMigratedException)) {
-            firstException.compareAndSet(null, exception);
+            if (!firstException.compareAndSet(null, exception)) {
+                if (exception != firstException.get()) {
+                    firstException.get().addSuppressed(exception);
+                }
+            }
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 74dd97050ee..3a84f779e42 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -78,6 +78,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -2993,6 +2994,55 @@ public class TaskManagerTest {
         verify(task00).suspend();
     }
 
+    @Test
+    public void shouldSuspendRevokedTasksWhenPrepareCommitThrows() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
+
+        when(task00.commitNeeded()).thenReturn(true);
+        when(task00.prepareCommit(true)).thenThrow(new 
TaskMigratedException("task migrated"));
+
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        final StreamsException thrown = assertThrows(StreamsException.class,
+            () -> taskManager.handleRevocation(taskId00Partitions));
+
+        assertInstanceOf(TaskMigratedException.class, thrown);
+        assertEquals(Optional.of(taskId00), thrown.taskId());
+
+        verify(task00).suspend();
+        verify(task00, never()).postCommit(anyBoolean());
+    }
+
+    @Test
+    public void 
shouldAttachSuppressedExceptionWhenPrepareCommitAndSuspendBothFailDuringRevocation()
 {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
+
+        when(task00.commitNeeded()).thenReturn(true);
+        when(task00.prepareCommit(true)).thenThrow(new 
TaskMigratedException("task migrated"));
+        doThrow(new RuntimeException("suspend failed")).when(task00).suspend();
+
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        final StreamsException thrown = assertThrows(StreamsException.class,
+            () -> taskManager.handleRevocation(taskId00Partitions));
+
+        assertInstanceOf(TaskMigratedException.class, thrown);
+        assertEquals(1, thrown.getSuppressed().length);
+        assertInstanceOf(StreamsException.class, thrown.getSuppressed()[0]);
+    }
+
     @Test
     public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
         // task being revoked, needs commit

Reply via email to