dajac commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1620268966


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -39,4 +41,20 @@ public interface GroupSpec {
      *         False, otherwise.
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+    /**
+     * Gets the member subscription specification for a member.
+     *
+     * @param memberId          The member Id.
+     * @return The member's subscription metadata.
+     */
+    MemberSubscriptionSpec memberSubscriptionSpec(String memberId);
+
+    /**
+     * Gets the current assignment for a member.
+     *
+     * @param memberId          The member Id.
+     * @return A map of topic Ids to sets of partition numbers.
+     */
+    Map<Uuid, Set<Integer>> currentMemberAssignment(String memberId);

Review Comment:
   nit: Let's call it `memberAssignment`. The current is redundant in my 
opinion because it is implicit here. Same for the unknown member id case in the 
javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -39,4 +41,20 @@ public interface GroupSpec {
      *         False, otherwise.
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+    /**
+     * Gets the member subscription specification for a member.
+     *
+     * @param memberId          The member Id.
+     * @return The member's subscription metadata.
+     */
+    MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   nit: Let's call it `memberSubscription`. We should also precise in the 
javadoc what we do if the member id does not exist. I suppose that we return 
null.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -18,48 +18,60 @@
 
 import org.apache.kafka.common.Uuid;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * The assignment specification for a consumer group.
  */
 public class GroupSpecImpl implements GroupSpec {
     /**
-     * The member metadata keyed by member Id.
+     * Member subscription metadata keyed by member Id.
      */
-    private final Map<String, AssignmentMemberSpec> members;
+    private final Map<String, MemberSubscriptionSpec> memberSubscriptions;
 
     /**
-     * The subscription type followed by the group.
+     * The subscription type of the group.
      */
     private final SubscriptionType subscriptionType;
 
+    /**
+     * Partitions currently assigned to each member keyed by topicId.
+     */
+    private final Map<String, Map<Uuid, Set<Integer>>> currentAssignment;
+
     /**
      * Reverse lookup map representing topic partitions with
      * their current member assignments.
      */
-    private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+    private final Map<Uuid, Map<Integer, String>> invertedCurrentAssignment;
 
     public GroupSpecImpl(
-        Map<String, AssignmentMemberSpec> members,
+        Map<String, MemberSubscriptionSpec> members,
         SubscriptionType subscriptionType,
-        Map<Uuid, Map<Integer, String>> invertedTargetAssignment
+        Map<String, Map<Uuid, Set<Integer>>> currentAssignment,
+        Map<Uuid, Map<Integer, String>> invertedCurrentAssignment
     ) {
         Objects.requireNonNull(members);
         Objects.requireNonNull(subscriptionType);
-        Objects.requireNonNull(invertedTargetAssignment);
-        this.members = members;
+        Objects.requireNonNull(currentAssignment);
+        Objects.requireNonNull(invertedCurrentAssignment);
+        this.memberSubscriptions = members;
         this.subscriptionType = subscriptionType;
-        this.invertedTargetAssignment = invertedTargetAssignment;
+        this.currentAssignment = currentAssignment;
+        this.invertedCurrentAssignment = invertedCurrentAssignment;

Review Comment:
   While we are here, could we do `this.memberSubscriptions = 
Objects.requireNonNull(members);?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -85,19 +86,20 @@ public static void assertAssignment(
     }
 
     /**
-     * Generate a reverse look up map of partition to member target 
assignments from the given member spec.
+     * Generate a reverse look up map of partition to member target 
assignments from the given metadata.
      *
-     * @param memberSpec        A map where the key is the member Id and the 
value is an
-     *                          AssignmentMemberSpec object containing the 
member's partition assignments.
+     * @param assignedPartitions              Partition assignments per member.
+     * @param memberSubscriptionSpec          A map where the key is the 
member Id and the value is a
+     *                                        MemberSubscriptionSpec object 
containing the member's subscription info.
      * @return Map of topic partition to member assignments.
      */
     public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
-        Map<String, AssignmentMemberSpec> memberSpec
+        Map<String, Map<Uuid, Set<Integer>>> assignedPartitions,
+        Map<String, MemberSubscriptionSpec> memberSubscriptionSpec

Review Comment:
   Do we really need `memberSubscriptionSpec`? It seems that we only use it to 
get the member ids but we also have them in `assignedPartitions`, no?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -339,6 +352,7 @@ public TargetAssignmentResult build() throws 
PartitionAssignorException {
             new GroupSpecImpl(
                 Collections.unmodifiableMap(memberSpecs),
                 subscriptionType,
+                currentTargetAssignment,
                 invertedTargetAssignment

Review Comment:
   Should we just use `targetAssignment` and `invertedTargetAssignment`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -33,37 +35,38 @@
 
 public class GroupSpecImplTest {
 
-    private Map<String, AssignmentMemberSpec> members;
+    private Map<String, MemberSubscriptionSpec> members;
     private SubscriptionType subscriptionType;
+    private Map<String, Map<Uuid, Set<Integer>>> assignedPartitions;
     private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;

Review Comment:
   We are not consistent here. Let's use `assignment` and `invertedAssignment`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpec.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Optional;
+import java.util.Set;
+
+public interface MemberSubscriptionSpec {

Review Comment:
   nit: Could we add java doc for the interface?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -18,48 +18,60 @@
 
 import org.apache.kafka.common.Uuid;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * The assignment specification for a consumer group.
  */
 public class GroupSpecImpl implements GroupSpec {
     /**
-     * The member metadata keyed by member Id.
+     * Member subscription metadata keyed by member Id.
      */
-    private final Map<String, AssignmentMemberSpec> members;
+    private final Map<String, MemberSubscriptionSpec> memberSubscriptions;
 
     /**
-     * The subscription type followed by the group.
+     * The subscription type of the group.
      */
     private final SubscriptionType subscriptionType;
 
+    /**
+     * Partitions currently assigned to each member keyed by topicId.
+     */
+    private final Map<String, Map<Uuid, Set<Integer>>> currentAssignment;

Review Comment:
   nit: Should we use `assignment` and `invertedAssignment`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -293,14 +295,22 @@ public TargetAssignmentBuilder removeMember(
      * @throws PartitionAssignorException if the target assignment cannot be 
computed.
      */
     public TargetAssignmentResult build() throws PartitionAssignorException {
-        Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+        Map<String, MemberSubscriptionSpec> memberSpecs = new HashMap<>();

Review Comment:
   Should we also init the size here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -293,14 +295,22 @@ public TargetAssignmentBuilder removeMember(
      * @throws PartitionAssignorException if the target assignment cannot be 
computed.
      */
     public TargetAssignmentResult build() throws PartitionAssignorException {
-        Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+        Map<String, MemberSubscriptionSpec> memberSpecs = new HashMap<>();
+        Map<String, Map<Uuid, Set<Integer>>> currentTargetAssignment = new 
HashMap<>(members.size());

Review Comment:
   This is a tad annoying. I think that we can keep it as it is for now but we 
need to find a better way. I also wonder if this has an impact on performance.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -148,23 +143,27 @@ public void testFirstAssignmentTwoMembersTwoTopics() {
             mkMapOfPartitionRacks(6)
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-        members.put(memberA, new AssignmentMemberSpec(
-            Optional.empty(),
+        Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+        Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new 
HashMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>();
+        assignedPartitions.put(memberA, currentAssignmentForA);
+        members.put(memberA, new MemberSubscriptionSpecImpl(
             Optional.empty(),
-            mkSet(topic1Uuid, topic3Uuid),
-            Collections.emptyMap()
+            new HashSet<>(Arrays.asList(topic1Uuid, topic3Uuid))
         ));
-        members.put(memberB, new AssignmentMemberSpec(
-            Optional.empty(),
+
+        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>();
+        assignedPartitions.put(memberB, currentAssignmentForB);

Review Comment:
   I see many cases like this. I let you double check them.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java:
##########
@@ -18,105 +18,63 @@
 
 import org.apache.kafka.common.Uuid;
 
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
 /**
- * The assignment specification for a consumer group member.
+ * Implementation of the {@link MemberSubscriptionSpec} interface.
  */
-public class AssignmentMemberSpec {
-    /**
-     * The instance ID if provided.
-     */
-    private final Optional<String> instanceId;
-
-    /**
-     * The rack ID if provided.
-     */
+public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
     private final Optional<String> rackId;
-
-    /**
-     * Topics Ids that the member is subscribed to.
-     */
     private final Set<Uuid> subscribedTopicIds;
 
     /**
-     * Partitions assigned keyed by topicId.
-     */
-    private final Map<Uuid, Set<Integer>> assignedPartitions;
-
-    /**
-     * @return The instance ID as an Optional.
+     * Constructs a new {@code MemberSubscriptionSpecImpl}.
+     *
+     * @param rackId                The rack Id.
+     * @param subscribedTopicIds    The set of subscribed topic Ids.
      */
-    public Optional<String> instanceId() {
-        return instanceId;
+    public MemberSubscriptionSpecImpl(
+        Optional<String> rackId,
+        Set<Uuid> subscribedTopicIds
+    ) {
+        Objects.requireNonNull(rackId);
+        Objects.requireNonNull(subscribedTopicIds);
+        this.rackId = rackId;
+        this.subscribedTopicIds = subscribedTopicIds;

Review Comment:
   ditto about the format here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -148,23 +143,27 @@ public void testFirstAssignmentTwoMembersTwoTopics() {
             mkMapOfPartitionRacks(6)
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-        members.put(memberA, new AssignmentMemberSpec(
-            Optional.empty(),
+        Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+        Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new 
HashMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>();
+        assignedPartitions.put(memberA, currentAssignmentForA);
+        members.put(memberA, new MemberSubscriptionSpecImpl(
             Optional.empty(),
-            mkSet(topic1Uuid, topic3Uuid),
-            Collections.emptyMap()
+            new HashSet<>(Arrays.asList(topic1Uuid, topic3Uuid))

Review Comment:
   Let's keep `mkSet`. It is shorter and easier to read.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -148,23 +143,27 @@ public void testFirstAssignmentTwoMembersTwoTopics() {
             mkMapOfPartitionRacks(6)
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-        members.put(memberA, new AssignmentMemberSpec(
-            Optional.empty(),
+        Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+        Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new 
HashMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>();

Review Comment:
   nit: Could we use `Collections.emptyMap()`? Could we get rid of the variable?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -81,4 +84,22 @@ void testIsPartitionAssigned() {
         assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
         assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
     }
+
+    @Test
+    void testMemberSubscriptionSpec() {
+        assertEquals(members.get(testMember), 
groupSpec.memberSubscriptionSpec(testMember));

Review Comment:
   Should we test unknown member here too?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -585,34 +586,34 @@ public void 
testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
         ));
 
         // Initial subscriptions were [T1, T2]
-        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+        Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+        Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new 
HashMap<>();
 
         Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 2),
             mkTopicAssignment(topic2Uuid, 1, 3)
         );
-        members.put(memberA, new AssignmentMemberSpec(
+        assignedPartitions.put(memberA, currentAssignmentForA);
+        members.put(memberA, new MemberSubscriptionSpecImpl(
             Optional.empty(),
-            Optional.empty(),
-            Collections.singleton(topic1Uuid),
-            currentAssignmentForA
+            Collections.singleton(topic1Uuid)
         ));
 
         Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic2Uuid, 0, 2, 4)
         );
-        members.put(memberB, new AssignmentMemberSpec(
-            Optional.empty(),
+        assignedPartitions.put(memberB, currentAssignmentForB);
+        members.put(memberB, new MemberSubscriptionSpecImpl(
             Optional.empty(),
-            mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            new HashSet<>(Arrays.asList(topic1Uuid, topic2Uuid))
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
             members,
             HETEROGENEOUS,
-            invertedTargetAssignment(members)
+            assignedPartitions,
+            invertedTargetAssignment(assignedPartitions, members)
         );
         SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(topicMetadata);
 

Review Comment:
   The data preparation has become more complicated. I wonder if we could 
introduce a GroupSpecBuilder to unify and to simplify it. We can consider this 
as a follow-up. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to