bbejeck commented on code in PR #20523:
URL: https://github.com/apache/kafka/pull/20523#discussion_r2344838718
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -253,10 +250,10 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
}
boolean found = false;
if (!processWithLeastLoad.hasTask(taskId)) {
- final String memberId = memberWithLeastLoad(processWithLeastLoad);
- if (memberId != null) {
- processWithLeastLoad.addTask(memberId, taskId, false);
+ final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
+ if (newTaskCount != -1) {
found = true;
+ maybeUpdateTasksPerMember(newTaskCount);
Review Comment:
I'm not sure I follow completely. Does Copilot have a point? I suspect
this isn't an issue.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
}
public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+ addTaskInternal(memberId, taskId, isActive);
Review Comment:
Why `addTaskInternal` when the signature is the same?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -99,9 +98,9 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
- localState.processIdToState = new HashMap<>();
- localState.activeTaskToPrevMember = new HashMap<>();
- localState.standbyTaskToPrevMember = new HashMap<>();
+ localState.processIdToState = new HashMap<>(localState.totalMembersWithActiveTaskCapacity);
+ localState.activeTaskToPrevMember = new HashMap<>(localState.totalActiveTasks);
+ localState.standbyTaskToPrevMember = new HashMap<>(localState.numStandbyReplicas > 0 ? (localState.totalTasks - localState.totalActiveTasks) / localState.numStandbyReplicas : 0);
Review Comment:
If `localState.numStandbyReplicas == 0`, do we need this Map, or could this
represent the state before any standbys are assigned?
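For reference while weighing the question above, here is a minimal sketch of what the sizing expression evaluates to. The class and method names (`StandbyMapSizing`, `standbyMapCapacity`, `newStandbyMap`) are hypothetical and not part of the PR; only the ternary expression is taken from the hunk. It relies on the fact that `new HashMap<>(0)` is a valid, empty, growable map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not code from the PR.
final class StandbyMapSizing {

    // Same expression as in the hunk: falls back to 0 when no standby
    // replicas are configured, which also avoids dividing by zero.
    static int standbyMapCapacity(final int totalTasks,
                                  final int totalActiveTasks,
                                  final int numStandbyReplicas) {
        return numStandbyReplicas > 0
            ? (totalTasks - totalActiveTasks) / numStandbyReplicas
            : 0;
    }

    // An initial capacity of 0 is legal for HashMap; the map starts empty
    // and still grows on the first put.
    static Map<String, String> newStandbyMap(final int totalTasks,
                                             final int totalActiveTasks,
                                             final int numStandbyReplicas) {
        return new HashMap<>(standbyMapCapacity(totalTasks, totalActiveTasks, numStandbyReplicas));
    }
}
```

So with zero standby replicas the map is still created and usable; whether it needs to exist at all in that case is the open question in the comment.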
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -85,6 +88,11 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
}
public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+ addTaskInternal(memberId, taskId, isActive);
+ membersByLoad = null; // reset, since it may not be sorted anymore
Review Comment:
Good comment - this is the first thing I thought of when I saw
`membersByLoad = null;`
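For readers unfamiliar with the pattern the code comment describes, here is a minimal sketch of a lazily built sorted cache that is reset to `null` whenever the underlying counts change. Everything here (`LoadTracker`, `addMember`, `leastLoadedMember`) is a hypothetical stand-in and not the actual `ProcessState` implementation; only the idea of nulling `membersByLoad` on mutation comes from the hunk.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration; not the real ProcessState.
final class LoadTracker {
    private final Map<String, Integer> taskCountByMember = new HashMap<>();

    // Cached ordering of members by load; null means "stale, rebuild on next read".
    private List<String> membersByLoad = null;

    void addMember(final String memberId) {
        taskCountByMember.putIfAbsent(memberId, 0);
        membersByLoad = null; // membership changed, cached order is stale
    }

    void addTask(final String memberId) {
        taskCountByMember.merge(memberId, 1, Integer::sum);
        membersByLoad = null; // reset, since it may not be sorted anymore
    }

    // Returns the member with the fewest tasks (ties broken by member id),
    // or null if there are no members. Rebuilds the cache only when stale.
    String leastLoadedMember() {
        if (taskCountByMember.isEmpty()) {
            return null;
        }
        if (membersByLoad == null) {
            membersByLoad = new ArrayList<>(taskCountByMember.keySet());
            membersByLoad.sort(
                Comparator.comparingInt((String m) -> taskCountByMember.get(m))
                    .thenComparing(Comparator.naturalOrder()));
        }
        return membersByLoad.get(0);
    }
}
```

Without the reset in `addTask`, a second call to `leastLoadedMember()` would keep returning the member that was least loaded before the task was added, which is the hazard the inline comment flags.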