bbejeck commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2330289507
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java:
##########
@@ -1263,6 +1263,119 @@ public void
shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy(f
}
}
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
+ public void
shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final
String rackAwareStrategy) {
+ setUp(rackAwareStrategy);
+
+ // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+ // Node 2 has active tasks 2,3 and standby tasks 0,1
+ final ClientState node1 = createClientWithPreviousActiveTasks(PID_1,
1, TASK_0_0, TASK_0_1);
+ node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));
+
+ final ClientState node2 = createClientWithPreviousActiveTasks(PID_2,
1, TASK_0_2, TASK_0_3);
+ node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));
+
+ // Node 3 joins as new client
+ final ClientState node3 = createClient(PID_3, 1);
+
+ final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy,
TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+ assertThat(probingRebalanceNeeded, is(false));
+
+ // Verify all active tasks are assigned
+ final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
+ allAssignedActiveTasks.addAll(node1.activeTasks());
+ allAssignedActiveTasks.addAll(node2.activeTasks());
+ allAssignedActiveTasks.addAll(node3.activeTasks());
+ assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1,
TASK_0_2, TASK_0_3)));
+
+ // Verify all standby tasks are assigned
+ final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
+ allAssignedStandbyTasks.addAll(node1.standbyTasks());
+ allAssignedStandbyTasks.addAll(node2.standbyTasks());
+ allAssignedStandbyTasks.addAll(node3.standbyTasks());
+ assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1,
TASK_0_2, TASK_0_3)));
+
+ // Verify each client has 1-2 active tasks and at most 3 tasks total
+ assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
+ assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
+ assertThat(node1.activeTasks().size() + node1.standbyTasks().size(),
lessThanOrEqualTo(3));
+
+ assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
+ assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
+ assertThat(node2.activeTasks().size() + node2.standbyTasks().size(),
lessThanOrEqualTo(3));
+
+ assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
+ assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
+ assertThat(node3.activeTasks().size() + node3.standbyTasks().size(),
lessThanOrEqualTo(3));
Review Comment:
Same as my comment above: this is already covered by another test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org