Copilot commented on code in PR #20458:
URL: https://github.com/apache/kafka/pull/20458#discussion_r2332567380
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -84,13 +80,24 @@ private LinkedList<TaskId> taskIds(final TopologyDescriber
topologyDescriber, fi
private void initialize(final GroupSpec groupSpec, final TopologyDescriber
topologyDescriber) {
localState = new LocalState();
- localState.allTasks = 0;
+ localState.numStandbyReplicas =
+ groupSpec.assignmentConfigs().isEmpty() ? 0
+ :
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+
+ // Helpers for computing active tasks per member, and tasks per member
+ localState.totalActiveTasks = 0;
+ localState.totalTasks = 0;
for (final String subtopology : topologyDescriber.subtopologies()) {
final int numberOfPartitions =
topologyDescriber.maxNumInputPartitions(subtopology);
- localState.allTasks += numberOfPartitions;
+ localState.totalTasks += numberOfPartitions;
+ localState.totalActiveTasks += numberOfPartitions;
+ if (topologyDescriber.isStateful(subtopology))
+ localState.totalTasks += numberOfPartitions *
localState.numStandbyReplicas;
Review Comment:
Missing braces around the if statement body. According to Java coding
standards, single-line if statements should include braces for clarity and to
prevent errors during future modifications.
```suggestion
if (topologyDescriber.isStateful(subtopology)) {
localState.totalTasks += numberOfPartitions *
localState.numStandbyReplicas;
}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -298,43 +317,49 @@ private String memberWithLeastLoad(final ProcessState
processWithLeastLoad) {
return memberWithLeastLoad.orElse(null);
}
- private boolean hasUnfulfilledQuota(final Member member) {
- return
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
< localState.tasksPerMember;
+ private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process,
final Member member) {
+ return process.memberToTaskCounts().get(member.memberId) <
localState.activeTasksPerMember;
}
- private void assignStandby(final LinkedList<TaskId> standbyTasks, int
numStandbyReplicas) {
- final ArrayList<StandbyToAssign> toLeastLoaded = new
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
+ private boolean hasUnfulfilledTaskQuota(final ProcessState process, final
Member member) {
+ return process.memberToTaskCounts().get(member.memberId) <
localState.tasksPerMember;
+ }
+ private void assignStandby(final LinkedList<TaskId> standbyTasks) {
+ final ArrayList<StandbyToAssign> toLeastLoaded = new
ArrayList<>(standbyTasks.size() * localState.numStandbyReplicas);
+
Review Comment:
Trailing whitespace should be removed from line 330.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]