lucasbru commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2324753920
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -166,7 +166,10 @@ private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds)
return ret;
}
- private void assignActive(final Set<TaskId> activeTasks) {
+ private void assignActive(final LinkedList<TaskId> activeTasks) {
+
+ // Assuming our current assignment pairs same partitions (range-based), we want to sort by partition first
+
activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
Review Comment:
I added the sorting here. The old assignor did not sort explicitly, but happened to end up in the "good case" by chance.
The point is this:
Normally, we want to assign active tasks like a range assignor: with two subtopologies, two partitions each, and two clients, we assign
Client1: 0_0, 1_0
Client2: 0_1, 1_1
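To make that layout concrete, here is a minimal sketch. It assumes a hypothetical `TaskId` record (subtopology id plus partition) standing in for the assignor's own class and a hypothetical `assign` helper. It sorts with the same comparator as the diff (partition first, then subtopology) and hands each client a contiguous chunk. This only illustrates the range-like layout; it is not how `StickyTaskAssignor` actually places tasks.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeLikeAssignment {

    // Hypothetical stand-in for the assignor's TaskId; printed as "<subtopology>_<partition>".
    record TaskId(String subtopologyId, int partition) {
        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    // Hypothetical helper: sort by partition first, then subtopology (the comparator
    // from the diff), and hand out contiguous chunks of equal size to each client.
    static Map<String, List<String>> assign(List<TaskId> tasks, List<String> clients) {
        List<TaskId> sorted = new ArrayList<>(tasks);
        sorted.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
        int perClient = (sorted.size() + clients.size() - 1) / clients.size(); // ceiling division
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (int i = 0; i < clients.size(); i++) {
            List<String> chunk = new ArrayList<>();
            for (int j = i * perClient; j < Math.min(sorted.size(), (i + 1) * perClient); j++) {
                chunk.add(sorted.get(j).toString());
            }
            result.put(clients.get(i), chunk);
        }
        return result;
    }

    public static void main(String[] args) {
        List<TaskId> tasks = List.of(
            new TaskId("0", 0), new TaskId("0", 1),
            new TaskId("1", 0), new TaskId("1", 1));
        System.out.println(assign(tasks, List.of("Client1", "Client2")));
        // prints {Client1=[0_0, 1_0], Client2=[0_1, 1_1]}
    }
}
```

The sorted order is 0_0, 1_0, 0_1, 1_1, so each contiguous chunk contains one partition of every subtopology.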
The heuristic reason is that, with the assignment
Client1: 0_0, 0_1
Client2: 1_0, 1_1
if the first subtopology has large state and the second subtopology has small state, one client ends up with most of the state.
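A back-of-the-envelope version of that heuristic, using made-up per-task state sizes (1000 MB for each subtopology-0 task, 10 MB for each subtopology-1 task) and a hypothetical `stateOf` helper; none of these numbers come from a real workload:

```java
import java.util.List;
import java.util.Map;

public class StateSkew {

    // Hypothetical per-task state sizes in MB, keyed by subtopology id:
    // subtopology 0 has large state, subtopology 1 has small state.
    static final Map<String, Long> STATE_MB_PER_TASK = Map.of("0", 1000L, "1", 10L);

    // Sums the assumed state size of the tasks ("<subtopology>_<partition>") on one client.
    static long stateOf(List<String> tasks) {
        long total = 0;
        for (String task : tasks) {
            String subtopology = task.substring(0, task.indexOf('_'));
            total += STATE_MB_PER_TASK.get(subtopology);
        }
        return total;
    }

    public static void main(String[] args) {
        // Range-like: each client gets one partition of each subtopology.
        System.out.println(stateOf(List.of("0_0", "1_0")) + " / " + stateOf(List.of("0_1", "1_1")));
        // prints 1010 / 1010
        // Grouped by subtopology: one client holds all of the large state.
        System.out.println(stateOf(List.of("0_0", "0_1")) + " / " + stateOf(List.of("1_0", "1_1")));
        // prints 2000 / 20
    }
}
```

With these assumed sizes, the range-like layout splits the state evenly, while grouping by subtopology puts 100x more state on one client.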
The sorting here also helps achieve this kind of range assignment when scaling up. Assume all tasks are currently assigned to the first member:
Client1: 0_0, 0_1, 1_0, 1_1
Client2: -
We first reassign the previously owned tasks. We want to go through all partition-0 tasks before the partition-1 tasks, until Client1 fills up:
Client1: 0_0, 1_0
Client2: -
Then we fill up Client2 the usual way:
Client1: 0_0, 1_0
Client2: 0_1, 1_1
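The scale-up walk-through can be simulated with a simplified two-phase sketch: previous owners keep tasks up to a capacity of ceil(tasks / clients), and leftover tasks go to the least-loaded client. The `TaskId` record, the `assign` helper, the capacity rule, and both phases are assumptions made for illustration; the real `StickyTaskAssignor` does considerably more than this.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StickyScaleUpSketch {

    // Hypothetical stand-in for the assignor's TaskId.
    record TaskId(String subtopologyId, int partition) {
        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    // Simplified two-phase assignment (NOT the real StickyTaskAssignor):
    // 1) give each task back to its previous owner while that owner is below capacity;
    // 2) hand remaining tasks to the least-loaded client (first client wins ties).
    static Map<String, List<String>> assign(List<TaskId> tasks, Map<TaskId, String> previousOwner,
                                            List<String> clients, Comparator<TaskId> order) {
        List<TaskId> sorted = new ArrayList<>(tasks);
        sorted.sort(order);
        int capacity = (tasks.size() + clients.size() - 1) / clients.size();
        Map<String, List<String>> result = new LinkedHashMap<>();
        clients.forEach(c -> result.put(c, new ArrayList<>()));

        List<TaskId> unassigned = new ArrayList<>();
        for (TaskId task : sorted) {
            String owner = previousOwner.get(task);
            if (owner != null && result.get(owner).size() < capacity) {
                result.get(owner).add(task.toString());
            } else {
                unassigned.add(task);
            }
        }
        for (TaskId task : unassigned) {
            String leastLoaded = clients.get(0);
            for (String c : clients) {
                if (result.get(c).size() < result.get(leastLoaded).size()) {
                    leastLoaded = c;
                }
            }
            result.get(leastLoaded).add(task.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<TaskId> tasks = List.of(new TaskId("0", 0), new TaskId("0", 1), new TaskId("1", 0), new TaskId("1", 1));
        Map<TaskId, String> previous = new LinkedHashMap<>();
        tasks.forEach(t -> previous.put(t, "Client1")); // Client1 owned everything before scaling up
        List<String> clients = List.of("Client1", "Client2");

        Comparator<TaskId> partitionFirst =
            Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId);
        Comparator<TaskId> subtopologyFirst =
            Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition);

        System.out.println(assign(tasks, previous, clients, partitionFirst));
        // prints {Client1=[0_0, 1_0], Client2=[0_1, 1_1]}
        System.out.println(assign(tasks, previous, clients, subtopologyFirst));
        // prints {Client1=[0_0, 0_1], Client2=[1_0, 1_1]}
    }
}
```

Only the iteration order differs between the two runs: with partition-first order Client1 keeps one partition of each subtopology, while subtopology-first order reproduces the skewed, subtopology-grouped layout.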
This is a corner case, but it seems like a useful improvement.