dajac commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1624069652


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -75,36 +75,61 @@ public SubscriptionType subscriptionType() {
      */
     @Override
     public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
-        Map<Integer, String> partitionMap = 
invertedTargetAssignment.get(topicId);
+        Map<Integer, String> partitionMap = 
invertedMemberAssignment.get(topicId);
         if (partitionMap == null) {
             return false;
         }
         return partitionMap.containsKey(partitionId);
     }
 
+    /**
+     * {@inheritDoc}
+     * @throws IllegalStateException If the member Id is not found.

Review Comment:
   nit: We can remove it here as it is already in the parent's doc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Implementation of the {@link MemberSubscriptionSpec} interface.
+ */
+public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
+    private final Optional<String> rackId;
+    private final Set<Uuid> subscribedTopicIds;
+    private final Assignment memberAssignment;
+
+    /**
+     * Constructs a new {@code MemberSubscriptionSpecImpl}.
+     *
+     * @param rackId                The rack Id.
+     * @param subscribedTopicIds    The set of subscribed topic Ids.

Review Comment:
   nit: The `@param` for `memberAssignment` is missing.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9863,8 +9863,8 @@ public void 
testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw
 
             // Newly joining member 3 bumps the group epoch. A new target 
assignment is computed.
             CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 1),
-            CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
             CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId3, assignor.targetPartitions(memberId3)),
+            CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),

Review Comment:
   Why do we need this change?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##########
@@ -66,7 +66,8 @@ public GroupAssignment assign(
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
-        if (groupSpec.members().isEmpty())
+

Review Comment:
   nit: This empty line is not necessary.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -81,4 +82,27 @@ void testIsPartitionAssigned() {
         assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
         assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
     }
+
+    @Test
+    void testMemberSubscriptionSpec() {

Review Comment:
   nit: testMemberSubscription?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -164,37 +164,40 @@ public void prepareMemberAssignment(
         public TargetAssignmentBuilder.TargetAssignmentResult build() {
             TopicsImage topicsImage = topicsImageBuilder.build().topics();
             // Prepare expected member specs.
-            Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+            Map<String, MemberSubscriptionSpecImpl> memberSubscriptions = new 
HashMap<>();
+            Map<String, Map<Uuid, Set<Integer>>> targetAssignment = new 
HashMap<>(members.size());
 
             // All the existing members are prepared.
-            members.forEach((memberId, member) ->
-                memberSpecs.put(memberId, createAssignmentMemberSpec(
+            members.forEach((memberId, member) -> {
+                Assignment assignment = 
this.targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
+                memberSubscriptions.put(memberId, 
createMemberSubscriptionSpecImpl(
                     member,
-                    targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-                    topicsImage
-                )
-            ));
+                    topicsImage,
+                    assignment
+                ));
+            });
 
             // All the updated are added and all the deleted
             // members are removed.
             updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
                 if (updatedMemberOrNull == null) {
-                    memberSpecs.remove(memberId);
+                    memberSubscriptions.remove(memberId);
                 } else {
-                    Assignment assignment = 
targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+                    Assignment assignment = 
this.targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
 
                     // A new static member joins and needs to replace an 
existing departed one.
                     if (updatedMemberOrNull.instanceId() != null) {
                         String previousMemberId = 
staticMembers.get(updatedMemberOrNull.instanceId());
                         if (previousMemberId != null && 
!previousMemberId.equals(memberId)) {
-                            assignment = 
targetAssignment.getOrDefault(previousMemberId, Assignment.EMPTY);
+                            assignment = 
this.targetAssignment.getOrDefault(previousMemberId, Assignment.EMPTY);

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -225,7 +233,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult 
build() {
                 .withStaticMembers(staticMembers)
                 .withSubscriptionMetadata(subscriptionMetadata)
                 .withSubscriptionType(subscriptionType)
-                .withTargetAssignment(targetAssignment)
+                .withTargetAssignment(this.targetAssignment)

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##########
@@ -328,45 +318,43 @@ public void 
testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA
             topic1Uuid,
             topic1Name,
             2,
-            createPartitionRacks(2)
+            Collections.emptyMap()
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
             2,
-            createPartitionRacks(2)
+            Collections.emptyMap()
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
+        Map<Uuid, Set<Integer>> targetAssignment;
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
+        targetAssignment = mkAssignment(

Review Comment:
   Let's inline those too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -293,14 +293,18 @@ public TargetAssignmentBuilder removeMember(
      * @throws PartitionAssignorException if the target assignment cannot be 
computed.
      */
     public TargetAssignmentResult build() throws PartitionAssignorException {
-        Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+        Map<String, MemberSubscriptionSpecImpl> memberSpecs = new 
HashMap<>(members.size());
 
         // Prepare the member spec for all members.
-        members.forEach((memberId, member) -> memberSpecs.put(memberId, 
createAssignmentMemberSpec(
-            member,
-            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-            topicsImage
-        )));
+        members.forEach((memberId, member) -> {
+            Assignment assignment = targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY);
+

Review Comment:
   nit: We could inline `targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY)` as it was before.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -393,16 +397,16 @@ private Assignment newMemberAssignment(
         }
     }
 
-    static AssignmentMemberSpec createAssignmentMemberSpec(
+    // private for testing
+    static MemberSubscriptionSpecImpl createMemberSubscriptionSpecImpl(
         ConsumerGroupMember member,
-        Assignment targetAssignment,
-        TopicsImage topicsImage
+        TopicsImage topicsImage,
+        Assignment memberAssignment

Review Comment:
   nit: Let's revert to the previous parameter order to reduce the noise in 
the diff.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -39,4 +41,22 @@ public interface GroupSpec {
      *         False, otherwise.
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+    /**
+     * Gets the member subscription specification for a member.
+     *
+     * @param memberId          The member Id.
+     * @return The member's subscription metadata.
+     * @throws IllegalStateException If the member Id is not found.
+     */
+    MemberSubscriptionSpec memberSubscription(String memberId);
+
+    /**
+     * Gets the current target assignment of a member.

Review Comment:
   nit: "Gets the current assignment of the member".



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -164,37 +164,40 @@ public void prepareMemberAssignment(
         public TargetAssignmentBuilder.TargetAssignmentResult build() {
             TopicsImage topicsImage = topicsImageBuilder.build().topics();
             // Prepare expected member specs.
-            Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+            Map<String, MemberSubscriptionSpecImpl> memberSubscriptions = new 
HashMap<>();
+            Map<String, Map<Uuid, Set<Integer>>> targetAssignment = new 
HashMap<>(members.size());
 
             // All the existing members are prepared.
-            members.forEach((memberId, member) ->
-                memberSpecs.put(memberId, createAssignmentMemberSpec(
+            members.forEach((memberId, member) -> {
+                Assignment assignment = 
this.targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
+                memberSubscriptions.put(memberId, 
createMemberSubscriptionSpecImpl(
                     member,
-                    targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-                    topicsImage
-                )
-            ));
+                    topicsImage,
+                    assignment
+                ));
+            });
 
             // All the updated are added and all the deleted
             // members are removed.
             updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
                 if (updatedMemberOrNull == null) {
-                    memberSpecs.remove(memberId);
+                    memberSubscriptions.remove(memberId);
                 } else {
-                    Assignment assignment = 
targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+                    Assignment assignment = 
this.targetAssignment.getOrDefault(memberId, Assignment.EMPTY);

Review Comment:
   I suppose that we could remove `this`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##########
@@ -350,7 +354,7 @@ public TargetAssignmentResult build() throws 
PartitionAssignorException {
         Map<String, Assignment> newTargetAssignment = new HashMap<>();
 
         memberSpecs.keySet().forEach(memberId -> {
-            Assignment oldMemberAssignment = targetAssignment.get(memberId);
+            Assignment oldMemberAssignment = 
this.targetAssignment.get(memberId);

Review Comment:
   nit: Could we revert this change?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Implementation of the {@link MemberSubscriptionSpec} interface.
+ */
+public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
+    private final Optional<String> rackId;
+    private final Set<Uuid> subscribedTopicIds;
+    private final Assignment memberAssignment;
+
+    /**
+     * Constructs a new {@code MemberSubscriptionSpecImpl}.
+     *
+     * @param rackId                The rack Id.
+     * @param subscribedTopicIds    The set of subscribed topic Ids.
+     */
+    public MemberSubscriptionSpecImpl(
+        Optional<String> rackId,
+        Set<Uuid> subscribedTopicIds,
+        Assignment memberAssignment
+    ) {
+        this.rackId = Objects.requireNonNull(rackId);
+        this.subscribedTopicIds = Objects.requireNonNull(subscribedTopicIds);
+        this.memberAssignment = Objects.requireNonNull(memberAssignment);
+    }
+
+    @Override
+    public Optional<String> rackId() {
+        return rackId;
+    }
+
+    @Override
+    public Set<Uuid> subscribedTopicIds() {
+        return subscribedTopicIds;
+    }
+
+    public Map<Uuid, Set<Integer>> memberAssignment() {
+        return memberAssignment.partitions();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MemberSubscriptionSpecImpl that = (MemberSubscriptionSpecImpl) o;
+        return rackId.equals(that.rackId) &&
+            subscribedTopicIds.equals(that.subscribedTopicIds);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = rackId.hashCode();
+        result = 31 * result + subscribedTopicIds.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "MemberSubscriptionSpecImpl(rackId=" + rackId.orElse("N/A") +
+            ", subscribedTopicIds=" + subscribedTopicIds +
+            ')';

Review Comment:
   `memberAssignment` is missing from `toString()`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -86,21 +86,19 @@ public static void assertAssignment(
     }
 
     /**
-     * Generate a reverse look up map of partition to member target 
assignments from the given member spec.
+     * Generate a reverse look up map of partition to member target 
assignments from the given metadata.
      *
-     * @param memberSpec        A map where the key is the member Id and the 
value is an
-     *                          AssignmentMemberSpec object containing the 
member's partition assignments.
+     * @param members       The member subscription specs.
      * @return Map of topic partition to member assignments.
      */
     public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
-        Map<String, AssignmentMemberSpec> memberSpec
+        Map<String, MemberSubscriptionSpecImpl> members
     ) {
         Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new 
HashMap<>();
-        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
memberSpec.entrySet()) {
-            String memberId = memberEntry.getKey();
-            Map<Uuid, Set<Integer>> topicsAndPartitions = 
memberEntry.getValue().assignedPartitions();
+        for (String memberId : members.keySet()) {

Review Comment:
   nit: It looks like we could keep the previous code for the iteration. 
The only change should be replacing `assignedPartitions` with `memberAssignment`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -271,45 +268,37 @@ public void 
testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen
             mkMapOfPartitionRacks(4)
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
+        Map<Uuid, Set<Integer>> targetAssignment;
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 1, 2)
-            )
+        targetAssignment = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1, 2)
         );

Review Comment:
   While we are doing those changes, I think that it would be better to 
directly inline the assignment and avoid the reused local variable 
`targetAssignment`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -17,42 +17,43 @@
 package org.apache.kafka.coordinator.group.assignor;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 public class GroupSpecImplTest {
-
-    private Map<String, AssignmentMemberSpec> members;
+    private Map<String, MemberSubscriptionSpecImpl> members;
     private SubscriptionType subscriptionType;
     private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
     private GroupSpecImpl groupSpec;
     private Uuid topicId;
+    private final String testMember = "test-member";

Review Comment:
   Constants should be named in upper case (e.g. TEST_MEMBER). Moreover, we 
usually put them first in the field declarations.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java:
##########
@@ -81,4 +82,27 @@ void testIsPartitionAssigned() {
         assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
         assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
     }
+
+    @Test
+    void testMemberSubscriptionSpec() {
+        assertEquals(members.get(testMember), 
groupSpec.memberSubscription(testMember));
+        assertThrows(IllegalStateException.class, () -> 
groupSpec.memberSubscription("unknown-member"));
+    }
+
+    @Test
+    void testCurrentMemberAssignment() {

Review Comment:
   nit: testMemberAssignment?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -294,26 +295,27 @@ public void 
testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment(
             mkMapOfPartitionRacks(3)
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
+        Map<Uuid, Set<Integer>> targetAssignment;
 
-        members.put(memberA, new AssignmentMemberSpec(
-            Optional.empty(),
+        targetAssignment = mkOrderedAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1),
+            mkTopicAssignment(topic2Uuid, 0, 1)
+        );
+        members.put(memberA, new MemberSubscriptionSpecImpl(
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            mkOrderedAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 1),
-                mkTopicAssignment(topic2Uuid, 0, 1)
-            )
+            new Assignment(targetAssignment)

Review Comment:
   Let's keep them inlined as they were. It avoids the local variable which is 
reused by different members.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -271,45 +268,37 @@ public void 
testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen
             mkMapOfPartitionRacks(4)
         ));
 
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
+        Map<Uuid, Set<Integer>> targetAssignment;
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 1, 2)
-            )
+        targetAssignment = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1, 2)
         );
-        members.put(memberA, new AssignmentMemberSpec(
+        members.put(memberA, new MemberSubscriptionSpecImpl(
             Optional.empty(),
-            Optional.of("rack0"),
             Collections.singleton(topic1Uuid),
-            currentAssignmentForA
+            new Assignment(targetAssignment)
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 3),
-                mkTopicAssignment(topic2Uuid, 0)
-            )
+        targetAssignment = mkAssignment(

Review Comment:
   Note that we change from TreeMap to HashMap here. Does it matter?



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##########
@@ -255,39 +259,43 @@ private void simulateIncrementalRebalance() {
         Map<String, MemberAssignment> members = initialAssignment.members();
 
         Map<Uuid, Map<Integer, String>> invertedTargetAssignment = 
AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
+        Map<Uuid, Set<Integer>> targetAssignment;
 
-        Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+        Map<String, MemberSubscriptionSpecImpl> updatedMemberSpec = new 
HashMap<>();
 
-        groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+        for (String memberId : groupSpec.memberIds()) {
             MemberAssignment memberAssignment = members.getOrDefault(
                 memberId,
                 new MemberAssignment(Collections.emptyMap())
             );
 
-            updatedMembers.put(memberId, new AssignmentMemberSpec(
-                assignmentMemberSpec.instanceId(),
-                assignmentMemberSpec.rackId(),
-                assignmentMemberSpec.subscribedTopicIds(),
-                
Collections.unmodifiableMap(memberAssignment.targetPartitions())
+            targetAssignment = memberAssignment.targetPartitions();

Review Comment:
   nit: Let's inline this one and remove `targetAssignment`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -164,37 +164,40 @@ public void prepareMemberAssignment(
         public TargetAssignmentBuilder.TargetAssignmentResult build() {
             TopicsImage topicsImage = topicsImageBuilder.build().topics();
             // Prepare expected member specs.
-            Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+            Map<String, MemberSubscriptionSpecImpl> memberSubscriptions = new 
HashMap<>();
+            Map<String, Map<Uuid, Set<Integer>>> targetAssignment = new 
HashMap<>(members.size());

Review Comment:
   Could we remove this one?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -164,37 +164,40 @@ public void prepareMemberAssignment(
         public TargetAssignmentBuilder.TargetAssignmentResult build() {
             TopicsImage topicsImage = topicsImageBuilder.build().topics();
             // Prepare expected member specs.
-            Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+            Map<String, MemberSubscriptionSpecImpl> memberSubscriptions = new 
HashMap<>();
+            Map<String, Map<Uuid, Set<Integer>>> targetAssignment = new 
HashMap<>(members.size());
 
             // All the existing members are prepared.
-            members.forEach((memberId, member) ->
-                memberSpecs.put(memberId, createAssignmentMemberSpec(
+            members.forEach((memberId, member) -> {
+                Assignment assignment = 
this.targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
+                memberSubscriptions.put(memberId, 
createMemberSubscriptionSpecImpl(
                     member,
-                    targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-                    topicsImage
-                )
-            ));
+                    topicsImage,
+                    assignment
+                ));

Review Comment:
   Could we keep the previous code and just change the name of the method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to