Copilot commented on code in PR #20458:
URL: https://github.com/apache/kafka/pull/20458#discussion_r2318899271
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -103,19 +113,19 @@ private void initialize(final GroupSpec groupSpec, final
TopologyDescriber topol
localState.processIdToState.get(processId).addMember(memberId);
// prev active tasks
- for (Map.Entry<String, Set<Integer>> entry :
memberSpec.activeTasks().entrySet()) {
- Set<Integer> partitionNoSet = entry.getValue();
- for (int partitionNo : partitionNoSet) {
+ for (final Map.Entry<String, Set<Integer>> entry :
memberSpec.activeTasks().entrySet()) {
+ final Set<Integer> partitionNoSet = entry.getValue();
+ for (final int partitionNo : partitionNoSet) {
localState.activeTaskToPrevMember.put(new
TaskId(entry.getKey(), partitionNo), member);
}
}
// prev standby tasks
- for (Map.Entry<String, Set<Integer>> entry :
memberSpec.standbyTasks().entrySet()) {
- Set<Integer> partitionNoSet = entry.getValue();
- for (int partitionNo : partitionNoSet) {
- TaskId taskId = new TaskId(entry.getKey(), partitionNo);
- localState.standbyTaskToPrevMember.putIfAbsent(taskId, new
HashSet<>());
+ for (final Map.Entry<String, Set<Integer>> entry :
memberSpec.standbyTasks().entrySet()) {
+ final Set<Integer> partitionNoSet = entry.getValue();
+ for (final int partitionNo : partitionNoSet) {
+ final TaskId taskId = new TaskId(entry.getKey(),
partitionNo);
+ localState.standbyTaskToPrevMember.putIfAbsent(taskId, new
ArrayList<>());
Review Comment:
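The change from `Set<Member>` to `ArrayList<Member>` for `standbyTaskToPrevMember` allows duplicate members to be added. This could lead to incorrect behavior when multiple standby replicas point to the same member for a task. Note that the suggestion below does not compile on its own. The map's value type is now `ArrayList<Member>`, and `assignActive` reads it into a `final ArrayList<Member> prevMembers`. The field declaration and that local, and any helper that takes the list such as `findPrevMemberWithLeastLoad`, must also be changed back to a `Set` type.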
```suggestion
localState.standbyTaskToPrevMember.putIfAbsent(taskId,
new HashSet<>());
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -156,143 +166,207 @@ private GroupAssignment buildGroupAssignment(final
Set<String> members) {
}
private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId>
taskIds) {
- Map<String, Set<Integer>> ret = new HashMap<>();
- for (TaskId taskId : taskIds) {
+ final Map<String, Set<Integer>> ret = new HashMap<>();
+ for (final TaskId taskId : taskIds) {
ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
ret.get(taskId.subtopologyId()).add(taskId.partition());
}
return ret;
}
- private void assignActive(final Set<TaskId> activeTasks) {
+ private void assignActive(final Deque<TaskId> activeTasks) {
// 1. re-assigning existing active tasks to clients that previously
had the same active tasks
- for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+ for (final Iterator<TaskId> it = activeTasks.iterator();
it.hasNext();) {
final TaskId task = it.next();
final Member prevMember =
localState.activeTaskToPrevMember.get(task);
- if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
task, true);
- updateHelpers(prevMember, true);
- it.remove();
+ if (prevMember != null) {
+ final ProcessState processState =
localState.processIdToState.get(prevMember.processId);
+ if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+ processState.addTask(prevMember.memberId, task, true);
+
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+ it.remove();
+ }
}
}
// 2. re-assigning tasks to clients that previously have seen the same
task (as standby task)
- for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+ for (final Iterator<TaskId> it = activeTasks.iterator();
it.hasNext();) {
final TaskId task = it.next();
- final Set<Member> prevMembers =
localState.standbyTaskToPrevMember.get(task);
- final Member prevMember = findMemberWithLeastLoad(prevMembers,
task, true);
- if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId,
task, true);
- updateHelpers(prevMember, true);
- it.remove();
+ final ArrayList<Member> prevMembers =
localState.standbyTaskToPrevMember.get(task);
+ final Member prevMember = findPrevMemberWithLeastLoad(prevMembers,
null);
+ if (prevMember != null) {
+ final ProcessState processState =
localState.processIdToState.get(prevMember.processId);
+ if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+ processState.addTask(prevMember.memberId, task, true);
+
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+ it.remove();
+ }
}
}
// 3. assign any remaining unassigned tasks
- for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+ final PriorityQueue<ProcessState> processByLoad = new
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+ processByLoad.addAll(localState.processIdToState.values());
+ for (final Iterator<TaskId> it = activeTasks.iterator();
it.hasNext();) {
final TaskId task = it.next();
- final Set<Member> allMembers =
localState.processIdToState.entrySet().stream().flatMap(entry ->
entry.getValue().memberToTaskCounts().keySet().stream()
- .map(memberId -> new Member(entry.getKey(),
memberId))).collect(Collectors.toSet());
- final Member member = findMemberWithLeastLoad(allMembers, task,
false);
+ final ProcessState processWithLeastLoad = processByLoad.poll();
+ if (processWithLeastLoad == null) {
+ throw new TaskAssignorException("No process available to
assign active task {}." + task);
+ }
+ final String member = memberWithLeastLoad(processWithLeastLoad);
if (member == null) {
- log.error("Unable to assign active task {} to any member.",
task);
throw new TaskAssignorException("No member available to assign
active task {}." + task);
}
-
localState.processIdToState.get(member.processId).addTask(member.memberId,
task, true);
+ processWithLeastLoad.addTask(member, task, true);
it.remove();
- updateHelpers(member, true);
+
maybeUpdateActiveTasksPerMember(processWithLeastLoad.memberToTaskCounts().get(member));
+ processByLoad.add(processWithLeastLoad); // Add it back to the
queue after updating its state
+ }
+ }
+
+ private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
+ if (activeTasksNo == localState.activeTasksPerMember) {
+ localState.totalMembersWithActiveTaskCapacity--;
+ localState.totalActiveTasks -= activeTasksNo;
+ localState.activeTasksPerMember =
computeTasksPerMember(localState.totalActiveTasks,
localState.totalMembersWithActiveTaskCapacity);
+ }
+ }
+ private void maybeUpdateTasksPerMember(final int taskNo) {
+ if (taskNo == localState.tasksPerMember) {
+ localState.totalMembersWithTaskCapacity--;
+ localState.totalTasks -= taskNo;
+ localState.tasksPerMember =
computeTasksPerMember(localState.totalTasks,
localState.totalMembersWithTaskCapacity);
}
}
- private void maybeUpdateTasksPerMember(final int activeTasksNo) {
- if (activeTasksNo == localState.tasksPerMember) {
- localState.totalCapacity--;
- localState.allTasks -= activeTasksNo;
- localState.tasksPerMember =
computeTasksPerMember(localState.allTasks, localState.totalCapacity);
+ private boolean
assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId
taskId) {
+ final ProcessState processWithLeastLoad = queue.poll();
+ if (processWithLeastLoad == null) {
+ return false;
+ }
+ boolean found = false;
+ if (!processWithLeastLoad.hasTask(taskId)) {
+ final String memberId = memberWithLeastLoad(processWithLeastLoad);
+ if (memberId != null) {
+ processWithLeastLoad.addTask(memberId, taskId, false);
+ found = true;
+ }
+ } else if (!queue.isEmpty()) {
+ found = assignStandbyToMemberWithLeastLoad(queue, taskId);
}
+ queue.add(processWithLeastLoad); // Add it back to the queue after
updating its state
+ return found;
}
- private Member findMemberWithLeastLoad(final Set<Member> members, TaskId
taskId, final boolean returnSameMember) {
+ /**
+ * Finds the previous member with the least load for a given task.
+ *
+ * @param members The list of previous members owning the task.
+ * @param taskId The taskId, to check if the previous member already has
the task. Can be null, if we assign it
+ * for the first time (e.g., during active task assignment).
+ *
+ * @return Previous member with the least load that deoes not have the
task, or null if no such member exists.
Review Comment:
There's a typo in the documentation comment: 'deoes' should be 'does'.
```suggestion
* @return Previous member with the least load that does not have the
task, or null if no such member exists.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]