lucasbru commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2340658751
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -94,8 +102,38 @@ public void addTask(final String memberId, final TaskId
taskId, final boolean is
assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
assignedStandbyTasks.get(memberId).add(taskId);
}
- memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
+ int newTaskCount = memberToTaskCounts.get(memberId) + 1;
+ memberToTaskCounts.put(memberId, newTaskCount);
computeLoad();
+ return newTaskCount;
+ }
+
+ public int addTaskToLeastLoadedMember(final TaskId taskId, final boolean
isActive) {
+ if (memberToTaskCounts.isEmpty()) {
+ return -1;
+ }
+ if (memberToTaskCounts.size() == 1) {
+ return
addTaskInternal(memberToTaskCounts.keySet().iterator().next(), taskId,
isActive);
+ }
+ if (membersByLoad == null) {
+ membersByLoad = new PriorityQueue<>(
+ memberToTaskCounts.size(),
+ Map.Entry.comparingByValue()
+ );
+ for (Map.Entry<String, Integer> entry :
memberToTaskCounts.entrySet()) {
+ // Copy here, since map entry objects are allowed to be reused
by the underlying map implementation.
+ membersByLoad.add(new
AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
Review Comment:
How is a custom wrapper class more efficient?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]