Copilot commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2340625415


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -94,8 +102,38 @@ public void addTask(final String memberId, final TaskId taskId, final boolean is
             assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
             assignedStandbyTasks.get(memberId).add(taskId);
         }
-        memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
+        int newTaskCount = memberToTaskCounts.get(memberId) + 1;
+        memberToTaskCounts.put(memberId, newTaskCount);
         computeLoad();
+        return newTaskCount;
+    }
+
+    public int addTaskToLeastLoadedMember(final TaskId taskId, final boolean isActive) {
+        if (memberToTaskCounts.isEmpty()) {
+            return -1;
+        }
+        if (memberToTaskCounts.size() == 1) {
+            return addTaskInternal(memberToTaskCounts.keySet().iterator().next(), taskId, isActive);
+        }
+        if (membersByLoad == null) {
+            membersByLoad = new PriorityQueue<>(
+                memberToTaskCounts.size(),
+                Map.Entry.comparingByValue()
+            );
+            for (Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
+                // Copy here, since map entry objects are allowed to be reused by the underlying map implementation.
+                membersByLoad.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));

Review Comment:
   Creating a new AbstractMap.SimpleEntry for every member is wasteful: each rebuild of the heap allocates one extra object per member. Consider using a custom wrapper class, or storing member IDs together with their task counts in a separate data structure.
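A minimal sketch of the wrapper-class alternative, assuming the heap only needs a member ID and a mutable task count. `LeastLoadedMembers` and `MemberLoad` are hypothetical names, not Kafka classes, and the tie-break on member ID is an added assumption that makes the choice deterministic. Each member gets one holder object when the heap is built; the same object is then polled, updated, and reinserted on every assignment, so no `Map.Entry` copies are needed.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch: LeastLoadedMembers and MemberLoad are illustrative names, not Kafka classes.
class LeastLoadedMembers {

    // One mutable holder per member, allocated once when the heap is built.
    static final class MemberLoad {
        final String memberId;
        int taskCount;

        MemberLoad(String memberId, int taskCount) {
            this.memberId = memberId;
            this.taskCount = taskCount;
        }
    }

    private final PriorityQueue<MemberLoad> heap;

    LeastLoadedMembers(Map<String, Integer> memberToTaskCounts) {
        // Order by task count; the member ID tie-break (an assumption) keeps the choice deterministic.
        PriorityQueue<MemberLoad> byLoad = new PriorityQueue<>(
            Math.max(1, memberToTaskCounts.size()),
            Comparator.comparingInt((MemberLoad m) -> m.taskCount)
                .thenComparing((MemberLoad m) -> m.memberId));
        for (Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
            // Key and value are read immediately, so a reused Map.Entry object is harmless here.
            byLoad.add(new MemberLoad(entry.getKey(), entry.getValue()));
        }
        this.heap = byLoad;
    }

    // Assigns one task to the least-loaded member and returns its ID, or null if there are no members.
    String assignToLeastLoaded() {
        MemberLoad least = heap.poll();
        if (least == null) {
            return null;
        }
        least.taskCount++; // safe: the holder is outside the heap while its sort key changes
        heap.add(least);
        return least.memberId;
    }
}
```

Mutating `taskCount` is safe only because the holder has been polled out of the heap before the change; mutating an element that is still inside a `PriorityQueue` would silently corrupt its ordering.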



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -94,8 +102,38 @@ public void addTask(final String memberId, final TaskId taskId, final boolean is
             assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
             assignedStandbyTasks.get(memberId).add(taskId);
         }
-        memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
+        int newTaskCount = memberToTaskCounts.get(memberId) + 1;
+        memberToTaskCounts.put(memberId, newTaskCount);
         computeLoad();
+        return newTaskCount;
+    }
+
+    public int addTaskToLeastLoadedMember(final TaskId taskId, final boolean isActive) {
+        if (memberToTaskCounts.isEmpty()) {
+            return -1;
+        }
+        if (memberToTaskCounts.size() == 1) {
+            return addTaskInternal(memberToTaskCounts.keySet().iterator().next(), taskId, isActive);
+        }
+        if (membersByLoad == null) {
+            membersByLoad = new PriorityQueue<>(
+                memberToTaskCounts.size(),
+                Map.Entry.comparingByValue()
+            );
+            for (Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
+                // Copy here, since map entry objects are allowed to be reused by the underlying map implementation.
+                membersByLoad.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
+            }
+        }
+        Map.Entry<String, Integer> member = membersByLoad.poll();
+        if (member != null) {
+            int newTaskCount = addTaskInternal(member.getKey(), taskId, isActive);
+            member.setValue(newTaskCount);
+            membersByLoad.add(member); // Reinsert the updated member back into the priority queue
+            return newTaskCount;
+        } else {
+            throw new TaskAssignorException("No members available to assign task " + taskId);
+        }

Review Comment:
   This exception will never be thrown: the method already returns -1 at the beginning when memberToTaskCounts is empty, and if memberToTaskCounts is not empty, poll() should never return null.
   ```suggestion
           int newTaskCount = addTaskInternal(member.getKey(), taskId, isActive);
           member.setValue(newTaskCount);
           membersByLoad.add(member); // Reinsert the updated member back into the priority queue
           return newTaskCount;
   ```
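The comment's argument rests on an invariant: every `poll()` is immediately followed by an `add()` of the same entry, so a heap built from a non-empty map always holds exactly one entry per member. The sketch below isolates that cycle using the same `Map.Entry` / `AbstractMap.SimpleEntry` representation as the diff. `HeapInvariantDemo`, `buildHeap`, and `assignOne` are hypothetical names, and the sketch assumes nothing else modifies the heap between assignments.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of the poll/update/reinsert cycle from the diff; names are illustrative.
class HeapInvariantDemo {

    // Builds the heap the way the diff does: copied entries ordered by task count.
    static PriorityQueue<Map.Entry<String, Integer>> buildHeap(Map<String, Integer> counts) {
        PriorityQueue<Map.Entry<String, Integer>> heap =
            new PriorityQueue<>(Math.max(1, counts.size()), Map.Entry.comparingByValue());
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
        }
        return heap;
    }

    // One assignment step. Because poll() and add() are always paired, the heap size never
    // changes, so a heap built from a non-empty map never returns null from poll().
    // (If the heap were empty, member would be null and this would throw NullPointerException.)
    static String assignOne(PriorityQueue<Map.Entry<String, Integer>> heap) {
        Map.Entry<String, Integer> member = heap.poll();
        member.setValue(member.getValue() + 1); // entry is outside the heap while its value changes
        heap.add(member);
        return member.getKey();
    }
}
```

Under that invariant the `else` branch is unreachable, which is why the suggestion drops it.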



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -253,10 +250,10 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
         }
         boolean found = false;
         if (!processWithLeastLoad.hasTask(taskId)) {
-            final String memberId = memberWithLeastLoad(processWithLeastLoad);
-            if (memberId != null) {
-                processWithLeastLoad.addTask(memberId, taskId, false);
+            final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
+            if (newTaskCount != -1) {
                 found = true;
+                maybeUpdateTasksPerMember(newTaskCount);

Review Comment:
   The maybeUpdateTasksPerMember call is absent from the original addTask flow but present here. This creates inconsistent behavior between the addTask and addTaskToLeastLoadedMember paths for standby task assignment.
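One way to remove the inconsistency is to route both entry points through the same bookkeeping hook. This sketch assumes `maybeUpdateTasksPerMember` records the largest per-member task count seen so far; that semantics, and the names `AssignmentTracker` and `maxTasksPerMember`, are assumptions for illustration, not the PR's code. The least-loaded lookup is a linear scan purely to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: AssignmentTracker and the semantics of maybeUpdateTasksPerMember are assumed.
class AssignmentTracker {
    private final Map<String, Integer> memberToTaskCounts = new HashMap<>();
    private int maxTasksPerMember = 0;

    void addMember(String memberId) {
        memberToTaskCounts.putIfAbsent(memberId, 0);
    }

    // Single place that increments a member's count and returns the new value.
    private int addTaskInternal(String memberId) {
        int newCount = memberToTaskCounts.get(memberId) + 1;
        memberToTaskCounts.put(memberId, newCount);
        return newCount;
    }

    // Explicit-member path: goes through the same hook as the least-loaded path.
    void addTask(String memberId) {
        maybeUpdateTasksPerMember(addTaskInternal(memberId));
    }

    // Least-loaded path (linear scan for brevity); does nothing when there are no members.
    void addTaskToLeastLoadedMember() {
        String least = null;
        for (Map.Entry<String, Integer> e : memberToTaskCounts.entrySet()) {
            if (least == null || e.getValue() < memberToTaskCounts.get(least)) {
                least = e.getKey();
            }
        }
        if (least != null) {
            maybeUpdateTasksPerMember(addTaskInternal(least));
        }
    }

    // Assumed semantics: remember the largest per-member task count seen so far.
    private void maybeUpdateTasksPerMember(int newTaskCount) {
        maxTasksPerMember = Math.max(maxTasksPerMember, newTaskCount);
    }

    int maxTasksPerMember() {
        return maxTasksPerMember;
    }
}
```

With a single hook, neither path can skip the bookkeeping the other performs.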



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
     }
 
     public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+        addTaskInternal(memberId, taskId, isActive);
+        membersByLoad = null; // reset, since it may not be sorted anymore

Review Comment:
   Setting membersByLoad to null on every addTask call negates the benefit of the heap optimization, because the next addTaskToLeastLoadedMember call has to rebuild the heap from scratch. The heap should be maintained incrementally instead.
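A sketch of incremental maintenance, assuming `addTask` must be able to update an arbitrary member in place. It swaps the diff's `PriorityQueue` for a `TreeSet` ordered by (task count, member ID), because `TreeSet.remove` runs in O(log n) while `PriorityQueue.remove(Object)` is a linear search. `IncrementalLoadIndex` and `Load` are hypothetical names.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch: a load index that is updated in place instead of being discarded.
class IncrementalLoadIndex {

    static final class Load {
        final String memberId;
        int taskCount;

        Load(String memberId, int taskCount) {
            this.memberId = memberId;
            this.taskCount = taskCount;
        }
    }

    // Tie-break on memberId so two members with the same count never compare as equal;
    // otherwise the TreeSet would treat them as duplicates and keep only one.
    private final TreeSet<Load> byLoad = new TreeSet<>(
        Comparator.comparingInt((Load l) -> l.taskCount).thenComparing((Load l) -> l.memberId));
    private final Map<String, Load> byMember = new HashMap<>();

    void addMember(String memberId) {
        Load load = new Load(memberId, 0);
        if (byMember.putIfAbsent(memberId, load) == null) {
            byLoad.add(load);
        }
    }

    // Explicit-member path (like addTask): O(log n) update, no rebuild.
    int addTask(String memberId) {
        Load load = byMember.get(memberId);
        if (load == null) {
            throw new IllegalArgumentException("Unknown member " + memberId);
        }
        byLoad.remove(load); // must happen before the sort key changes
        load.taskCount++;
        byLoad.add(load);
        return load.taskCount;
    }

    // Least-loaded path: the smallest element is first(); returns -1 when there are no members.
    int addTaskToLeastLoadedMember() {
        if (byLoad.isEmpty()) {
            return -1;
        }
        return addTask(byLoad.first().memberId);
    }

    String leastLoadedMember() {
        return byLoad.isEmpty() ? null : byLoad.first().memberId;
    }
}
```

Both paths cost O(log n) per task, and the index never has to be thrown away and rebuilt.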



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
