Copilot commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2340625415
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -94,8 +102,38 @@ public void addTask(final String memberId, final TaskId taskId, final boolean is
             assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
             assignedStandbyTasks.get(memberId).add(taskId);
         }
-        memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
+        int newTaskCount = memberToTaskCounts.get(memberId) + 1;
+        memberToTaskCounts.put(memberId, newTaskCount);
         computeLoad();
+        return newTaskCount;
+    }
+
+    public int addTaskToLeastLoadedMember(final TaskId taskId, final boolean isActive) {
+        if (memberToTaskCounts.isEmpty()) {
+            return -1;
+        }
+        if (memberToTaskCounts.size() == 1) {
+            return addTaskInternal(memberToTaskCounts.keySet().iterator().next(), taskId, isActive);
+        }
+        if (membersByLoad == null) {
+            membersByLoad = new PriorityQueue<>(
+                memberToTaskCounts.size(),
+                Map.Entry.comparingByValue()
+            );
+            for (Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
+                // Copy here, since map entry objects are allowed to be reused by the underlying map implementation.
+                membersByLoad.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
Review Comment:
Creating a new AbstractMap.SimpleEntry for every member each time the heap is
rebuilt is inefficient. Consider a small custom wrapper class that holds the
member ID and its task count, or keep the task counts in a separate data
structure keyed by member ID.
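
A minimal sketch of the wrapper-class option, under stated assumptions: `MemberLoad`, `MemberLoadSketch`, and `buildHeap` are hypothetical names that do not appear in the PR. The wrapper still allocates one object per member, so what it mainly buys is a primitive `int` count and a named type rather than fewer allocations.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration; none of these names exist in the Kafka code base.
final class MemberLoadSketch {

    // Holds a member ID together with a mutable primitive task count.
    static final class MemberLoad {
        final String memberId;
        int taskCount;

        MemberLoad(final String memberId, final int taskCount) {
            this.memberId = memberId;
            this.taskCount = taskCount;
        }
    }

    // Builds a min-heap ordered by task count from a memberId -> count map.
    static PriorityQueue<MemberLoad> buildHeap(final Map<String, Integer> memberToTaskCounts) {
        final PriorityQueue<MemberLoad> heap = new PriorityQueue<>(
            Math.max(1, memberToTaskCounts.size()), // initial capacity must be >= 1
            Comparator.comparingInt((MemberLoad m) -> m.taskCount)
        );
        for (final Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
            heap.add(new MemberLoad(entry.getKey(), entry.getValue()));
        }
        return heap;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> memberToTaskCounts = new LinkedHashMap<>();
        memberToTaskCounts.put("member-a", 3);
        memberToTaskCounts.put("member-b", 1);
        memberToTaskCounts.put("member-c", 2);

        final PriorityQueue<MemberLoad> heap = buildHeap(memberToTaskCounts);
        final MemberLoad least = heap.poll();
        System.out.println(least.memberId + " has " + least.taskCount + " task(s)");
    }
}
```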
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -94,8 +102,38 @@ public void addTask(final String memberId, final TaskId taskId, final boolean is
             assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
             assignedStandbyTasks.get(memberId).add(taskId);
         }
-        memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
+        int newTaskCount = memberToTaskCounts.get(memberId) + 1;
+        memberToTaskCounts.put(memberId, newTaskCount);
         computeLoad();
+        return newTaskCount;
+    }
+
+    public int addTaskToLeastLoadedMember(final TaskId taskId, final boolean isActive) {
+        if (memberToTaskCounts.isEmpty()) {
+            return -1;
+        }
+        if (memberToTaskCounts.size() == 1) {
+            return addTaskInternal(memberToTaskCounts.keySet().iterator().next(), taskId, isActive);
+        }
+        if (membersByLoad == null) {
+            membersByLoad = new PriorityQueue<>(
+                memberToTaskCounts.size(),
+                Map.Entry.comparingByValue()
+            );
+            for (Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
+                // Copy here, since map entry objects are allowed to be reused by the underlying map implementation.
+                membersByLoad.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
+            }
+        }
+        Map.Entry<String, Integer> member = membersByLoad.poll();
+        if (member != null) {
+            int newTaskCount = addTaskInternal(member.getKey(), taskId, isActive);
+            member.setValue(newTaskCount);
+            membersByLoad.add(member); // Reinsert the updated member back into the priority queue
+            return newTaskCount;
+        } else {
+            throw new TaskAssignorException("No members available to assign task " + taskId);
+        }
Review Comment:
This exception will never be thrown because the method already checks for
empty memberToTaskCounts at the beginning and returns -1. If memberToTaskCounts
is not empty, poll() should never return null.
```suggestion
        int newTaskCount = addTaskInternal(member.getKey(), taskId, isActive);
        member.setValue(newTaskCount);
        membersByLoad.add(member); // Reinsert the updated member back into the priority queue
        return newTaskCount;
```
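
The poll, update, reinsert order in the suggestion matters because `java.util.PriorityQueue` does not reorder an element whose sort key changes while it is still in the queue. The following is a simplified, self-contained sketch of that pattern; `PollUpdateReinsertSketch` and `assignToLeastLoaded` are hypothetical names, and the real method additionally calls `addTaskInternal`, which is omitted here.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration of the poll / update / reinsert pattern.
final class PollUpdateReinsertSketch {

    // Assigns one task to the least-loaded member and returns its new count,
    // or -1 if the queue is empty.
    static int assignToLeastLoaded(final PriorityQueue<Map.Entry<String, Integer>> membersByLoad) {
        final Map.Entry<String, Integer> member = membersByLoad.poll();
        if (member == null) {
            return -1;
        }
        final int newTaskCount = member.getValue() + 1;
        member.setValue(newTaskCount); // safe: the entry is outside the heap right now
        membersByLoad.add(member);     // reinsert so the heap order reflects the new count
        return newTaskCount;
    }

    static PriorityQueue<Map.Entry<String, Integer>> newQueue(final String... memberIds) {
        final PriorityQueue<Map.Entry<String, Integer>> queue =
            new PriorityQueue<>(Map.Entry.<String, Integer>comparingByValue());
        for (final String memberId : memberIds) {
            queue.add(new AbstractMap.SimpleEntry<>(memberId, 0));
        }
        return queue;
    }

    public static void main(final String[] args) {
        final PriorityQueue<Map.Entry<String, Integer>> queue = newQueue("member-a", "member-b");
        for (int i = 0; i < 4; i++) {
            System.out.println("new task count: " + assignToLeastLoaded(queue));
        }
    }
}
```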
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -253,10 +250,10 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
         }
         boolean found = false;
         if (!processWithLeastLoad.hasTask(taskId)) {
-            final String memberId = memberWithLeastLoad(processWithLeastLoad);
-            if (memberId != null) {
-                processWithLeastLoad.addTask(memberId, taskId, false);
+            final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
+            if (newTaskCount != -1) {
                 found = true;
+                maybeUpdateTasksPerMember(newTaskCount);
Review Comment:
The previous flow (memberWithLeastLoad followed by addTask) did not call
maybeUpdateTasksPerMember, but the new flow does. Standby task assignment now
behaves differently depending on whether it goes through addTask or
addTaskToLeastLoadedMember.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
     }
     public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+        addTaskInternal(memberId, taskId, isActive);
+        membersByLoad = null; // reset, since it may not be sorted anymore
Review Comment:
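Resetting membersByLoad to null on every addTask call discards the heap, so the
next addTaskToLeastLoadedMember call has to rebuild it from memberToTaskCounts.
When the two calls are interleaved, this negates the benefit of the heap.
Consider updating the heap incrementally instead of rebuilding it each time.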
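
One possible shape for incremental maintenance, as a sketch under stated assumptions rather than the PR's implementation: `IncrementalHeapSketch`, `MemberLoad`, and `addMember` are hypothetical names, and task IDs are left out for brevity. Note that `PriorityQueue.remove(Object)` is a linear scan, so this trades a full rebuild for one O(n) removal per targeted `addTask`; a lazy-invalidation scheme would be another option.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration; not the ProcessState class from the PR.
final class IncrementalHeapSketch {

    static final class MemberLoad {
        final String memberId;
        int taskCount;

        MemberLoad(final String memberId) {
            this.memberId = memberId;
        }
    }

    // Index from member ID to its heap element, so a targeted update can find it.
    private final Map<String, MemberLoad> byMemberId = new HashMap<>();
    private final PriorityQueue<MemberLoad> membersByLoad =
        new PriorityQueue<>(Comparator.comparingInt((MemberLoad m) -> m.taskCount));

    void addMember(final String memberId) {
        if (!byMemberId.containsKey(memberId)) {
            final MemberLoad member = new MemberLoad(memberId);
            byMemberId.put(memberId, member);
            membersByLoad.add(member);
        }
    }

    // Targeted add: take the member out of the heap, update it, put it back.
    int addTask(final String memberId) {
        final MemberLoad member = byMemberId.get(memberId);
        if (member == null) {
            throw new IllegalArgumentException("Unknown member " + memberId);
        }
        membersByLoad.remove(member); // linear scan; matches by identity here
        member.taskCount++;
        membersByLoad.add(member);
        return member.taskCount;
    }

    // Least-loaded add: the heap is always current, so no rebuild is needed.
    int addTaskToLeastLoadedMember() {
        final MemberLoad member = membersByLoad.poll();
        if (member == null) {
            return -1;
        }
        member.taskCount++;
        membersByLoad.add(member);
        return member.taskCount;
    }

    int taskCount(final String memberId) {
        final MemberLoad member = byMemberId.get(memberId);
        return member == null ? 0 : member.taskCount;
    }

    public static void main(final String[] args) {
        final IncrementalHeapSketch state = new IncrementalHeapSketch();
        state.addMember("member-a");
        state.addMember("member-b");
        state.addTask("member-a");
        state.addTaskToLeastLoadedMember();
        System.out.println("member-a: " + state.taskCount("member-a"));
        System.out.println("member-b: " + state.taskCount("member-b"));
    }
}
```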
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.