bbejeck commented on code in PR #20172:
URL: https://github.com/apache/kafka/pull/20172#discussion_r2310750473


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = 
localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
+            final ArrayList<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, 
null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));

Review Comment:
   nit: make `final`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int 
activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
+        }
+        queue.add(processWithLeastLoad); // Add it back to the queue after 
updating its state
+        return found;
+    }
+
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has 
the task. Can be null, if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that deoes not have the 
task, or null if no such member exists.
+     */
+    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> 
members, final TaskId taskId) {
         if (members == null || members.isEmpty()) {
             return null;
         }
-        Optional<ProcessState> processWithLeastLoad = members.stream()
-            .map(member  -> localState.processIdToState.get(member.processId))
-            .min(Comparator.comparingDouble(ProcessState::load));
-
-        // if the same exact former member is needed
-        if (returnSameMember) {
-            return localState.standbyTaskToPrevMember.get(taskId).stream()
-                .filter(standby -> 
standby.processId.equals(processWithLeastLoad.get().processId()))
-                .findFirst()
-                .orElseGet(() -> 
memberWithLeastLoad(processWithLeastLoad.get()));
+
+        Member candidate = members.get(0);

Review Comment:
   nit: `final` on all of these



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int 
activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);

Review Comment:
   Overall I agree with the changes here, but I'd like to add a test or two 
that cover edge conditions, e.g., a large number of tasks, an odd number of 
streams clients, etc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = 
localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
+            final ArrayList<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, 
null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = 
localState.processIdToState.entrySet().stream().flatMap(entry -> 
entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), 
memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, 
false);
+            ProcessState processWithLeastLoad = processByLoad.poll();

Review Comment:
   nit: `final`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int 
activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {

Review Comment:
   nit: make parameters `final`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = 
localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
+            final ArrayList<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, 
null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
 task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = 
localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = 
localState.processIdToState.entrySet().stream().flatMap(entry -> 
entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), 
memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, 
false);
+            ProcessState processWithLeastLoad = processByLoad.poll();
+            if (processWithLeastLoad == null) {
+                throw new TaskAssignorException("No process available to 
assign active task {}." + task);
+            }
+            String member = memberWithLeastLoad(processWithLeastLoad);

Review Comment:
   nit: make `member` `final` here as well



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int 
activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {
+        ProcessState processWithLeastLoad = queue.poll();

Review Comment:
   nit: make `processWithLeastLoad` `final` here too



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int 
activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
+        }
+        queue.add(processWithLeastLoad); // Add it back to the queue after 
updating its state
+        return found;
+    }
+
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has 
the task. Can be null, if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that deoes not have the 
task, or null if no such member exists.
+     */
+    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> 
members, final TaskId taskId) {
         if (members == null || members.isEmpty()) {
             return null;
         }
-        Optional<ProcessState> processWithLeastLoad = members.stream()
-            .map(member  -> localState.processIdToState.get(member.processId))
-            .min(Comparator.comparingDouble(ProcessState::load));
-
-        // if the same exact former member is needed
-        if (returnSameMember) {
-            return localState.standbyTaskToPrevMember.get(taskId).stream()
-                .filter(standby -> 
standby.processId.equals(processWithLeastLoad.get().processId()))
-                .findFirst()
-                .orElseGet(() -> 
memberWithLeastLoad(processWithLeastLoad.get()));
+
+        Member candidate = members.get(0);
+        ProcessState candidateProcessState = 
localState.processIdToState.get(candidate.processId);
+        double candidateProcessLoad = candidateProcessState.load();
+        double candidateMemberLoad = 
candidateProcessState.memberToTaskCounts().get(candidate.memberId);
+        for (int i = 1; i < members.size(); i++) {
+            Member member = members.get(i);
+            ProcessState processState = 
localState.processIdToState.get(member.processId);
+            double newProcessLoad = processState.load();
+            if (newProcessLoad < candidateProcessLoad && (taskId == null || 
!processState.hasTask(taskId))) {
+                double newMemberLoad = 
processState.memberToTaskCounts().get(member.memberId);
+                if (newMemberLoad < candidateMemberLoad) {
+                    candidateProcessLoad = newProcessLoad;
+                    candidateMemberLoad = newMemberLoad;
+                    candidate = member;
+                }
+            }
         }
-        return memberWithLeastLoad(processWithLeastLoad.get());
+
+        if (taskId == null || !candidateProcessState.hasTask(taskId)) {
+            return candidate;
+        }
+        return null;
     }
 
-    private Member memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
+    private String memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
+        Map<String, Integer> members = 
processWithLeastLoad.memberToTaskCounts();
+        if (members.isEmpty()) {
+            return null;
+        }
+        if (members.size() == 1) {
+            return members.keySet().iterator().next();
+        }
         Optional<String> memberWithLeastLoad = 
processWithLeastLoad.memberToTaskCounts().entrySet().stream()
             .min(Map.Entry.comparingByValue())
             .map(Map.Entry::getKey);
-        return memberWithLeastLoad.map(memberId -> new 
Member(processWithLeastLoad.processId(), memberId)).orElse(null);
+        return memberWithLeastLoad.orElse(null);
     }
 
     private boolean hasUnfulfilledQuota(final Member member) {
         return 
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < localState.tasksPerMember;
     }
 
     private void assignStandby(final Set<TaskId> standbyTasks, final int 
numStandbyReplicas) {
+        ArrayList<StandbyToAssign> toLeastLoaded = new 
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
         for (TaskId task : standbyTasks) {
             for (int i = 0; i < numStandbyReplicas; i++) {
 
-                final Set<String> availableProcesses = 
localState.processIdToState.values().stream()
-                    .filter(process -> !process.hasTask(task))
-                    .map(ProcessState::processId)
-                    .collect(Collectors.toSet());
-
-                if (availableProcesses.isEmpty()) {
-                    log.warn("{} There is not enough available capacity. " +
-                            "You should increase the number of threads and/or 
application instances to maintain the requested number of standby replicas.",
-                        errorMessage(numStandbyReplicas, i, task));
-                    break;
-                }
-                Member standby = null;
-
                 // prev active task
                 Member prevMember = 
localState.activeTaskToPrevMember.get(task);
-                if (prevMember != null && 
availableProcesses.contains(prevMember.processId) && 
isLoadBalanced(prevMember.processId)) {
-                    standby = prevMember;
+                if (prevMember != null) {
+                    ProcessState prevMemberProcessState = 
localState.processIdToState.get(prevMember.processId);

Review Comment:
   `final` here too



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int 
activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+    private boolean 
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId 
taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
+        }
+        queue.add(processWithLeastLoad); // Add it back to the queue after 
updating its state
+        return found;
+    }
+
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has 
the task. Can be null, if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that deoes not have the 
task, or null if no such member exists.
+     */
+    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> 
members, final TaskId taskId) {
         if (members == null || members.isEmpty()) {
             return null;
         }
-        Optional<ProcessState> processWithLeastLoad = members.stream()
-            .map(member  -> localState.processIdToState.get(member.processId))
-            .min(Comparator.comparingDouble(ProcessState::load));
-
-        // if the same exact former member is needed
-        if (returnSameMember) {
-            return localState.standbyTaskToPrevMember.get(taskId).stream()
-                .filter(standby -> 
standby.processId.equals(processWithLeastLoad.get().processId()))
-                .findFirst()
-                .orElseGet(() -> 
memberWithLeastLoad(processWithLeastLoad.get()));
+
+        Member candidate = members.get(0);
+        ProcessState candidateProcessState = 
localState.processIdToState.get(candidate.processId);
+        double candidateProcessLoad = candidateProcessState.load();
+        double candidateMemberLoad = 
candidateProcessState.memberToTaskCounts().get(candidate.memberId);
+        for (int i = 1; i < members.size(); i++) {
+            Member member = members.get(i);
+            ProcessState processState = 
localState.processIdToState.get(member.processId);
+            double newProcessLoad = processState.load();
+            if (newProcessLoad < candidateProcessLoad && (taskId == null || 
!processState.hasTask(taskId))) {
+                double newMemberLoad = 
processState.memberToTaskCounts().get(member.memberId);
+                if (newMemberLoad < candidateMemberLoad) {
+                    candidateProcessLoad = newProcessLoad;
+                    candidateMemberLoad = newMemberLoad;
+                    candidate = member;
+                }
+            }
         }
-        return memberWithLeastLoad(processWithLeastLoad.get());
+
+        if (taskId == null || !candidateProcessState.hasTask(taskId)) {
+            return candidate;
+        }
+        return null;
     }
 
-    private Member memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
+    private String memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
+        Map<String, Integer> members = 
processWithLeastLoad.memberToTaskCounts();
+        if (members.isEmpty()) {
+            return null;
+        }
+        if (members.size() == 1) {
+            return members.keySet().iterator().next();
+        }
         Optional<String> memberWithLeastLoad = 
processWithLeastLoad.memberToTaskCounts().entrySet().stream()
             .min(Map.Entry.comparingByValue())
             .map(Map.Entry::getKey);
-        return memberWithLeastLoad.map(memberId -> new 
Member(processWithLeastLoad.processId(), memberId)).orElse(null);
+        return memberWithLeastLoad.orElse(null);
     }
 
     private boolean hasUnfulfilledQuota(final Member member) {
         return 
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < localState.tasksPerMember;
     }
 
     private void assignStandby(final Set<TaskId> standbyTasks, final int 
numStandbyReplicas) {
+        ArrayList<StandbyToAssign> toLeastLoaded = new 
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
         for (TaskId task : standbyTasks) {
             for (int i = 0; i < numStandbyReplicas; i++) {
 
-                final Set<String> availableProcesses = 
localState.processIdToState.values().stream()
-                    .filter(process -> !process.hasTask(task))
-                    .map(ProcessState::processId)
-                    .collect(Collectors.toSet());
-
-                if (availableProcesses.isEmpty()) {
-                    log.warn("{} There is not enough available capacity. " +
-                            "You should increase the number of threads and/or 
application instances to maintain the requested number of standby replicas.",
-                        errorMessage(numStandbyReplicas, i, task));
-                    break;
-                }
-                Member standby = null;
-
                 // prev active task
                 Member prevMember = 
localState.activeTaskToPrevMember.get(task);
-                if (prevMember != null && 
availableProcesses.contains(prevMember.processId) && 
isLoadBalanced(prevMember.processId)) {
-                    standby = prevMember;
+                if (prevMember != null) {
+                    ProcessState prevMemberProcessState = 
localState.processIdToState.get(prevMember.processId);
+                    if (!prevMemberProcessState.hasTask(task) && 
isLoadBalanced(prevMemberProcessState)) {
+                        prevMemberProcessState.addTask(prevMember.memberId, 
task, false);
+                        continue;
+                    }
                 }
 
                 // prev standby tasks
-                if (standby == null) {
-                    final Set<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
-                    if (prevMembers != null && !prevMembers.isEmpty()) {
-                        prevMembers.removeIf(member  -> 
!availableProcesses.contains(member.processId));
-                        prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
-                        if (prevMember != null && 
isLoadBalanced(prevMember.processId)) {
-                            standby = prevMember;
+                final ArrayList<Member> prevMembers = 
localState.standbyTaskToPrevMember.get(task);
+                if (prevMembers != null && !prevMembers.isEmpty()) {
+                    prevMember = findPrevMemberWithLeastLoad(prevMembers, 
task);
+                    if (prevMember != null) {
+                        ProcessState prevMemberProcessState = 
localState.processIdToState.get(prevMember.processId);
+                        if (isLoadBalanced(prevMemberProcessState)) {
+                            
prevMemberProcessState.addTask(prevMember.memberId, task, false);
+                            continue;
                         }
                     }
                 }
 
-                // others
-                if (standby == null) {
-                    final Set<Member> availableMembers = 
availableProcesses.stream()
-                        .flatMap(pId -> 
localState.processIdToState.get(pId).memberToTaskCounts().keySet().stream()
-                            .map(mId -> new Member(pId, 
mId))).collect(Collectors.toSet());
-                    standby = findMemberWithLeastLoad(availableMembers, task, 
false);
-                    if (standby == null) {
-                        log.warn("{} Error in standby task assignment!", 
errorMessage(numStandbyReplicas, i, task));
-                        break;
-                    }
-                }
-                
localState.processIdToState.get(standby.processId).addTask(standby.memberId, 
task, false);
-                updateHelpers(standby, false);
+                toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas 
- i));
+                break;
             }
+        }
 
+        PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));

Review Comment:
   a couple of `final` here too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to