dajac commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1620268966
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -39,4 +41,20 @@ public interface GroupSpec {
* False, otherwise.
*/
boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+ /**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId The member Id.
+ * @return The member's subscription metadata.
+ */
+ MemberSubscriptionSpec memberSubscriptionSpec(String memberId);
+
+ /**
+ * Gets the current assignment for a member.
+ *
+ * @param memberId The member Id.
+ * @return A map of topic Ids to sets of partition numbers.
+ */
+ Map<Uuid, Set<Integer>> currentMemberAssignment(String memberId);
Review Comment:
nit: Let's call it `memberAssignment`. The current is redundant in my
opinion because it is implicit here. Same for the unknown member id case in the
javadoc.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -39,4 +41,20 @@ public interface GroupSpec {
* False, otherwise.
*/
boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+ /**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId The member Id.
+ * @return The member's subscription metadata.
+ */
+ MemberSubscriptionSpec memberSubscriptionSpec(String memberId);
Review Comment:
nit: Let's call it `memberSubscription`. We should also precise in the
javadoc what we do if the member id does not exist. I suppose that we return
null.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -18,48 +18,60 @@
import org.apache.kafka.common.Uuid;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
/**
* The assignment specification for a consumer group.
*/
public class GroupSpecImpl implements GroupSpec {
/**
- * The member metadata keyed by member Id.
+ * Member subscription metadata keyed by member Id.
*/
- private final Map<String, AssignmentMemberSpec> members;
+ private final Map<String, MemberSubscriptionSpec> memberSubscriptions;
/**
- * The subscription type followed by the group.
+ * The subscription type of the group.
*/
private final SubscriptionType subscriptionType;
+ /**
+ * Partitions currently assigned to each member keyed by topicId.
+ */
+ private final Map<String, Map<Uuid, Set<Integer>>> currentAssignment;
+
/**
* Reverse lookup map representing topic partitions with
* their current member assignments.
*/
- private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+ private final Map<Uuid, Map<Integer, String>> invertedCurrentAssignment;
public GroupSpecImpl(
- Map<String, AssignmentMemberSpec> members,
+ Map<String, MemberSubscriptionSpec> members,
SubscriptionType subscriptionType,
- Map<Uuid, Map<Integer, String>> invertedTargetAssignment
+ Map<String, Map<Uuid, Set<Integer>>> currentAssignment,
+ Map<Uuid, Map<Integer, String>> invertedCurrentAssignment
) {
Objects.requireNonNull(members);
Objects.requireNonNull(subscriptionType);
- Objects.requireNonNull(invertedTargetAssignment);
- this.members = members;
+ Objects.requireNonNull(currentAssignment);
+ Objects.requireNonNull(invertedCurrentAssignment);
+ this.memberSubscriptions = members;
this.subscriptionType = subscriptionType;
- this.invertedTargetAssignment = invertedTargetAssignment;
+ this.currentAssignment = currentAssignment;
+ this.invertedCurrentAssignment = invertedCurrentAssignment;
Review Comment:
While we are here, could we do `this.memberSubscriptions =
Objects.requireNonNull(members);?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -85,19 +86,20 @@ public static void assertAssignment(
}
/**
- * Generate a reverse look up map of partition to member target
assignments from the given member spec.
+ * Generate a reverse look up map of partition to member target
assignments from the given metadata.
*
- * @param memberSpec A map where the key is the member Id and the
value is an
- * AssignmentMemberSpec object containing the
member's partition assignments.
+ * @param assignedPartitions Partition assignments per member.
+ * @param memberSubscriptionSpec A map where the key is the
member Id and the value is a
+ * MemberSubscriptionSpec object
containing the member's subscription info.
* @return Map of topic partition to member assignments.
*/
public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
- Map<String, AssignmentMemberSpec> memberSpec
+ Map<String, Map<Uuid, Set<Integer>>> assignedPartitions,
+ Map<String, MemberSubscriptionSpec> memberSubscriptionSpec
Review Comment:
Do we really need `memberSubscriptionSpec`? It seems that we only use it to
get the member ids but we also have them in `assignedPartitions`, no?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -339,6 +352,7 @@ public TargetAssignmentResult build() throws
PartitionAssignorException {
new GroupSpecImpl(
Collections.unmodifiableMap(memberSpecs),
subscriptionType,
+ currentTargetAssignment,
invertedTargetAssignment
Review Comment:
Should we just use `targetAssignment` and `invertedTargetAssignment`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -33,37 +35,38 @@
public class GroupSpecImplTest {
- private Map<String, AssignmentMemberSpec> members;
+ private Map<String, MemberSubscriptionSpec> members;
private SubscriptionType subscriptionType;
+ private Map<String, Map<Uuid, Set<Integer>>> assignedPartitions;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
Review Comment:
We are not consistent here. Let's use `assignment` and `invertedAssignment`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpec.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Optional;
+import java.util.Set;
+
+public interface MemberSubscriptionSpec {
Review Comment:
nit: Could we add java doc for the interface?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -18,48 +18,60 @@
import org.apache.kafka.common.Uuid;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
/**
* The assignment specification for a consumer group.
*/
public class GroupSpecImpl implements GroupSpec {
/**
- * The member metadata keyed by member Id.
+ * Member subscription metadata keyed by member Id.
*/
- private final Map<String, AssignmentMemberSpec> members;
+ private final Map<String, MemberSubscriptionSpec> memberSubscriptions;
/**
- * The subscription type followed by the group.
+ * The subscription type of the group.
*/
private final SubscriptionType subscriptionType;
+ /**
+ * Partitions currently assigned to each member keyed by topicId.
+ */
+ private final Map<String, Map<Uuid, Set<Integer>>> currentAssignment;
Review Comment:
nit: Should we use `assignment` and `invertedAssignment`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -293,14 +295,22 @@ public TargetAssignmentBuilder removeMember(
* @throws PartitionAssignorException if the target assignment cannot be
computed.
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
- Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+ Map<String, MemberSubscriptionSpec> memberSpecs = new HashMap<>();
Review Comment:
Should we also init the size here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -293,14 +295,22 @@ public TargetAssignmentBuilder removeMember(
* @throws PartitionAssignorException if the target assignment cannot be
computed.
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
- Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+ Map<String, MemberSubscriptionSpec> memberSpecs = new HashMap<>();
+ Map<String, Map<Uuid, Set<Integer>>> currentTargetAssignment = new
HashMap<>(members.size());
Review Comment:
This is a tad annoying. I think that we can keep it as it is for now but we
need to find a better way. I also wonder if this has an impact on performance.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -148,23 +143,27 @@ public void testFirstAssignmentTwoMembersTwoTopics() {
mkMapOfPartitionRacks(6)
));
- Map<String, AssignmentMemberSpec> members = new TreeMap<>();
- members.put(memberA, new AssignmentMemberSpec(
- Optional.empty(),
+ Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+ Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new
HashMap<>();
+
+ Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>();
+ assignedPartitions.put(memberA, currentAssignmentForA);
+ members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
- mkSet(topic1Uuid, topic3Uuid),
- Collections.emptyMap()
+ new HashSet<>(Arrays.asList(topic1Uuid, topic3Uuid))
));
- members.put(memberB, new AssignmentMemberSpec(
- Optional.empty(),
+
+ Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>();
+ assignedPartitions.put(memberB, currentAssignmentForB);
Review Comment:
I see many cases like this. I let you double check them.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java:
##########
@@ -18,105 +18,63 @@
import org.apache.kafka.common.Uuid;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
- * The assignment specification for a consumer group member.
+ * Implementation of the {@link MemberSubscriptionSpec} interface.
*/
-public class AssignmentMemberSpec {
- /**
- * The instance ID if provided.
- */
- private final Optional<String> instanceId;
-
- /**
- * The rack ID if provided.
- */
+public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
private final Optional<String> rackId;
-
- /**
- * Topics Ids that the member is subscribed to.
- */
private final Set<Uuid> subscribedTopicIds;
/**
- * Partitions assigned keyed by topicId.
- */
- private final Map<Uuid, Set<Integer>> assignedPartitions;
-
- /**
- * @return The instance ID as an Optional.
+ * Constructs a new {@code MemberSubscriptionSpecImpl}.
+ *
+ * @param rackId The rack Id.
+ * @param subscribedTopicIds The set of subscribed topic Ids.
*/
- public Optional<String> instanceId() {
- return instanceId;
+ public MemberSubscriptionSpecImpl(
+ Optional<String> rackId,
+ Set<Uuid> subscribedTopicIds
+ ) {
+ Objects.requireNonNull(rackId);
+ Objects.requireNonNull(subscribedTopicIds);
+ this.rackId = rackId;
+ this.subscribedTopicIds = subscribedTopicIds;
Review Comment:
ditto about the format here.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -148,23 +143,27 @@ public void testFirstAssignmentTwoMembersTwoTopics() {
mkMapOfPartitionRacks(6)
));
- Map<String, AssignmentMemberSpec> members = new TreeMap<>();
- members.put(memberA, new AssignmentMemberSpec(
- Optional.empty(),
+ Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+ Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new
HashMap<>();
+
+ Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>();
+ assignedPartitions.put(memberA, currentAssignmentForA);
+ members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
- mkSet(topic1Uuid, topic3Uuid),
- Collections.emptyMap()
+ new HashSet<>(Arrays.asList(topic1Uuid, topic3Uuid))
Review Comment:
Let's keep `mkSet`. It is shorter and easier to read.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -148,23 +143,27 @@ public void testFirstAssignmentTwoMembersTwoTopics() {
mkMapOfPartitionRacks(6)
));
- Map<String, AssignmentMemberSpec> members = new TreeMap<>();
- members.put(memberA, new AssignmentMemberSpec(
- Optional.empty(),
+ Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+ Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new
HashMap<>();
+
+ Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>();
Review Comment:
nit: Could we use `Collections.emptyMap()`? Could we get rid of the variable?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -81,4 +84,22 @@ void testIsPartitionAssigned() {
assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
}
+
+ @Test
+ void testMemberSubscriptionSpec() {
+ assertEquals(members.get(testMember),
groupSpec.memberSubscriptionSpec(testMember));
Review Comment:
Should we test unknown member here too?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -585,34 +586,34 @@ public void
testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
));
// Initial subscriptions were [T1, T2]
- Map<String, AssignmentMemberSpec> members = new HashMap<>();
+ Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+ Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new
HashMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 1, 3)
);
- members.put(memberA, new AssignmentMemberSpec(
+ assignedPartitions.put(memberA, currentAssignmentForA);
+ members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
- Optional.empty(),
- Collections.singleton(topic1Uuid),
- currentAssignmentForA
+ Collections.singleton(topic1Uuid)
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 0, 2, 4)
);
- members.put(memberB, new AssignmentMemberSpec(
- Optional.empty(),
+ assignedPartitions.put(memberB, currentAssignmentForB);
+ members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
- mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForB
+ new HashSet<>(Arrays.asList(topic1Uuid, topic2Uuid))
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
- invertedTargetAssignment(members)
+ assignedPartitions,
+ invertedTargetAssignment(assignedPartitions, members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new
SubscribedTopicMetadata(topicMetadata);
Review Comment:
The data preparation has become more complicated. I wonder if we could
introduce a GroupSpecBuilder to unify and to simplify it. We can consider this
as a follow-up. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]