apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624657249


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState 
applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
 
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new 
HashSet<>()));
+            this.newAssignments = 
clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new 
HashSet<>())
+            ));
         }
 
         public void finalizeAssignment(final TaskId taskId, final ProcessId 
client, final AssignedTask.Type type) {
-            newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-            final Set<TaskId> newAssignmentsForClient = 
newAssignments.get(client)
-                .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+            final Set<TaskId> newAssignmentsForClient = 
newAssignments.get(client).tasks().keySet();
             taskPairs.addPairs(taskId, newAssignmentsForClient);
-            newAssignments.get(client).add(new AssignedTask(taskId, type));
-            newTaskLocations.get(taskId).add(client);
+
+            newAssignments.get(client).assignTask(new AssignedTask(taskId, 
type));
+            newTaskLocations.computeIfAbsent(taskId, k -> new 
HashSet<>()).add(client);
         }
 
-        public Map<ProcessId, KafkaStreamsAssignment> 
buildKafkaStreamsAssignments() {
-            final Map<ProcessId, KafkaStreamsAssignment> 
kafkaStreamsAssignments = new HashMap<>();
-            for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : 
newAssignments.entrySet()) {
-                final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = 
newAssignments.get(processId);
-                final KafkaStreamsAssignment assignment = 
KafkaStreamsAssignment.of(processId, assignedTasks);
-                kafkaStreamsAssignments.put(processId, assignment);
-            }
-            return kafkaStreamsAssignments;
+        public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+            return newAssignments;
         }
 
         public void processOptimizedAssignments(final Map<ProcessId, 
KafkaStreamsAssignment> optimizedAssignments) {

Review Comment:
   This would involve changing `TaskAssignmentUtils`. Currently it's not 
obvious that those methods mutate their input (and until recently, they 
didn't). The public methods all return `Map<ProcessId, KafkaStreamsAssignment>`, 
so I suggest we change them to return `void` and assume mutation everywhere 
up the call chain from those calls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to