showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833872517



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING tate
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review comment:
       Honestly, I'm not sure whether we should write a checkpoint here. When 
we reach this point, the task might be in `RESTORING` state, `RUNNING` state, 
or possibly another state. We can checkpoint in `RESTORING` state, but in 
`RUNNING` state we might not have to. So I was thinking of doing something like 
this:
   ```
   task.postCommit(task.state() != Task.State.RUNNING);
   ```
   But I checked the discussion thread in JIRA again, and @ableegoldman 
suggested that we checkpoint after committing. So I'm wondering whether we 
should just force this to `true` here or not?
   
https://issues.apache.org/jira/browse/KAFKA-13295?focusedCommentId=17429067&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429067
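   
   To make the first option concrete, here is a minimal, self-contained sketch 
of the "checkpoint unless RUNNING" rule. `CheckpointPolicy`, `TaskState`, and 
`enforceCheckpoint` are hypothetical stand-ins for illustration only, not the 
actual Streams classes; the enum just mirrors the task state names. The sketch 
also shows why the state should be compared as an enum constant: in Java, an 
enum value never `equals` a `String`.

   ```java
   // Hypothetical sketch: decide the enforceCheckpoint flag from the task state.
   final class CheckpointPolicy {

       // Stand-in for the task states (assumption: names mirror Task.State).
       enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

       // Option 1 from the comment: force a checkpoint unless the task is RUNNING.
       static boolean enforceCheckpoint(final TaskState state) {
           return state != TaskState.RUNNING;
       }

       // Pitfall: equals() against a String compiles but is always false,
       // so negating it would force a checkpoint in every state.
       static boolean enforceCheckpointStringCompare(final TaskState state) {
           final Object runningName = "RUNNING";
           return !state.equals(runningName);
       }

       public static void main(final String[] args) {
           for (final TaskState state : TaskState.values()) {
               System.out.println(state
                   + " enumCompare=" + enforceCheckpoint(state)
                   + " stringCompare=" + enforceCheckpointStringCompare(state));
           }
       }
   }
   ```

   With the enum comparison only `RUNNING` skips the forced checkpoint. The 
second option, always passing `true`, would not need the state check at all.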
   
   cc @guozhangwang @ableegoldman 



