lucasbru commented on code in PR #20458:
URL: https://github.com/apache/kafka/pull/20458#discussion_r2315959967
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -185,11 +195,13 @@ private void assignActive(final Set<TaskId> activeTasks) {
final TaskId task = it.next();
final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
- if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+ if (prevMember != null) {
final ProcessState processState = localState.processIdToState.get(prevMember.processId);
- processState.addTask(prevMember.memberId, task, true);
- maybeUpdateTasksPerMember(processState.activeTaskCount());
- it.remove();
+ if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
Review Comment:
As a minor optimization, we pass `processState` into this function (to
avoid fetching it again inside), so the if construct changed slightly here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -53,24 +55,19 @@ public GroupAssignment assign(final GroupSpec groupSpec,
final TopologyDescriber
}
private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
- //active
- final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+ final Deque<TaskId> activeTasks = taskIds(topologyDescriber, true);
assignActive(activeTasks);
- //standby
- final int numStandbyReplicas =
- groupSpec.assignmentConfigs().isEmpty() ? 0
- : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
- if (numStandbyReplicas > 0) {
- final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
- assignStandby(statefulTasks, numStandbyReplicas);
+ if (localState.numStandbyReplicas > 0) {
+ final Deque<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+ assignStandby(statefulTasks);
}
return buildGroupAssignment(groupSpec.members().keySet());
}
- private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
- final Set<TaskId> ret = new HashSet<>();
+ private Deque<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
Review Comment:
We want to assign standby tasks in reverse order, so I'm using a Deque here,
which provides a reverse iterator (`descendingIterator()`).
The reason for traversing standby tasks in reverse is illustrated by the
example that I added to the unit tests of both LegacyStickyTaskAssignor and the
new StickyTaskAssignor.
Assume we have
Node 1: Active task 0,1, Standby task 2,3
Node 2: Active task 2,3, Standby task 0,1
Node 3: - (new)
Then we don't want to assign active tasks and standby tasks in the same
order.
Suppose we assign active tasks in increasing order. We will get:
Node 1: Active task 0,1
Node 2: Active task 2
Node 3: Active task 3
Task 3 is the last task we assign, and at that point the quota for active
tasks is 1, so it can only be assigned to Node 3.
Suppose we now assign standby tasks in the same order. We will get:
Node 1: Active task 0,1, Standby task 2, 3
Node 2: Active task 2, Standby task 0, 1
Node 3: Active task 3
The reason is that we first assign standby tasks 0, 1, and 2, each of which
can be assigned to the member that previously owned it. Finally, we want to
assign standby task 3, but it cannot be assigned to Node 3, which already holds
active task 3, so we have to assign it to Node 1 or Node 2, and Node 3 ends up
without any standby task. Using reverse order means that new nodes get the
numerically last few active tasks and the numerically first standby tasks,
which should avoid this problem.
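A minimal sketch of the two traversal orders, assuming task ids are plain integers rather than `TaskId` objects. The class and helper names (`TaskOrderSketch`, `forwardOrder`, `reverseOrder`) are hypothetical and only illustrate `Deque.iterator()` versus `Deque.descendingIterator()`; this is not the assignor code from the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class TaskOrderSketch {

    // Hypothetical helper: visits tasks in increasing order (as for active tasks).
    static List<Integer> forwardOrder(final Deque<Integer> tasks) {
        final List<Integer> order = new ArrayList<>();
        for (final Iterator<Integer> it = tasks.iterator(); it.hasNext(); ) {
            order.add(it.next());
        }
        return order;
    }

    // Hypothetical helper: visits tasks in decreasing order (as for standby tasks).
    static List<Integer> reverseOrder(final Deque<Integer> tasks) {
        final List<Integer> order = new ArrayList<>();
        for (final Iterator<Integer> it = tasks.descendingIterator(); it.hasNext(); ) {
            order.add(it.next());
        }
        return order;
    }

    public static void main(final String[] args) {
        // Assumption: four tasks, numbered 0..3 as in the example above.
        final Deque<Integer> tasks = new ArrayDeque<>();
        for (int i = 0; i < 4; i++) {
            tasks.addLast(i);
        }
        System.out.println(forwardOrder(tasks)); // prints [0, 1, 2, 3]
        System.out.println(reverseOrder(tasks)); // prints [3, 2, 1, 0]
    }
}
```

Using a single Deque for both passes keeps one collection type while letting each pass pick its own direction.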
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -84,13 +81,24 @@ private Set<TaskId> taskIds(final TopologyDescriber
topologyDescriber, final boo
private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
localState = new LocalState();
- localState.allTasks = 0;
Review Comment:
`allTasks` previously referred only to active tasks.
I renamed it to `totalActiveTasks` and added `totalTasks`, which counts all
tasks, including standby tasks.
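A rough sketch of how the two counters could relate, under the assumption (not confirmed by the diff shown here) that each stateful task gets one standby copy per configured replica. `TaskCountSketch`, the `totalTasks` helper and its parameters are hypothetical names for illustration.

```java
public class TaskCountSketch {

    // Assumption: total = active tasks + (stateful tasks * standby replicas).
    static int totalTasks(final int totalActiveTasks,
                          final int statefulTasks,
                          final int numStandbyReplicas) {
        return totalActiveTasks + statefulTasks * numStandbyReplicas;
    }

    public static void main(final String[] args) {
        // Four active tasks, all stateful, one standby replica each.
        System.out.println(totalTasks(4, 4, 1)); // prints 8
    }
}
```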
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -58,12 +58,9 @@ private GroupAssignment doAssign(final GroupSpec groupSpec,
final TopologyDescri
assignActive(activeTasks);
//standby
- final int numStandbyReplicas =
Review Comment:
This variable just moved to `localState` to avoid passing it around.