Copilot commented on code in PR #20458:
URL: https://github.com/apache/kafka/pull/20458#discussion_r2332567380


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -84,13 +80,24 @@ private LinkedList<TaskId> taskIds(final TopologyDescriber 
topologyDescriber, fi
 
     private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
         localState = new LocalState();
-        localState.allTasks = 0;
+        localState.numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+
+        // Helpers for computing active tasks per member, and tasks per member
+        localState.totalActiveTasks = 0;
+        localState.totalTasks = 0;
         for (final String subtopology : topologyDescriber.subtopologies()) {
             final int numberOfPartitions = 
topologyDescriber.maxNumInputPartitions(subtopology);
-            localState.allTasks += numberOfPartitions;
+            localState.totalTasks += numberOfPartitions;
+            localState.totalActiveTasks += numberOfPartitions;
+            if (topologyDescriber.isStateful(subtopology))
+                localState.totalTasks += numberOfPartitions * 
localState.numStandbyReplicas;

Review Comment:
   Missing braces around the if statement body. According to Java coding 
standards, single-line if statements should include braces for clarity and to 
prevent errors during future modifications.
   ```suggestion
               if (topologyDescriber.isStateful(subtopology)) {
                   localState.totalTasks += numberOfPartitions * 
localState.numStandbyReplicas;
               }
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -298,43 +317,49 @@ private String memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
         return memberWithLeastLoad.orElse(null);
     }
 
-    private boolean hasUnfulfilledQuota(final Member member) {
-        return 
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < localState.tasksPerMember;
+    private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process, 
final Member member) {
+        return process.memberToTaskCounts().get(member.memberId) < 
localState.activeTasksPerMember;
     }
 
-    private void assignStandby(final LinkedList<TaskId> standbyTasks, int 
numStandbyReplicas) {
-        final ArrayList<StandbyToAssign> toLeastLoaded = new 
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
+    private boolean hasUnfulfilledTaskQuota(final ProcessState process, final 
Member member) {
+        return process.memberToTaskCounts().get(member.memberId) < 
localState.tasksPerMember;
+    }
 
+    private void assignStandby(final LinkedList<TaskId> standbyTasks) {
+        final ArrayList<StandbyToAssign> toLeastLoaded = new 
ArrayList<>(standbyTasks.size() * localState.numStandbyReplicas);
+        

Review Comment:
   Trailing whitespace should be removed from line 330.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to