lucasbru commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2348334784


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -99,9 +98,9 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
         localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
         localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
 
-        localState.processIdToState = new HashMap<>();
-        localState.activeTaskToPrevMember = new HashMap<>();
-        localState.standbyTaskToPrevMember = new HashMap<>();
+        localState.processIdToState = new HashMap<>(localState.totalMembersWithActiveTaskCapacity);
+        localState.activeTaskToPrevMember = new HashMap<>(localState.totalActiveTasks);
+        localState.standbyTaskToPrevMember = new HashMap<>(localState.numStandbyReplicas > 0 ? (localState.totalTasks - localState.totalActiveTasks) / localState.numStandbyReplicas : 0);

Review Comment:
   In the corner case where standby tasks were assigned before but 
`numStandbyReplicas` is now set to 0, we'd still fill this map without using 
it. I think that is fine: we are only setting the initial capacity here, so in 
that corner case Java would just grow the map.
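
   To illustrate the point about initial capacity, here is a minimal sketch 
(the class `InitialCapacityDemo` and its `fill` helper are hypothetical and 
not part of the Kafka code base). It assumes nothing beyond the standard 
`java.util.HashMap` constructor that takes an initial capacity: that value is 
only a sizing hint, so a map created with capacity 0 still accepts any number 
of entries and resizes as needed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of the PR under review.
final class InitialCapacityDemo {

    // Creates a map with the given initial capacity and inserts `entries`
    // key/value pairs. The capacity is a hint, not a limit: HashMap resizes
    // its internal table whenever the load-factor threshold is exceeded.
    static Map<Integer, String> fill(final int initialCapacity, final int entries) {
        final Map<Integer, String> map = new HashMap<>(initialCapacity);
        for (int i = 0; i < entries; i++) {
            map.put(i, "member-" + i);
        }
        return map;
    }

    public static void main(final String[] args) {
        // Capacity 0 mirrors the `numStandbyReplicas == 0` corner case:
        // the map starts minimal and grows on demand.
        final Map<Integer, String> map = fill(0, 1000);
        System.out.println(map.size()); // prints 1000
    }
}
```

   The only cost in the corner case is a few extra resizes, which is the same 
behavior as the previous `new HashMap<>()` calls.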



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -253,10 +250,10 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
         }
         boolean found = false;
         if (!processWithLeastLoad.hasTask(taskId)) {
-            final String memberId = memberWithLeastLoad(processWithLeastLoad);
-            if (memberId != null) {
-                processWithLeastLoad.addTask(memberId, taskId, false);
+            final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
+            if (newTaskCount != -1) {
                 found = true;
+                maybeUpdateTasksPerMember(newTaskCount);

Review Comment:
   What Copilot is saying is a bit unclear, but with some guessing, maybe 
they mean this: we could already fill the quota for "total tasks" by assigning 
only "active tasks", but during active task assignment we don't call 
`maybeUpdateTasksPerMember`. I don't think there is a case where this makes a 
difference, but I updated it anyway to make things easier to understand. 
We now call `maybeUpdateTasksPerMember` every time we assign any task.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
     }
 
     public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+        addTaskInternal(memberId, taskId, isActive);
+        membersByLoad = null; // reset, since it may not be sorted anymore

Review Comment:
   Added the comment to the code.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
     }
 
     public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+        addTaskInternal(memberId, taskId, isActive);

Review Comment:
   Because this one resets the heap, while the internal one doesn't.
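
   The split between a public method that resets the heap and an internal one 
that does not can be sketched roughly as follows. This is a simplified, 
hypothetical `LoadTrackerSketch` class, not the actual `ProcessState` 
implementation: it tracks only a task count per member, and the names 
`addMember` and `taskCount` are assumptions made for illustration. The idea is 
that the least-loaded path removes the member from the heap before changing 
its count and re-inserts it afterwards, so the heap stays valid, whereas an 
update to an arbitrary member could leave the cached heap out of order, so the 
cache is dropped and rebuilt lazily.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, simplified stand-in for the pattern discussed above.
final class LoadTrackerSketch {
    private final Map<String, Integer> taskCounts = new HashMap<>();
    private PriorityQueue<String> membersByLoad = null; // lazily built cache

    void addMember(final String memberId) {
        taskCounts.putIfAbsent(memberId, 0);
        membersByLoad = null; // membership changed, rebuild on next use
    }

    // Public entry point: the caller picks the member, so the cached heap
    // may no longer be ordered correctly and is reset.
    void addTask(final String memberId) {
        addTaskInternal(memberId);
        membersByLoad = null;
    }

    // Returns the new task count of the chosen member, or -1 if there is none.
    int addTaskToLeastLoadedMember() {
        if (taskCounts.isEmpty()) {
            return -1;
        }
        if (membersByLoad == null) {
            // Order by load, breaking ties by member id for determinism.
            final Comparator<String> byLoad = (a, b) -> {
                final int cmp = Integer.compare(taskCounts.get(a), taskCounts.get(b));
                return cmp != 0 ? cmp : a.compareTo(b);
            };
            membersByLoad = new PriorityQueue<>(byLoad);
            membersByLoad.addAll(taskCounts.keySet());
        }
        // The member is outside the heap while its count changes, so the
        // heap invariant is preserved and no reset is needed.
        final String member = membersByLoad.poll();
        final int newCount = addTaskInternal(member);
        membersByLoad.add(member);
        return newCount;
    }

    int taskCount(final String memberId) {
        return taskCounts.getOrDefault(memberId, 0);
    }

    // Internal update: changes the count only and leaves the cache alone.
    private int addTaskInternal(final String memberId) {
        return taskCounts.merge(memberId, 1, Integer::sum);
    }
}
```

   In this sketch, calling `addTask("a")` between two calls to 
`addTaskToLeastLoadedMember()` forces a rebuild, so the next least-loaded 
lookup sees the updated count for `a`.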



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.