chia7712 commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1837437347


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -324,6 +324,31 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private Map<Task, Set<TopicPartition>> assignStartupTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
+                                                              final String threadLogPrefix,
+                                                              final TopologyMetadata topologyMetadata,
+                                                              final ChangelogRegister changelogReader) {
+        if (stateDirectory.hasStartupTasks()) {
+            final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
+            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
+                final TaskId taskId = entry.getKey();
+                final Task task = stateDirectory.removeStartupTask(taskId);
+                if (task != null) {
+                    // replace our dummy values with the real ones, now we know our thread and assignment
+                    final Set<TopicPartition> inputPartitions = entry.getValue();
+                    task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
+                    task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(taskId));

Review Comment:
   What happens if the task's input partitions come from intermediate internal 
topics? Since `topologyMetadata` doesn't return topic names with the prefix, 
this could cause a topology exception when recycling a standby task into an 
active task.
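
A minimal, self-contained sketch of the concern, not Kafka Streams code. It assumes that internal topics are created on the broker as `<application.id>-<name>`, that the node-to-source-topics lookup reports the undecorated name, and that the assigned partitions carry the broker-side name. The class `PrefixMismatchSketch` and the helpers `decorate` and `allTopicsKnown` are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: none of these names are Kafka Streams APIs.
public class PrefixMismatchSketch {

    // Hypothetical helper: prepend "<applicationId>-" to internal topic names only.
    static String decorate(final String applicationId,
                           final String topic,
                           final Set<String> internalTopics) {
        return internalTopics.contains(topic) ? applicationId + "-" + topic : topic;
    }

    // Returns true if every assigned topic appears as a source topic of some node.
    static boolean allTopicsKnown(final Set<String> assignedTopics,
                                  final Map<String, List<String>> nodeToSourceTopics) {
        for (final String topic : assignedTopics) {
            boolean found = false;
            for (final List<String> sources : nodeToSourceTopics.values()) {
                if (sources.contains(topic)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final String applicationId = "my-app";
        final Set<String> internalTopics = Set.of("counts-repartition");

        // Assumption: the assignment carries the broker-side (prefixed) topic name.
        final Set<String> assigned =
            Set.of(decorate(applicationId, "counts-repartition", internalTopics));

        // Assumption: the topology lookup reports the undecorated name.
        final Map<String, List<String>> undecorated =
            Map.of("source-1", List.of("counts-repartition"));
        System.out.println(allTopicsKnown(assigned, undecorated)); // prints false

        // Once the lookup is decorated the same way, the names line up.
        final Map<String, List<String>> decorated =
            Map.of("source-1", List.of(decorate(applicationId, "counts-repartition", internalTopics)));
        System.out.println(allTopicsKnown(assigned, decorated)); // prints true
    }
}
```

If the real lookup behaves like the undecorated map here, the input partitions of a task reading from a repartition topic would not match any source node, which is the mismatch the comment is asking about.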


