chia7712 commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1837437347
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -324,6 +324,31 @@ private void closeDirtyAndRevive(final Collection<Task>
taskWithChangelogs, fina
}
}
+ private Map<Task, Set<TopicPartition>> assignStartupTasks(final
Map<TaskId, Set<TopicPartition>> tasksToAssign,
+ final String
threadLogPrefix,
+ final
TopologyMetadata topologyMetadata,
+ final
ChangelogRegister changelogReader) {
+ if (stateDirectory.hasStartupTasks()) {
+ final Map<Task, Set<TopicPartition>> assignedTasks = new
HashMap<>(tasksToAssign.size());
+ for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
tasksToAssign.entrySet()) {
+ final TaskId taskId = entry.getKey();
+ final Task task = stateDirectory.removeStartupTask(taskId);
+ if (task != null) {
+ // replace our dummy values with the real ones, now we
know our thread and assignment
+ final Set<TopicPartition> inputPartitions =
entry.getValue();
+ task.stateManager().assignToStreamThread(new
LogContext(threadLogPrefix), changelogReader, inputPartitions);
+ task.updateInputPartitions(inputPartitions,
topologyMetadata.nodeToSourceTopics(taskId));
Review Comment:
What happens if the task has topics from intermediate internal topics? Since
`topologyMetadata` doesn't return topics with a prefix, it could cause a
topology exception when recycling a standby task to an active task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]