showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833872517
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
}
}
+ private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+ final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+ prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+ final Set<Task> dirtyTasks = new HashSet<>();
+ try {
+ taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ } catch (final TaskCorruptedException e) {
+ log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+ e.corruptedTasks());
+
+ // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+ dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+ closeDirtyAndRevive(dirtyTasks, true);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+ activeTasksCommitException.compareAndSet(null, e);
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ }
+
+ // for non-revoking active tasks, we should not enforce checkpoint
+ // as it's EOS enabled in which case no checkpoint should be written while
+ // the task is in RUNNING state
+ for (final Task task : activeTasksNeedCommit) {
+ if (!dirtyTasks.contains(task)) {
+ try {
+ task.postCommit(false);
Review comment:
Honestly, I'm not sure whether we should write a checkpoint here. When we
reach this point, the task might be in the `RESTORING` state, the `RUNNING`
state, or possibly another state. We can checkpoint in the `RESTORING` state,
but in the `RUNNING` state we might not have to. So I was thinking of doing
something like this:
```
task.postCommit(task.state() != Task.State.RUNNING);
```
But I re-checked the discussion thread in JIRA, where @ableegoldman suggested
that we checkpoint after committing. So I'm wondering whether we should just
pass `true` here instead?
https://issues.apache.org/jira/browse/KAFKA-13295?focusedCommentId=17429067&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429067
cc @guozhangwang @ableegoldman
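
To make the two options concrete, here is a minimal, self-contained sketch of the state-dependent choice. It is only an illustration under assumptions: `CheckpointDecision` and `enforceCheckpoint` are hypothetical names, and the nested `State` enum is a stand-in for `Task.State`, not the real Kafka class. Note that `state()` returns an enum, so the check has to compare against the enum constant; an enum value never equals a `String`.

```java
public class CheckpointDecision {

    // Hypothetical stand-in for Task.State, used only so this sketch compiles on its own.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Option discussed above: enforce the checkpoint for every state except RUNNING.
    // The alternative from the JIRA thread would simply be to always pass true.
    static boolean enforceCheckpoint(final State state) {
        return state != State.RUNNING;
    }

    public static void main(final String[] args) {
        // Print the flag that would be passed to postCommit(...) for each state.
        for (final State state : State.values()) {
            System.out.println(state + " -> enforceCheckpoint=" + enforceCheckpoint(state));
        }

        // Pitfall: comparing an enum constant with a String compiles, but is never true.
        final Object runningAsText = "RUNNING";
        System.out.println("enum equals String: " + State.RUNNING.equals(runningAsText));
    }
}
```

With this shape, switching to the "always checkpoint after commit" behavior would just mean passing `true` and dropping the helper.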