lucasbru commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2340645556
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -213,19 +212,17 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
         // 3. assign any remaining unassigned tasks
         final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
         processByLoad.addAll(localState.processIdToState.values());
-        for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
-            final TaskId task = it.next();
+        for (final TaskId task: activeTasks) {
             final ProcessState processWithLeastLoad = processByLoad.poll();
             if (processWithLeastLoad == null) {
                 throw new TaskAssignorException(String.format("No process available to assign active task %s.", task));
             }
-            final String member = memberWithLeastLoad(processWithLeastLoad);
-            if (member == null) {
+            final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(task, true);
+            if (newTaskCount != -1) {
+                maybeUpdateActiveTasksPerMember(newTaskCount);
+            } else {
                 throw new TaskAssignorException(String.format("No member available to assign active task %s.", task));
             }
-            processWithLeastLoad.addTask(member, task, true);
-            it.remove();
Review Comment:
I removed the explicit iterator here because we don't need to remove tasks from
`activeTasks`: the list is not used after this loop, so a for-each loop is simpler.
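
To illustrate the difference, here is a minimal standalone sketch. It is not the assignor code: `ForEachVsIterator`, `assignWithIterator`, and `assignWithForEach` are hypothetical names, and plain strings stand in for `TaskId`. Both loops visit the same tasks in the same order. The only observable difference is that the iterator version drains the input list, which matters only if the caller reads the list afterwards.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class ForEachVsIterator {

    // Old style (hypothetical stand-in): explicit iterator that removes
    // each task from the input list once it has been handled.
    static List<String> assignWithIterator(final LinkedList<String> tasks) {
        final List<String> assigned = new ArrayList<>();
        for (final Iterator<String> it = tasks.iterator(); it.hasNext();) {
            final String task = it.next();
            assigned.add(task);
            it.remove(); // input list shrinks as tasks are assigned
        }
        return assigned;
    }

    // New style (hypothetical stand-in): for-each loop that leaves the
    // input list untouched.
    static List<String> assignWithForEach(final LinkedList<String> tasks) {
        final List<String> assigned = new ArrayList<>();
        for (final String task : tasks) {
            assigned.add(task);
        }
        return assigned;
    }

    public static void main(final String[] args) {
        final LinkedList<String> a = new LinkedList<>(List.of("0_0", "0_1", "0_2"));
        final LinkedList<String> b = new LinkedList<>(a);

        final List<String> viaIterator = assignWithIterator(a);
        final List<String> viaForEach = assignWithForEach(b);

        // Same assignment order either way; only the leftover input differs.
        System.out.println("same order: " + viaIterator.equals(viaForEach));
        System.out.println("left after iterator loop: " + a.size());
        System.out.println("left after for-each loop: " + b.size());
    }
}
```

Since nothing reads `activeTasks` after the loop, the drained and untouched lists are equivalent from the caller's point of view.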