rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1607621132
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
}
+ /**
+ * @return An immutable map containing all the topic partitions
+ * with their current member assignments.
+ */
+ public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+ return Collections.unmodifiableMap(partitionAssignments);
+ }
+
/**
* Updates target assignment of a member.
*
* @param memberId The member id.
* @param newTargetAssignment The new target assignment.
*/
public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+ updatePartitionAssignments(
+ memberId,
+ targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+ newTargetAssignment
+ );
targetAssignment.put(memberId, newTargetAssignment);
}
+ /**
+ * Updates partition assignments of the topics.
+ *
+ * @param memberId The member Id.
+ * @param oldTargetAssignment The old target assignment.
+ * @param newTargetAssignment The new target assignment.
+ *
+ * Package private for testing.
+ */
+ void updatePartitionAssignments(
+ String memberId,
+ Assignment oldTargetAssignment,
+ Assignment newTargetAssignment
+ ) {
+ // Combine keys from both old and new assignments.
+ Set<Uuid> allTopicIds = new HashSet<>();
+ allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+ allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+ for (Uuid topicId : allTopicIds) {
+ Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+ Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+ TimelineHashMap<Integer, String> topicPartitionAssignment = partitionAssignments.computeIfAbsent(
+ topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+ );
+
+ // Remove partitions that aren't present in the new assignment.
+ for (Integer partition : oldPartitions) {
+ if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {
Review Comment:
Imagine there are two members, A and B. We remove a partition (P0) from A and
assign it to B. If B heartbeats first and we see that its new target
assignment now has P0, we update the map entry for P0 to B. Then, when A
heartbeats, we only remove P0 from the map if it is still assigned to A.
If we instead used a byte array with each index as a partition number, we
could set the entry for P0 to true when B heartbeats and then reset it to
false when A heartbeats, even though P0 is currently assigned to B. We need
to make sure we only remove an assignment if the current member still owns
it.
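
To make the ordering issue concrete, here is a minimal standalone sketch of the
two approaches. It is not the PR code: the class `OwnershipSketch` and the
helpers `updateOwners` and `updateFlags` are hypothetical names, a plain
`HashMap` stands in for `TimelineHashMap`, and the step that records new
partitions is assumed, since that part of the method is not shown in the hunk
above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class OwnershipSketch {

    // Owner-map variant (hypothetical helper): each partition maps to the member id that owns it.
    static void updateOwners(
        Map<Integer, String> owners,
        String memberId,
        Set<Integer> oldPartitions,
        Set<Integer> newPartitions
    ) {
        for (Integer partition : oldPartitions) {
            if (!newPartitions.contains(partition)) {
                // Map.remove(key, value) removes the entry only if it is
                // still mapped to this member, which is the ownership check.
                owners.remove(partition, memberId);
            }
        }
        // Assumed step: record the member as owner of its new partitions.
        for (Integer partition : newPartitions) {
            owners.put(partition, memberId);
        }
    }

    // Flag-array variant (hypothetical helper): one boolean per partition, no owner information.
    static void updateFlags(
        boolean[] assigned,
        Set<Integer> oldPartitions,
        Set<Integer> newPartitions
    ) {
        for (Integer partition : oldPartitions) {
            if (!newPartitions.contains(partition)) {
                // There is no way to tell whether another member has taken over.
                assigned[partition] = false;
            }
        }
        for (Integer partition : newPartitions) {
            assigned[partition] = true;
        }
    }

    public static void main(String[] args) {
        Set<Integer> none = Collections.emptySet();
        Set<Integer> p0 = Collections.singleton(0);

        // P0 starts with A, then moves to B. B heartbeats before A.
        Map<Integer, String> owners = new HashMap<>();
        owners.put(0, "A");
        updateOwners(owners, "B", none, p0); // B picks up P0
        updateOwners(owners, "A", p0, none); // A drops P0, but B already owns it
        System.out.println(owners);          // prints {0=B}

        boolean[] assigned = {true};
        updateFlags(assigned, none, p0);     // B picks up P0
        updateFlags(assigned, p0, none);     // A drops P0 and clears the flag
        System.out.println(assigned[0]);     // prints false, although B owns P0
    }
}
```

With the owner map, A's late heartbeat leaves B's entry untouched; with the
flag array, the same sequence marks P0 as unassigned.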