ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623081838


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
 
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+            this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+            ));
         }
 
         public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {

Review Comment:
   I guess I overlooked this earlier: these should all be private methods, right? 
Pretty much everything other than `#assign`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
 
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+            this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+            ));
         }
 
         public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-            newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client)
-                .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
             taskPairs.addPairs(taskId, newAssignmentsForClient);
-            newAssignments.get(client).add(new AssignedTask(taskId, type));
-            newTaskLocations.get(taskId).add(client);
+
+            newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
         }
 
-        public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() {
-            final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>();
-            for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : newAssignments.entrySet()) {
-                final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = newAssignments.get(processId);
-                final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-                kafkaStreamsAssignments.put(processId, assignment);
-            }
-            return kafkaStreamsAssignments;
+        public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+            return newAssignments;
         }
 
         public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) {

Review Comment:
   Now that `#processOptimizedAssignments` is only updating 
`newTaskLocations`, can we simplify things further by just keeping the 
`newTaskLocations` map up to date as we move tasks around? That way we can get 
rid of `processOptimizedAssignments` altogether.
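
A minimal sketch of the idea in this comment, assuming plain `String` ids in place of `TaskId` and `ProcessId`. The class `TaskLocationTracker` and its `assign`, `unassign`, and `move` methods are hypothetical and not part of the PR; the point is only that updating the location map in the same call that moves a task leaves nothing for a separate `processOptimizedAssignments` pass to reconcile.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for AssignmentState: Strings replace TaskId and ProcessId.
class TaskLocationTracker {
    // client -> tasks currently assigned to it
    private final Map<String, Set<String>> tasksByClient = new HashMap<>();
    // task -> clients currently hosting it (the role played by newTaskLocations)
    private final Map<String, Set<String>> newTaskLocations = new HashMap<>();

    // Assign a task and record its location in the same step.
    void assign(final String taskId, final String client) {
        tasksByClient.computeIfAbsent(client, k -> new HashSet<>()).add(taskId);
        newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
    }

    // Remove a task from a client and drop the matching location entry.
    void unassign(final String taskId, final String client) {
        final Set<String> tasks = tasksByClient.get(client);
        if (tasks != null) {
            tasks.remove(taskId);
        }
        final Set<String> locations = newTaskLocations.get(taskId);
        if (locations != null) {
            locations.remove(client);
        }
    }

    // Moving a task keeps both maps consistent, so no later reconciliation is needed.
    void move(final String taskId, final String from, final String to) {
        unassign(taskId, from);
        assign(taskId, to);
    }

    Set<String> locationsOf(final String taskId) {
        return newTaskLocations.getOrDefault(taskId, Collections.emptySet());
    }
}
```

With this shape, any optimization step that relocates tasks would call `move` rather than rebuilding the location map afterwards.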



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
 
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+            this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+            ));
         }
 
         public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-            newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client)
-                .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
             taskPairs.addPairs(taskId, newAssignmentsForClient);
-            newAssignments.get(client).add(new AssignedTask(taskId, type));
-            newTaskLocations.get(taskId).add(client);
+
+            newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
         }
 
-        public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() {
-            final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>();
-            for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : newAssignments.entrySet()) {
-                final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = newAssignments.get(processId);
-                final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-                kafkaStreamsAssignments.put(processId, assignment);
-            }
-            return kafkaStreamsAssignments;
+        public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+            return newAssignments;
         }
 
         public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) {

Review Comment:
   On a similar note, if `KafkaStreamsAssignment` is mutable then we should (in 
theory) be able to just add/remove tasks in place, so that `newAssignments` 
becomes a final variable and we don't need to make copies of the assignments 
map just to overwrite `newAssignments`.
   
   I realize this is a slightly bigger change, so I'm happy to punt it to a 
followup PR and stick with the changes already present in the current PR. I just 
want to avoid some unnecessary copies.
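
A rough sketch of what in-place mutation could look like, under stated assumptions: `MutableAssignment` is a hypothetical stand-in for a mutable `KafkaStreamsAssignment`, ids and task types are plain `String`s, and `removeTask` and `moveTask` are names invented here for illustration. The diff itself only shows `assignTask` and `tasks()`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InPlaceAssignments {
    // Hypothetical stand-in for a mutable KafkaStreamsAssignment: task id -> task type.
    static final class MutableAssignment {
        private final Map<String, String> tasks = new HashMap<>();

        void assignTask(final String taskId, final String type) {
            tasks.put(taskId, type);
        }

        // Assumed counterpart to assignTask; not shown in the diff.
        void removeTask(final String taskId) {
            tasks.remove(taskId);
        }

        Map<String, String> tasks() {
            return tasks;
        }
    }

    // The map reference is final: it is populated once and never replaced by a copy.
    private final Map<String, MutableAssignment> newAssignments = new HashMap<>();

    InPlaceAssignments(final List<String> clients) {
        for (final String client : clients) {
            newAssignments.put(client, new MutableAssignment());
        }
    }

    void assign(final String taskId, final String type, final String client) {
        newAssignments.get(client).assignTask(taskId, type);
    }

    // Move a task by mutating the two affected assignments, leaving the map itself untouched.
    void moveTask(final String taskId, final String type, final String from, final String to) {
        newAssignments.get(from).removeTask(taskId);
        newAssignments.get(to).assignTask(taskId, type);
    }

    Map<String, String> tasksOf(final String client) {
        return newAssignments.get(client).tasks();
    }

    static InPlaceAssignments forClients(final String... clients) {
        return new InPlaceAssignments(Arrays.asList(clients));
    }
}
```

The trade-off is that every holder of the map sees each mutation immediately, which is what removes the need for the copy-and-overwrite step.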



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
 
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+            this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+            ));
         }
 
         public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-            newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client)
-                .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
             taskPairs.addPairs(taskId, newAssignmentsForClient);
-            newAssignments.get(client).add(new AssignedTask(taskId, type));
-            newTaskLocations.get(taskId).add(client);
+
+            newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
         }
 
-        public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() {
-            final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>();
-            for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : newAssignments.entrySet()) {
-                final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = newAssignments.get(processId);
-                final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-                kafkaStreamsAssignments.put(processId, assignment);
-            }
-            return kafkaStreamsAssignments;
+        public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {

Review Comment:
   This is a no-op method now, so it can be deleted.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -331,7 +328,7 @@ public Set<ProcessId> findClientsWithoutAssignedTask(final TaskId taskId) {
 
         public double clientLoad(final ProcessId processId) {
             final int capacity = clients.get(processId).numProcessingThreads();
-            final double totalTaskCount = newAssignments.getOrDefault(processId, new HashSet<>()).size();
+            final double totalTaskCount = newAssignments.getOrDefault(processId, KafkaStreamsAssignment.of(processId, new HashSet<>())).tasks().size();

Review Comment:
   Just a thought: since we require that all ProcessIds present in the input 
(i.e. the `ApplicationState#getKafkaStreamsState` map) are also present in the 
output assignment, we can just initialize the `newAssignments` field to a map 
whose keyset equals the `#getKafkaStreamsState` keyset, with each value 
initialized to an assignment containing only an empty map of tasks. That way we 
don't need this `getOrDefault` at all, and it should always be safe to call 
`get` on any ProcessId. Does that make sense?
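
As an illustration only, here is a small self-contained sketch of the pre-seeding idea. It assumes `String` client ids in place of `ProcessId`, a hypothetical `threadsByClient` map standing in for the `numProcessingThreads()` lookup, and a load defined as task count divided by thread count, which the hunk above suggests but does not show. The class name `PreseededAssignments` is invented for the example.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class PreseededAssignments {
    // Hypothetical stand-in for clients.get(processId).numProcessingThreads().
    private final Map<String, Integer> threadsByClient;
    // Every known client gets an empty task set up front, so get() never returns null.
    private final Map<String, Set<String>> newAssignments;

    PreseededAssignments(final Map<String, Integer> threadsByClient) {
        this.threadsByClient = threadsByClient;
        this.newAssignments = threadsByClient.keySet().stream()
            .collect(Collectors.toMap(Function.identity(), client -> new HashSet<>()));
    }

    void assign(final String taskId, final String client) {
        newAssignments.get(client).add(taskId);
    }

    // No getOrDefault needed: the entry exists for every client passed to the constructor.
    double clientLoad(final String client) {
        final int capacity = threadsByClient.get(client);
        final double totalTaskCount = newAssignments.get(client).size();
        return totalTaskCount / capacity;
    }
}
```

A lookup for a client that was never in the input would now fail fast with a `NullPointerException` instead of silently reporting a load of zero, which may be the preferable behavior given the stated invariant.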
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -344,7 +341,7 @@ public ProcessId findLeastLoadedClient(final TaskId taskId, final Set<ProcessId>
                 }
 
                 if (leastLoaded == null || thisClientLoad < clientLoad(leastLoaded)) {
-                    final Set<TaskId> assignedTasks = newAssignments.getOrDefault(processId, new HashSet<>())
+                    final Set<TaskId> assignedTasks = newAssignments.getOrDefault(processId, KafkaStreamsAssignment.of(processId, new HashSet<>())).tasks().values()

Review Comment:
   See above: it would be nice to clean up some of the `getOrDefault` calls if 
possible (but it's not a huge deal, since they don't add that much complexity).


