Copilot commented on code in PR #20458:
URL: https://github.com/apache/kafka/pull/20458#discussion_r2318899271


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -103,19 +113,19 @@ private void initialize(final GroupSpec groupSpec, final 
TopologyDescriber topol
             localState.processIdToState.get(processId).addMember(memberId);
 
             // prev active tasks
-            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {
-                Set<Integer> partitionNoSet = entry.getValue();
-                for (int partitionNo : partitionNoSet) {
+            for (final Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {
+                final Set<Integer> partitionNoSet = entry.getValue();
+                for (final int partitionNo : partitionNoSet) {
                     localState.activeTaskToPrevMember.put(new 
TaskId(entry.getKey(), partitionNo), member);
                 }
             }
 
             // prev standby tasks
-            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.standbyTasks().entrySet()) {
-                Set<Integer> partitionNoSet = entry.getValue();
-                for (int partitionNo : partitionNoSet) {
-                    TaskId taskId = new TaskId(entry.getKey(), partitionNo);
-                    localState.standbyTaskToPrevMember.putIfAbsent(taskId, new 
HashSet<>());
+            for (final Map.Entry<String, Set<Integer>> entry : 
memberSpec.standbyTasks().entrySet()) {
+                final Set<Integer> partitionNoSet = entry.getValue();
+                for (final int partitionNo : partitionNoSet) {
+                    final TaskId taskId = new TaskId(entry.getKey(), 
partitionNo);
+                    localState.standbyTaskToPrevMember.putIfAbsent(taskId, new 
ArrayList<>());

Review Comment:
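   The change from `Set<Member>` to `ArrayList<Member>` for 
`standbyTaskToPrevMember` allows the same member to be added more than once for 
a task. This could lead to incorrect behavior if a task's list of previous 
standby owners ends up containing the same member multiple times. Note that 
reverting to `HashSet` here also requires changing the map's value type, and the 
`ArrayList<Member> prevMembers` local in `assignActive`, back to `Set<Member>`.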
   ```suggestion
                       localState.standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>());
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -156,143 +166,207 @@ private GroupAssignment buildGroupAssignment(final 
Set<String> members) {
     }
 
     private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> 
taskIds) {
-        Map<String, Set<Integer>> ret = new HashMap<>();
-        for (TaskId taskId : taskIds) {
+        final Map<String, Set<Integer>> ret = new HashMap<>();
+        for (final TaskId taskId : taskIds) {
             ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
             ret.get(taskId.subtopologyId()).add(taskId.partition());
         }
         return ret;
     }
 
-    private void assignActive(final Set<TaskId> activeTasks) {
+    private void assignActive(final Deque<TaskId> activeTasks) {
 
         // 1. re-assigning existing active tasks to clients that previously 
had the same active tasks
-        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+        for (final Iterator<TaskId> it = activeTasks.iterator(); 
it.hasNext();) {
             final TaskId task = it.next();
             final Member prevMember = 
localState.activeTaskToPrevMember.get(task);
-            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
-                it.remove();
+            if (prevMember != null) {
+                final ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+                    processState.addTask(prevMember.memberId, task, true);
+                    
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+                    it.remove();
+                }
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
-        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+        for (final Iterator<TaskId> it = activeTasks.iterator(); 
it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
-            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
-                it.remove();
+            final ArrayList<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, 
null);
+            if (prevMember != null) {
+                final ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+                    processState.addTask(prevMember.memberId, task, true);
+                    
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+                    it.remove();
+                }
             }
         }
 
         // 3. assign any remaining unassigned tasks
-        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+        final PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
+        for (final Iterator<TaskId> it = activeTasks.iterator(); 
it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = 
localState.processIdToState.entrySet().stream().flatMap(entry -> 
entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), 
memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, 
false);
+            final ProcessState processWithLeastLoad = processByLoad.poll();
+            if (processWithLeastLoad == null) {
+                throw new TaskAssignorException("No process available to 
assign active task {}." + task);
+            }
+            final String member = memberWithLeastLoad(processWithLeastLoad);
             if (member == null) {
-                log.error("Unable to assign active task {} to any member.", 
task);
                 throw new TaskAssignorException("No member available to assign 
active task {}." + task);
             }
-            
localState.processIdToState.get(member.processId).addTask(member.memberId, 
task, true);
+            processWithLeastLoad.addTask(member, task, true);
             it.remove();
-            updateHelpers(member, true);
+            
maybeUpdateActiveTasksPerMember(processWithLeastLoad.memberToTaskCounts().get(member));
+            processByLoad.add(processWithLeastLoad); // Add it back to the 
queue after updating its state
+        }
+    }
+
+    private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
+        if (activeTasksNo == localState.activeTasksPerMember) {
+            localState.totalMembersWithActiveTaskCapacity--;
+            localState.totalActiveTasks -= activeTasksNo;
+            localState.activeTasksPerMember = 
computeTasksPerMember(localState.totalActiveTasks, 
localState.totalMembersWithActiveTaskCapacity);
+        }
+    }
 
+    private void maybeUpdateTasksPerMember(final int taskNo) {
+        if (taskNo == localState.tasksPerMember) {
+            localState.totalMembersWithTaskCapacity--;
+            localState.totalTasks -= taskNo;
+            localState.tasksPerMember = 
computeTasksPerMember(localState.totalTasks, 
localState.totalMembersWithTaskCapacity);
         }
     }
 
-    private void maybeUpdateTasksPerMember(final int activeTasksNo) {
-        if (activeTasksNo == localState.tasksPerMember) {
-            localState.totalCapacity--;
-            localState.allTasks -= activeTasksNo;
-            localState.tasksPerMember = 
computeTasksPerMember(localState.allTasks, localState.totalCapacity);
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {
+        final ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            final String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
         }
+        queue.add(processWithLeastLoad); // Add it back to the queue after 
updating its state
+        return found;
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has 
the task. Can be null, if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that deoes not have the 
task, or null if no such member exists.

Review Comment:
   There's a typo in the documentation comment: 'deoes' should be 'does'.
   ```suggestion
        * @return Previous member with the least load that does not have the task, or null if no such member exists.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to