rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619937439
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws
PartitionAssignorException {
}
}
- // The minimum required quota that each member needs to meet for a
balanced assignment.
- // This is the same for all members.
- final int numberOfMembers = groupSpec.members().size();
- final int minQuota = totalPartitionsCount / numberOfMembers;
+ // Compute the minimum required quota per member and the number of
members
+ // who should receive an extra partition.
+ int numberOfMembers = groupSpec.members().size();
+ minimumMemberQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount %
numberOfMembers;
- groupSpec.members().keySet().forEach(memberId ->
- targetAssignment.put(memberId, new MemberAssignment(new
HashMap<>())
- ));
-
- potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
- unassignedPartitionsRoundRobinAssignment();
+ // Revoke the partitions which are either not part of the
subscriptions or above
+ // the maximum quota.
+ maybeRevokePartitions();
- if (!unassignedPartitions.isEmpty()) {
- throw new PartitionAssignorException("Partitions were left
unassigned");
- }
+ // Assign the unassigned partitions to the members with space.
+ assignRemainingPartitions();
return new GroupAssignment(targetAssignment);
}
- /**
- * Retains a set of partitions from the existing assignment and includes
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and
subscriptions are considered.
- *
- * <p> For each member:
- * <ol>
- * <li> Find the valid current assignment considering topic
subscriptions and metadata</li>
- * <li> If the current assignment exists, retain partitions up to the
minimum quota.</li>
- * <li> If the current assignment size is greater than the minimum
quota and
- * there are members that could get an extra partition, assign
the next partition as well.</li>
- * <li> Finally, if the member's current assignment size is less than
the minimum quota,
- * add them to the potentially unfilled members map and track the
number of remaining
- * partitions required to meet the quota.</li>
- * </ol>
- * </p>
- *
- * @return Members mapped to the remaining number of partitions needed to
meet the minimum quota,
- * including members that are eligible to receive an extra
partition.
- */
- private Map<String, Integer> assignStickyPartitions(int minQuota) {
- Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
- groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
- List<TopicIdPartition> validCurrentMemberAssignment =
validCurrentMemberAssignment(
- assignmentMemberSpec.assignedPartitions()
- );
-
- int currentAssignmentSize = validCurrentMemberAssignment.size();
- // Number of partitions required to meet the minimum quota.
- int remaining = minQuota - currentAssignmentSize;
-
- if (currentAssignmentSize > 0) {
- int retainedPartitionsCount = min(currentAssignmentSize,
minQuota);
- IntStream.range(0, retainedPartitionsCount).forEach(i -> {
- TopicIdPartition topicIdPartition =
validCurrentMemberAssignment.get(i);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- });
-
- if (remaining < 0) {
- // The extra partition is located at the last index from
the previous step.
- if (remainingMembersToGetAnExtraPartition > 0) {
- TopicIdPartition topicIdPartition =
validCurrentMemberAssignment.get(retainedPartitionsCount++);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- remainingMembersToGetAnExtraPartition--;
+ private void maybeRevokePartitions() {
+ for (Map.Entry<String, AssignmentMemberSpec> entry :
groupSpec.members().entrySet()) {
+ String memberId = entry.getKey();
+ AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+ Map<Uuid, Set<Integer>> oldAssignment =
assignmentMemberSpec.assignedPartitions();
+ Map<Uuid, Set<Integer>> newAssignment = null;
+
+ // The assignor expects to receive the assignment as an immutable
map. It leverages
+ // this knowledge in order to avoid having to copy all assignments.
+ if (!isImmutableMap(oldAssignment)) {
+ throw new IllegalStateException("The assignor expect an
immutable map.");
+ }
+
+ int quota = minimumMemberQuota;
+ if (remainingMembersToGetAnExtraPartition > 0) {
+ quota++;
+ remainingMembersToGetAnExtraPartition--;
+ }
+
+ for (Map.Entry<Uuid, Set<Integer>> topicPartitions :
oldAssignment.entrySet()) {
+ Uuid topicId = topicPartitions.getKey();
+ Set<Integer> partitions = topicPartitions.getValue();
+
+ if (subscribedTopicIds.contains(topicId)) {
+ if (partitions.size() <= quota) {
+ quota -= partitions.size();
+ } else {
+ for (Integer partition : partitions) {
+ if (quota > 0) {
+ quota--;
+ } else {
+ if (newAssignment == null) {
+ // If the new assignment is null, we
create a deep copy of the
+ // original assignment so that we can
alter it.
+ newAssignment = deepCopy(oldAssignment);
+ }
+ // Remove the partition from the new
assignment.
+ Set<Integer> parts =
newAssignment.get(topicId);
+ parts.remove(partition);
+ if (parts.isEmpty()) {
+ newAssignment.remove(topicId);
+ }
+ // Add the partition to the unassigned set to
be re-assigned later on.
+ unassignedPartitions.add(new
TopicIdPartition(topicId, partition));
+ }
+ }
}
- // Any previously owned partitions that weren't retained
due to the quotas
- // are added to the unassigned partitions set.
- if (retainedPartitionsCount < currentAssignmentSize) {
-
unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
- retainedPartitionsCount,
- currentAssignmentSize
- ));
+ } else {
+ if (newAssignment == null) {
+ // If the new assignment is null, we create a deep
copy of the
+ // original assignment so that we can alter it.
+ newAssignment = deepCopy(oldAssignment);
}
+ // Remove the entire topic.
+ newAssignment.remove(topicId);
}
}
- if (remaining >= 0) {
- potentiallyUnfilledMembers.put(memberId, remaining);
+ if (quota > 0) {
+ potentiallyUnfilledMembers.add(new
MemberWithRemainingQuota(memberId, quota));
}
- });
- return potentiallyUnfilledMembers;
- }
-
- /**
- * Filters the current assignment of partitions for a given member based
on certain criteria.
- *
- * Any partition that still belongs to the member's subscribed topics list
is considered valid.
- *
- * @param currentMemberAssignment The map of topics to partitions
currently assigned to the member.
- *
- * @return List of valid partitions after applying the filters.
- */
- private List<TopicIdPartition> validCurrentMemberAssignment(
- Map<Uuid, Set<Integer>> currentMemberAssignment
- ) {
- List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
- currentMemberAssignment.forEach((topicId, partitions) -> {
- if (subscribedTopicIds.contains(topicId)) {
- partitions.forEach(partition -> {
- TopicIdPartition topicIdPartition = new
TopicIdPartition(topicId, partition);
- validCurrentAssignmentList.add(topicIdPartition);
- });
+ if (newAssignment == null) {
+ targetAssignment.put(memberId, new
MemberAssignment(oldAssignment));
} else {
- LOG.debug("The topic " + topicId + " is no longer present in
the subscribed topics list");
+ targetAssignment.put(memberId, new
MemberAssignment(newAssignment));
}
- });
-
- return validCurrentAssignmentList;
+ }
}
- /**
- * Allocates the unassigned partitions to unfilled members in a
round-robin fashion.
- */
- private void unassignedPartitionsRoundRobinAssignment() {
- Queue<String> roundRobinMembers = new
LinkedList<>(potentiallyUnfilledMembers.keySet());
-
- // Partitions are sorted to ensure an even topic wise distribution
across members.
- // This not only balances the load but also makes partition-to-member
mapping more predictable.
- List<TopicIdPartition> sortedPartitionsList =
unassignedPartitions.stream()
-
.sorted(Comparator.comparing(TopicIdPartition::topicId).thenComparing(TopicIdPartition::partitionId))
- .collect(Collectors.toList());
-
- for (TopicIdPartition topicIdPartition : sortedPartitionsList) {
- boolean assigned = false;
-
- for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
- String memberId = roundRobinMembers.poll();
- if (potentiallyUnfilledMembers.containsKey(memberId)) {
- assigned = maybeAssignPartitionToMember(memberId,
topicIdPartition);
- }
- // Only re-add the member to the end of the queue if it's
still available for assignment.
- if (potentiallyUnfilledMembers.containsKey(memberId)) {
- roundRobinMembers.add(memberId);
- }
+ private void assignRemainingPartitions() {
Review Comment:
nit: Please add Javadoc for this method (`assignRemainingPartitions`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org