bbejeck commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2325517754


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -193,6 +196,9 @@ private void assignActive(final Set<TaskId> activeTasks) {
             }
         }
 
+        // To achieve an initially range-based assignment, sort by subtopology
+        activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
+

Review Comment:
   So we do the second sort here by `subtopologyId` then `partition` to get a range-based assignment, which optimizes how state is distributed across sub-topologies.
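   
   Just to spell the ordering out for myself, a quick standalone sketch (using a stand-in `TaskId` record rather than the coordinator's actual class, only assuming the `subtopologyId()`/`partition()` accessors used above):
   
   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   
   public class RangeOrderSketch {
       // Stand-in for the coordinator's TaskId, for illustration only.
       record TaskId(String subtopologyId, int partition) { }
   
       public static void main(String[] args) {
           final List<TaskId> activeTasks = new ArrayList<>(List.of(
               new TaskId("1", 1), new TaskId("0", 3), new TaskId("1", 0),
               new TaskId("0", 0), new TaskId("0", 2), new TaskId("0", 1)));
   
           // Same comparator as in the PR: subtopology first, then partition.
           activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
   
           // Prints 0_0, 0_1, 0_2, 0_3, 1_0, 1_1 -> a range-style order per subtopology.
           activeTasks.forEach(t -> System.out.println(t.subtopologyId() + "_" + t.partition()));
       }
   }
   ```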
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -296,9 +302,13 @@ private boolean hasUnfulfilledQuota(final Member member) {
         return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
     }
 
-    private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
         final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by partition first.
+        standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());

Review Comment:
   > Using reverse order means, when I have new nodes, they will get the 
numerically last few active tasks, and the numerically first standby tasks,
   
   I was going to ask how this interacts with the existing HA assignor, but I don't think that applies anymore for KIP-1071, correct?
   
   > and the numerically first standby tasks
   
   If I'm understanding your example correctly, previous ownership will take 
priority when assigning standbys?
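   
   To make the "reverse order" effect concrete, the same kind of sketch with the standby comparator (again a stand-in `TaskId`, not the real class):
   
   ```java
   import java.util.Comparator;
   import java.util.LinkedList;
   
   public class StandbyOrderSketch {
       // Stand-in for the coordinator's TaskId, for illustration only.
       record TaskId(String subtopologyId, int partition) { }
   
       public static void main(String[] args) {
           final LinkedList<TaskId> standbyTasks = new LinkedList<>();
           for (final String sub : new String[] {"0", "1"}) {
               for (int p = 0; p < 4; p++) {
                   standbyTasks.add(new TaskId(sub, p));
               }
           }
   
           // Same comparator as in the PR: partition, then subtopology, reversed.
           standbyTasks.sort(Comparator.comparing(TaskId::partition)
               .thenComparing(TaskId::subtopologyId).reversed());
   
           // Prints 1_3, 0_3, 1_2, 0_2, 1_1, 0_1, 1_0, 0_0 -> the numerically first
           // tasks end up last, i.e. the opposite end from the active ordering.
           standbyTasks.forEach(t -> System.out.println(t.subtopologyId() + "_" + t.partition()));
       }
   }
   ```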



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java:
##########
@@ -1091,6 +1091,148 @@ public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
         assertEquals(numTasks, allStandbyTasks.size());
     }
 
+    @Test
+    public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
+
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
+
+        // Node 3 joins as new client
+        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
+        );
+
+        // Verify all active tasks are assigned
+        final Set<Integer> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
+
+        // Verify all standby tasks are assigned
+        final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member1"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member2"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
+
+        // Verify each member has 1-2 active tasks and at most 3 tasks total
+        assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && getAllActiveTaskIds(result, "member1").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member1").size() + getAllStandbyTaskIds(result, "member1").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && getAllActiveTaskIds(result, "member2").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3);

Review Comment:
   Should we also assert the actual distribution of task ownership, in addition to the owned counts?
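   
   Something like this is what I had in mind - just a sketch, and the exact expected sets depend on what the assignor actually produces:
   
   ```java
   // Sketch only: besides the counts, also check which tasks each member owns,
   // e.g. that the previous owners keep a subset of their old active tasks.
   assertTrue(Sets.newSet(0, 1).containsAll(getAllActiveTaskIds(result, "member1")));
   assertTrue(Sets.newSet(2, 3).containsAll(getAllActiveTaskIds(result, "member2")));
   ```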



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java:
##########
@@ -1263,6 +1263,119 @@ public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy(f
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+    })
+    public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final String rackAwareStrategy) {
+        setUp(rackAwareStrategy);
+
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final ClientState node1 = createClientWithPreviousActiveTasks(PID_1, 1, TASK_0_0, TASK_0_1);
+        node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));
+
+        final ClientState node2 = createClientWithPreviousActiveTasks(PID_2, 1, TASK_0_2, TASK_0_3);
+        node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));
+
+        // Node 3 joins as new client
+        final ClientState node3 = createClient(PID_3, 1);
+
+        final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+        assertThat(probingRebalanceNeeded, is(false));
+
+        // Verify all active tasks are assigned
+        final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(node1.activeTasks());
+        allAssignedActiveTasks.addAll(node2.activeTasks());
+        allAssignedActiveTasks.addAll(node3.activeTasks());
+        assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
+
+        // Verify all standby tasks are assigned
+        final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(node1.standbyTasks());
+        allAssignedStandbyTasks.addAll(node2.standbyTasks());
+        allAssignedStandbyTasks.addAll(node3.standbyTasks());
+        assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
+
+        // Verify each client has 1-2 active tasks and at most 3 tasks total
+        assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node1.activeTasks().size() + node1.standbyTasks().size(), lessThanOrEqualTo(3));
+
+        assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node2.activeTasks().size() + node2.standbyTasks().size(), lessThanOrEqualTo(3));
+
+        assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node3.activeTasks().size() + node3.standbyTasks().size(), lessThanOrEqualTo(3));

Review Comment:
   Same question about asserting task membership vs. just the task counts - but I'm not sure it applies in this case.
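   
   If it does apply, roughly this kind of check is what I mean (again just a sketch; the exact sets depend on the assignor's output):
   
   ```java
   // Sketch only: verify which active tasks each client keeps, not just how many,
   // e.g. that node1 and node2 retain a subset of their previously owned tasks.
   assertThat(Set.of(TASK_0_0, TASK_0_1).containsAll(node1.activeTasks()), is(true));
   assertThat(Set.of(TASK_0_2, TASK_0_3).containsAll(node2.activeTasks()), is(true));
   ```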



##########
tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java:
##########
@@ -180,8 +180,8 @@ public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exceptio
     public void testDescribeStreamsGroupWithMembersOption() throws Exception {
         final List<String> expectedHeader = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
         final Set<List<String>> expectedRows = Set.of(
-            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0];", "1:[0];"));

Review Comment:
   This is confirming the `subtopology_partition` task ids, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
