bbejeck commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2344838718


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -253,10 +250,10 @@ private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
         }
         boolean found = false;
         if (!processWithLeastLoad.hasTask(taskId)) {
-            final String memberId = memberWithLeastLoad(processWithLeastLoad);
-            if (memberId != null) {
-                processWithLeastLoad.addTask(memberId, taskId, false);
+            final int newTaskCount = 
processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
+            if (newTaskCount != -1) {
                 found = true;
+                maybeUpdateTasksPerMember(newTaskCount);

Review Comment:
   I'm not sure if I follow completely - does Copilot have a point? I'm 
suspecting this isn't an issue.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> 
assignedStandbyTasksByMember() {
     }
 
     public void addTask(final String memberId, final TaskId taskId, final 
boolean isActive) {
+        addTaskInternal(memberId, taskId, isActive);

Review Comment:
   why `addTaskInternal` when the signature is the same?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -99,9 +98,9 @@ private void initialize(final GroupSpec groupSpec, final 
TopologyDescriber topol
         localState.activeTasksPerMember = 
computeTasksPerMember(localState.totalActiveTasks, 
localState.totalMembersWithActiveTaskCapacity);
         localState.tasksPerMember = 
computeTasksPerMember(localState.totalTasks, 
localState.totalMembersWithTaskCapacity);
 
-        localState.processIdToState = new HashMap<>();
-        localState.activeTaskToPrevMember = new HashMap<>();
-        localState.standbyTaskToPrevMember = new HashMap<>();
+        localState.processIdToState = new 
HashMap<>(localState.totalMembersWithActiveTaskCapacity);
+        localState.activeTaskToPrevMember = new 
HashMap<>(localState.totalActiveTasks);
+        localState.standbyTaskToPrevMember = new 
HashMap<>(localState.numStandbyReplicas > 0 ? (localState.totalTasks - 
localState.totalActiveTasks) / localState.numStandbyReplicas : 0);

Review Comment:
   If `localState.numStandbyReplicas==0` do we need this Map, or could this 
represent the state before any standbys are assigned?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> 
assignedStandbyTasksByMember() {
     }
 
     public void addTask(final String memberId, final TaskId taskId, final 
boolean isActive) {
+        addTaskInternal(memberId, taskId, isActive);
+        membersByLoad = null; // reset, since it may not be sorted anymore

Review Comment:
   Good comment - this is the first thing I thought of when I saw 
`membersByLoad = null;`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to