Copilot commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2324686649
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
Review Comment:
The import of LinkedList is added but there's no corresponding documentation
explaining why LinkedList is preferred over the original Set<TaskId> for
maintaining insertion order in range-style assignments.
```suggestion
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -296,9 +302,14 @@ private boolean hasUnfulfilledQuota(final Member member) {
return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
}
- private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+ private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
- for (final TaskId task : standbyTasks) {
+
+ // Assuming our current assignment is range-based, we want to sort by partition first.
+ standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
+
+ for (Iterator<TaskId> it = standbyTasks.descendingIterator(); it.hasNext(); ) {
Review Comment:
The logic sorts standbyTasks in descending order and then iterates using
descendingIterator(), which effectively processes tasks in ascending order.
This double reversal is confusing and could be simplified by removing
.reversed() and using a regular iterator.
```suggestion
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
for (Iterator<TaskId> it = standbyTasks.iterator(); it.hasNext(); ) {
```
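The equivalence claimed above can be checked with a small standalone sketch. It assumes a hypothetical `Task` record standing in for `TaskId` (only the two accessors used by the comparator), and it relies on every (partition, subtopologyId) pair being unique, so there are no ties whose relative order could differ between the two variants.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class DoubleReversalDemo {
    // Hypothetical stand-in for TaskId: only the two accessors the comparator needs.
    record Task(String subtopologyId, int partition) { }

    static final Comparator<Task> ASCENDING =
        Comparator.comparing(Task::partition).thenComparing(Task::subtopologyId);

    // Mirrors the PR: sort descending, then walk the list from tail to head.
    static List<Task> reversedSortThenDescendingIterator(final List<Task> input) {
        final LinkedList<Task> tasks = new LinkedList<>(input);
        tasks.sort(ASCENDING.reversed());
        final List<Task> visited = new ArrayList<>();
        for (final Iterator<Task> it = tasks.descendingIterator(); it.hasNext(); ) {
            visited.add(it.next());
        }
        return visited;
    }

    // Mirrors the suggestion: sort ascending, then walk the list from head to tail.
    static List<Task> ascendingSortThenIterator(final List<Task> input) {
        final LinkedList<Task> tasks = new LinkedList<>(input);
        tasks.sort(ASCENDING);
        final List<Task> visited = new ArrayList<>();
        for (final Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
            visited.add(it.next());
        }
        return visited;
    }

    public static void main(final String[] args) {
        final List<Task> input = List.of(
            new Task("1", 1), new Task("0", 0), new Task("1", 0), new Task("0", 1));
        // Both variants visit the tasks in the same order.
        System.out.println(reversedSortThenDescendingIterator(input)
            .equals(ascendingSortThenIterator(input)));
    }
}
```

With the ascending variant, a plain `List<TaskId>` would also suffice as the parameter type, since `descendingIterator()` is the only `LinkedList`-specific call in the hunk shown.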
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -329,6 +340,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyR
}
}
+ // To achieve a range-based assignment, sort by subtopology
+ toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId.subtopologyId())
+ .thenComparing(x -> x.taskId.partition()).reversed());
Review Comment:
The sorting logic for standby assignments ends with .reversed(), which applies
to the whole comparator chain and therefore orders by subtopology and partition
in descending order. This contradicts the stated goal of range-style assignment,
which should process subtopologies and partitions in ascending order.
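A trailing `.reversed()` on a chained `Comparator` reverses the combined ordering, not just the last key. The sketch below illustrates this with a hypothetical `Task` record in place of `StandbyToAssign`/`TaskId`; the class and method names are illustrative only and do not come from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ReversedChainDemo {
    // Hypothetical stand-in for the task identity carried by StandbyToAssign.
    record Task(String subtopologyId, int partition) { }

    // Same shape as the PR's comparator: reversed() wraps the whole chain.
    static Comparator<Task> wholeChainReversed() {
        return Comparator.comparing(Task::subtopologyId)
            .thenComparing(Task::partition)
            .reversed();
    }

    // Ascending by subtopology, then by partition.
    static Comparator<Task> ascending() {
        return Comparator.comparing(Task::subtopologyId)
            .thenComparing(Task::partition);
    }

    // Returns a sorted copy so the input list is left untouched.
    static List<Task> sortedWith(final List<Task> input, final Comparator<Task> order) {
        final List<Task> copy = new ArrayList<>(input);
        copy.sort(order);
        return copy;
    }

    public static void main(final String[] args) {
        final List<Task> input = List.of(
            new Task("0", 1), new Task("1", 0), new Task("0", 0), new Task("1", 1));
        System.out.println(sortedWith(input, ascending()));
        System.out.println(sortedWith(input, wholeChainReversed()));
    }
}
```

Whether descending order is actually wrong here depends on how `toLeastLoaded` is consumed afterwards, which the hunk does not show. If the list is drained from the end, a descending sort would still yield ascending processing.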