apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624682684


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
 
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+            this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+            ));
         }
 
         public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-            newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client)
-                .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
             taskPairs.addPairs(taskId, newAssignmentsForClient);
-            newAssignments.get(client).add(new AssignedTask(taskId, type));
-            newTaskLocations.get(taskId).add(client);
+
+            newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
         }
 
-        public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() {
-            final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>();
-            for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : newAssignments.entrySet()) {
-                final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = newAssignments.get(processId);
-                final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-                kafkaStreamsAssignments.put(processId, assignment);
-            }
-            return kafkaStreamsAssignments;
+        public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+            return newAssignments;
         }
 
         public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) {

Review Comment:
   > now that `#processOptimizedAssignments` is only updating the `newTaskLocations`, can we simplify things further by just keeping the `newTaskLocations` map up to date as we move tasks around? That way we can get rid of `processOptimizedAssignments` altogether
   
   Since the task assignments are mutated in place rather than "moved" around, and that mutation often happens outside this class (in `TaskAssignmentUtils`, for instance), we can't easily guarantee that `newAssignments` and `newTaskLocations` change together. We would need this `processOptimizedAssignments` call regardless.
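
To illustrate the concern, here is a rough, self-contained sketch rather than the actual `StickyTaskAssignor` code. `LocationIndexSketch` is a hypothetical stand-in for `AssignmentState`, plain `String` ids replace `TaskId` and `ProcessId`, and the clear-and-rebuild body of `processOptimizedAssignments` is an assumption about what a resync step could look like. It shows how code that edits the assignment map directly leaves the derived location map stale until it is rebuilt.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for AssignmentState; not the real Kafka Streams class.
final class LocationIndexSketch {
    // client -> tasks; this map is handed out and mutated by outside code.
    final Map<String, Set<String>> newAssignments = new HashMap<>();
    // task -> clients; a derived index that outside code does not know about.
    final Map<String, Set<String>> newTaskLocations = new HashMap<>();

    // Inside the class both maps are easy to update together.
    void finalizeAssignment(final String task, final String client) {
        newAssignments.computeIfAbsent(client, k -> new HashSet<>()).add(task);
        newTaskLocations.computeIfAbsent(task, k -> new HashSet<>()).add(client);
    }

    // Assumed resync step: rebuild the index from the current assignments.
    void processOptimizedAssignments() {
        newTaskLocations.clear();
        newAssignments.forEach((client, tasks) -> tasks.forEach(task ->
            newTaskLocations.computeIfAbsent(task, k -> new HashSet<>()).add(client)));
    }

    public static void main(final String[] args) {
        final LocationIndexSketch state = new LocationIndexSketch();
        state.finalizeAssignment("0_0", "A");

        // An external optimizer reassigns the task by editing the assignments directly.
        state.newAssignments.get("A").remove("0_0");
        state.newAssignments.computeIfAbsent("B", k -> new HashSet<>()).add("0_0");

        System.out.println(state.newTaskLocations.get("0_0")); // stale: [A]
        state.processOptimizedAssignments();
        System.out.println(state.newTaskLocations.get("0_0")); // resynced: [B]
    }
}
```

Keeping the two maps in lockstep would require every external mutator to go through this class, which is why an explicit resync after the optimization passes seems simpler here.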



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to