squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2867839878


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -804,13 +804,20 @@ public static CoordinatorRecord 
newShareGroupStatePartitionMetadataRecord(
     }
 
     private static 
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
toTopicPartitions(
-        Map<Uuid, Set<Integer>> topicPartitions
-    ) {
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics 
= new ArrayList<>(topicPartitions.size());
-        topicPartitions.forEach((topicId, partitions) ->
+        Map<Uuid, Map<Integer, Integer>> topicPartitionsWithEpochs
+    ) {
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics 
= new ArrayList<>(topicPartitionsWithEpochs.size());
+        topicPartitionsWithEpochs.forEach((topicId, partitionEpochMap) -> {
+            List<Integer> partitionList = new 
ArrayList<>(partitionEpochMap.keySet());
+            Collections.sort(partitionList);

Review Comment:
   I would disregard the copilot comment. The previous implementation was 
non-deterministic too.
   ```suggestion
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -190,12 +193,21 @@ public Builder setState(MemberState state) {
             return this;
         }
 
-        public Builder setAssignedPartitions(Map<Uuid, Set<Integer>> 
assignedPartitions) {
+        public Builder setAssignedPartitions(Map<Uuid, Set<Integer>> 
assignedPartitions, int assignmentEpoch) {
+            this.assignedPartitions = assignedPartitions.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> e.getValue().stream().collect(Collectors.toMap(p -> 
p, p -> assignmentEpoch))
+                ));

Review Comment:
   This operation is equivalent to `AssignmentTestUtil.toEpochsAssignment`. 
Since we're adding a util method for this, we can remove 
`setAssignedPartitions(assignedPartitions, assignmentEpoch)` and have the 
caller use the util method. The util method will need moving to `Utils.java`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -336,16 +376,16 @@ public Optional<String> serverAssignorName() {
     }
 
     /**
-     * @return The set of assigned partitions.
+     * @return The epoch-annotated assigned partitions map.
      */
-    public Map<Uuid, Set<Integer>> assignedPartitions() {
+    public Map<Uuid, Map<Integer, Integer>> assignedPartitions() {
         return assignedPartitions;
     }
 
     /**
-     * @return The set of partitions awaiting revocation from the member.
+     * @return The epoch-annotated pending revocation partitions map.

Review Comment:
   ```suggestion
        * @return The partitions awaiting revocation from this member and their 
assignment epochs.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -417,9 +487,9 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
             .setMemberEpoch(memberEpoch)
             .setMemberId(memberId)
             .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
-                .setTopicPartitions(topicPartitionsFromMap(assignedPartitions, 
image)))
+                
.setTopicPartitions(topicPartitionsFromEpochMap(assignedPartitions, image)))

Review Comment:
   Here we create a new version of `topicPartitionsFromMap()` but in 
`GroupMetadataManager.prepareAssignment` we clone the assignment without 
epochs. We're inconsistent in our approaches.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -432,7 +502,21 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
             .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
     }
 
-    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
+    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromEpochMap(
+        Map<Uuid, Map<Integer, Integer>> partitions,
+        CoordinatorMetadataImage image
+    ) {
+        List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitions = new ArrayList<>();
+        partitions.forEach((topicId, partitionEpochMap) -> {
+            image.topicMetadata(topicId).ifPresent(topicMetadata -> 
topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions()
+                .setTopicId(topicId)
+                .setTopicName(topicMetadata.name())
+                .setPartitions(new ArrayList<>(partitionEpochMap.keySet()))));
+        });
+        return topicPartitions;
+    }
+
+    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromPartitionSet(

Review Comment:
   ```suggestion
       private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromAssignmentWithoutEpochs(
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java:
##########
@@ -294,6 +324,10 @@ public void testAsConsumerGroupDescribeMember(boolean 
withClassicMemberMetadata)
             .build();
 
         ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(targetAssignment, new 
KRaftCoordinatorMetadataImage(metadataImage));
+        // Sort partitions for comparison
+        actual.assignment().topicPartitions().forEach(tp -> 
Collections.sort(tp.partitions()));
+        actual.targetAssignment().topicPartitions().forEach(tp -> 
Collections.sort(tp.partitions()));
+

Review Comment:
   Is this change necessary?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,14 +302,21 @@ public ConsumerGroupMember build() {
     private final String serverAssignorName;
 
     /**
-     * The partitions being revoked by this member.
+     * The classic member metadata if the consumer uses the classic protocol.
      */
-    private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+    private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata 
classicMemberMetadata;
 
     /**
-     * The classic member metadata if the consumer uses the classic protocol.
+     * The epoch at which each partition was assigned to this member.
+     * Map: topicId -> partitionId -> assignmentEpoch

Review Comment:
   ```suggestion
        * The partitions assigned to this member and their assignment epochs.
        * A map of topic ids to partitions to assignment epochs.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -62,6 +62,37 @@ public static Map<Uuid, Set<Integer>> 
mkAssignment(Map.Entry<Uuid, Set<Integer>>
         return Collections.unmodifiableMap(assignment);
     }
 
+    /**
+     * Converts a regular assignment to an epochs-based assignment using the 
given epoch.
+     */
+    public static Map<Uuid, Map<Integer, Integer>> toEpochsAssignment(
+        Map<Uuid, Set<Integer>> assignment,
+        int epoch
+    ) {
+        Map<Uuid, Map<Integer, Integer>> result = new LinkedHashMap<>();
+        for (Map.Entry<Uuid, Set<Integer>> entry : assignment.entrySet()) {
+            Map<Integer, Integer> partitionEpochs = new HashMap<>();
+            for (Integer partition : entry.getValue()) {
+                partitionEpochs.put(partition, epoch);
+            }
+            result.put(entry.getKey(), 
Collections.unmodifiableMap(partitionEpochs));
+        }
+        return Collections.unmodifiableMap(result);
+    }
+
+    /**
+     * Converts an epochs-based assignment to a regular assignment (without 
epochs).
+     */
+    public static Map<Uuid, Set<Integer>> toPartitionMap(

Review Comment:
   ```suggestion
       /**
        * Discards the assignment epochs from an assignment with epochs.
        */
       public static Map<Uuid, Set<Integer>> toAssignmentWithoutEpochs(
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3874,8 +3891,8 @@ fooTopicName, computeTopicHash(fooTopicName, new 
MetadataImageBuilder()
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
             .setSubscribedTopicNames(List.of("foo", "bar"))
             .setServerAssignorName("range")
-            .setAssignedPartitions(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .setAssignedPartitions(Map.of(
+                fooTopicId, new TreeMap<>(Map.of(0, 10, 1, 10, 2, 10, 3, 11, 
4, 11, 5, 11))))

Review Comment:
   This is pretty ugly and inconsistent with the other tests.
   
   Can we write this as
   ```
   mkAssignmentWithEpochs(
       mkTopicAssignmentWithEpochs(fooTopicId, 10, 0, 1, 2, 3),
       mkTopicAssignmentWithEpochs(fooTopicId, 11, 4, 5))
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,14 +302,21 @@ public ConsumerGroupMember build() {
     private final String serverAssignorName;
 
     /**
-     * The partitions being revoked by this member.
+     * The classic member metadata if the consumer uses the classic protocol.
      */
-    private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+    private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata 
classicMemberMetadata;
 
     /**
-     * The classic member metadata if the consumer uses the classic protocol.
+     * The epoch at which each partition was assigned to this member.
+     * Map: topicId -> partitionId -> assignmentEpoch
      */
-    private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata 
classicMemberMetadata;
+    private final Map<Uuid, Map<Integer, Integer>> assignedPartitions;
+
+    /**
+     * The epoch at which each partition pending revocation was assigned.
+     * Map: topicId -> partitionId -> assignmentEpoch

Review Comment:
   ```suggestion
        * The partitions awaiting revocation from this member and their 
assignment epochs.
        * A map of topic ids to partitions to assignment epochs.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -7602,7 +7606,11 @@ private byte[] prepareAssignment(ConsumerGroupMember 
member) {
         try {
             return ConsumerProtocol.serializeAssignment(
                 toConsumerProtocolAssignment(
-                    member.assignedPartitions(),
+                    member.assignedPartitions().entrySet().stream()
+                        .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> Set.copyOf(e.getValue().keySet())
+                        )),

Review Comment:
   This is the same operation as `AssignmentTestUtil.toPartitionMap`. Can we 
reuse the method?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,19 +302,21 @@ public ConsumerGroupMember build() {
     private final String serverAssignorName;
 
     /**
-     * The partitions assigned to this member.
+     * The classic member metadata if the consumer uses the classic protocol.
      */
-    private final Map<Uuid, Set<Integer>> assignedPartitions;
+    private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata 
classicMemberMetadata;

Review Comment:
   nit: Could we avoid reordering fields here? It makes it harder to review.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -360,28 +361,29 @@ private ConsumerGroupMember updateCurrentAssignment(
      *
      * @param memberEpoch               The epoch of the member to use. This 
may be different
      *                                  from the epoch in {@link 
CurrentAssignmentBuilder#member}.
-     * @param memberAssignedPartitions  The assigned partitions of the member 
to use.
+     * @param memberAssignedPartitionsWithEpochs  The assigned partitions with 
epochs of the member to use.
      * @return A new ConsumerGroupMember.
      */
     private ConsumerGroupMember computeNextAssignment(
         int memberEpoch,
-        Map<Uuid, Set<Integer>> memberAssignedPartitions
+        Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
 
         boolean hasUnreleasedPartitions = false;
-        Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new 
HashMap<>();
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs = new 
HashMap<>();
+        Map<Uuid, Map<Integer, Integer>> 
newPartitionsPendingRevocationWithEpochs = new HashMap<>();
         Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new 
HashMap<>();
 
         Set<Uuid> allTopicIds = new 
HashSet<>(targetAssignment.partitions().keySet());
-        allTopicIds.addAll(memberAssignedPartitions.keySet());
+        allTopicIds.addAll(memberAssignedPartitionsWithEpochs.keySet());
 
         for (Uuid topicId : allTopicIds) {
             Set<Integer> target = targetAssignment.partitions()
                 .getOrDefault(topicId, Set.of());
-            Set<Integer> currentAssignedPartitions = memberAssignedPartitions
-                .getOrDefault(topicId, Set.of());
+            Map<Integer, Integer> currentAssignedPartitionsWithEpochs = 
memberAssignedPartitionsWithEpochs
+                .getOrDefault(topicId, Map.of());
+            Set<Integer> currentAssignedPartitions = 
currentAssignedPartitionsWithEpochs.keySet();

Review Comment:
   ```suggestion
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -336,16 +376,16 @@ public Optional<String> serverAssignorName() {
     }
 
     /**
-     * @return The set of assigned partitions.
+     * @return The epoch-annotated assigned partitions map.

Review Comment:
   ```suggestion
        * @return The partitions assigned to this member and their assignment 
epochs.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########


Review Comment:
   ```
   Map<Integer, Integer> assignedPartitions = new 
HashMap<>(currentAssignedPartitionsWithEpochs);
   assignedPartitions.keySet().retainAll(target);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1045,18 +1045,18 @@ private void maybeRemovePartitionEpoch(
     /**
      * Removes the partition epochs based on the provided assignment.
      *
-     * @param assignment    The assignment.
+     * @param assignmentWithEpochs    The assignment with epochs.
      * @param expectedEpoch The expected epoch.
      * package-private for testing.
      */
     void removePartitionEpochs(
-        Map<Uuid, Set<Integer>> assignment,
+        Map<Uuid, Map<Integer, Integer>> assignmentWithEpochs,
         int expectedEpoch
     ) {
-        assignment.forEach((topicId, assignedPartitions) -> {
+        assignmentWithEpochs.forEach((topicId, partitionEpochs) -> {

Review Comment:
   We are inconsistent in the naming of the `Map<partition, assignment epoch>`. 
Here we call them `partitionEpochs`, but elsewhere we call them 
`partitionEpochMap`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -356,7 +396,37 @@ public static boolean hasAssignedPartitionsChanged(
         ConsumerGroupMember member1,
         ConsumerGroupMember member2
     ) {
-        return 
!member1.assignedPartitions().equals(member2.assignedPartitions());
+        return 
!member1.assignedPartitions.equals(member2.assignedPartitions());
+    }
+
+    /**
+     * Gets the assignment epoch for a specific partition.

Review Comment:
   ```suggestion
        * Gets the assignment epoch for an assigned partition.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1045,18 +1045,18 @@ private void maybeRemovePartitionEpoch(
     /**
      * Removes the partition epochs based on the provided assignment.
      *
-     * @param assignment    The assignment.
+     * @param assignmentWithEpochs    The assignment with epochs.
      * @param expectedEpoch The expected epoch.
      * package-private for testing.

Review Comment:
   The javadoc is now ambiguous. It's not clear whether we use the assignment 
epochs or member epoch.
   
   ```suggestion
        * Removes the partition epochs based on the provided assignment and 
member epoch.
        *
        * @param assignment    The assignment with epochs. The assignment 
epochs are ignored.
        * @param expectedEpoch The expected member epoch.
        * package-private for testing.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -93,6 +93,36 @@ public static String assignmentToString(
         return builder.toString();
     }
 
+    /**
+     * @return The provided assignment with epochs as a String.
+     *
+     * Example:
+     * [topicid1-0@5, topicid1-1@5, topicid2-0@3, topicid2-1@3]
+     */
+    public static String assignmentEpochToString(

Review Comment:
   ```suggestion
       public static String assignmentWithEpochsToString(
   ```



##########
clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java:
##########
@@ -79,12 +78,12 @@ public static ConsumerGroupHeartbeatResponse parse(Readable 
readable, short vers
     }
 
     public static ConsumerGroupHeartbeatResponseData.Assignment 
createAssignment(
-        Map<Uuid, Set<Integer>> assignment
+        Map<Uuid, Map<Integer, Integer>> assignmentWithEpochs

Review Comment:
   Could we not rename these everywhere unless absolutely necessary? I don't 
think it helps much with clarity and makes the review harder.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1082,21 +1082,21 @@ void removePartitionEpochs(
     /**
      * Adds the partitions epoch based on the provided assignment.
      *
-     * @param assignment    The assignment.
+     * @param assignmentWithEpochs    The assignment with epochs.
      * @param epoch         The new epoch.
      * @throws IllegalStateException if updating a partition with a smaller or 
equal epoch.
      * package-private for testing.

Review Comment:
   ```suggestion
        * Adds the partitions epoch based on the provided assignment and member 
epoch.
        *
        * @param assignment    The assignment with epochs. The assignment 
epochs are ignored.
        * @param epoch         The new member epoch.
        * @throws IllegalStateException if updating a partition with a smaller 
or equal member epoch.
        * package-private for testing.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1045,18 +1045,18 @@ private void maybeRemovePartitionEpoch(
     /**
      * Removes the partition epochs based on the provided assignment.
      *
-     * @param assignment    The assignment.
+     * @param assignmentWithEpochs    The assignment with epochs.
      * @param expectedEpoch The expected epoch.

Review Comment:
   The renaming has broken the alignment in the javadocs, here and elsewhere.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -62,6 +62,37 @@ public static Map<Uuid, Set<Integer>> 
mkAssignment(Map.Entry<Uuid, Set<Integer>>
         return Collections.unmodifiableMap(assignment);
     }
 
+    /**
+     * Converts a regular assignment to an epochs-based assignment using the 
given epoch.
+     */
+    public static Map<Uuid, Map<Integer, Integer>> toEpochsAssignment(
+        Map<Uuid, Set<Integer>> assignment,
+        int epoch
+    ) {
+        Map<Uuid, Map<Integer, Integer>> result = new LinkedHashMap<>();

Review Comment:
   Is it necessary to use a `LinkedHashMap`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########


Review Comment:
   ```
   Map<Integer, Integer> partitionsPendingRevocation = new 
HashSet<>(currentAssignedPartitionsWithEpochs);
   partitionsPendingRevocation.keySet().removeAll(assignedPartitions.keySet());
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -432,7 +502,21 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
             .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
     }
 
-    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
+    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromEpochMap(

Review Comment:
   ```suggestion
       private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromAssignmentWithEpochs(
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java:
##########
@@ -81,8 +84,8 @@ public void testNewMember() {
         assertEquals(Set.of("bar", "foo"), member.subscribedTopicNames());
         assertEquals("regex", member.subscribedTopicRegex());
         assertEquals("range", member.serverAssignorName().get());
-        assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
-        assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+        
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 
3)), 10), member.assignedPartitions());
+        
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 
6)), 9), member.partitionsPendingRevocation());

Review Comment:
   Could we add asserts for `ConsumerGroupMember.assignmentEpoch` and 
`pendingRevocationEpoch`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static 
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
     }
 
     /**
-     * Creates a map of topic id and partition set from a list of consumer 
group TopicPartitions.
+     * Creates a map of topic id and partition-epoch map from a list of 
consumer group TopicPartitions.
      *
-     * @param topicPartitionsList   The list of TopicPartitions.
-     * @return a map of topic id and partition set.
+     * @param topicPartitions The list of TopicPartitions.
+     * @param defaultEpoch The default epoch to use when the epoch information 
is not available for a partition.
+     * @return a map of topic id and partition-epoch map.
      */
-    public static Map<Uuid, Set<Integer>> assignmentFromTopicPartitions(
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
topicPartitionsList
+    public static Map<Uuid, Map<Integer, Integer>> 
assignmentFromTopicPartitions(
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
topicPartitions,
+        int defaultEpoch
     ) {
-        return topicPartitionsList.stream().collect(Collectors.toMap(
-            ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId,
-            topicPartitions -> Collections.unmodifiableSet(new 
HashSet<>(topicPartitions.partitions()))));
+        // For legacy static member, the defaultEpoch could be 
-2(LEAVE_GROUP_STATIC_MEMBER_EPOCH).

Review Comment:
   ```suggestion
           // For legacy static member, the defaultEpoch could be -2 
(LEAVE_GROUP_STATIC_MEMBER_EPOCH).
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java:
##########
@@ -243,8 +246,35 @@ public void 
testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
 
         assertEquals(10, member.memberEpoch());
         assertEquals(9, member.previousMemberEpoch());
-        assertEquals(mkAssignment(mkTopicAssignment(topicId1, 0, 1, 2)), 
member.assignedPartitions());
-        assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), 
member.partitionsPendingRevocation());
+        
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId1, 0, 1, 
2)), 10), member.assignedPartitions());
+        
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 
5)), 10), member.partitionsPendingRevocation());
+    }
+
+    @Test
+    public void 
testUpdateWithConsumerGroupCurrentMemberAssignmentValueWithNegativeEpoch() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupCurrentMemberAssignmentValue record = new 
ConsumerGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH) // -2
+            .setPreviousMemberEpoch(5)
+            .setAssignedPartitions(List.of(new 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(topicId1)
+                .setPartitions(Arrays.asList(0, 1, 2))))
+            .setPartitionsPendingRevocation(List.of(new 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(topicId2)
+                .setPartitions(Arrays.asList(3, 4, 5))));
+
+        ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+            .updateWith(record)
+            .build();
+
+        assertEquals(-2, member.memberEpoch());
+        assertEquals(5, member.previousMemberEpoch());
+
+        // Partition epoch should be 0, not -2.

Review Comment:
   ```suggestion
           // Assignment epochs should be 0, not -2.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -404,47 +406,64 @@ private ConsumerGroupMember computeNextAssignment(
                 // Don't consider a partition unreleased if it is owned by the 
current member
                 // because it is pending revocation. This is safe to do since 
only a single member
                 // can own a partition at a time.
-                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Set.of()).contains(partitionId)
+                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Map.of()).containsKey(partitionId)
             ) || hasUnreleasedPartitions;
 
+            // Build epochs map for assigned partitions, preserve existing 
epochs
             if (!assignedPartitions.isEmpty()) {
-                newAssignedPartitions.put(topicId, assignedPartitions);
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : assignedPartitions) {
+                    partitionEpochs.put(partitionId, 
currentAssignedPartitionsWithEpochs.get(partitionId));
+                }
+                newAssignedPartitionsWithEpochs.put(topicId, partitionEpochs);
             }
 
+            // Build epochs map for partitions pending revocation, preserve 
existing epochs
             if (!partitionsPendingRevocation.isEmpty()) {
-                newPartitionsPendingRevocation.put(topicId, 
partitionsPendingRevocation);
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : partitionsPendingRevocation) {
+                    partitionEpochs.put(partitionId, 
currentAssignedPartitionsWithEpochs.get(partitionId));
+                }
+                newPartitionsPendingRevocationWithEpochs.put(topicId, 
partitionEpochs);
             }
 
             if (!partitionsPendingAssignment.isEmpty()) {
                 newPartitionsPendingAssignment.put(topicId, 
partitionsPendingAssignment);
             }
         }
 
-        if (!newPartitionsPendingRevocation.isEmpty() && 
ownsRevokedPartitions(newPartitionsPendingRevocation)) {
+        if (!newPartitionsPendingRevocationWithEpochs.isEmpty() && 
ownsRevokedPartitions(newPartitionsPendingRevocationWithEpochs)) {
             // If there are partitions to be revoked, the member remains in 
its current
             // epoch and requests the revocation of those partitions. It 
transitions to
             // the UNREVOKED_PARTITIONS state to wait until the client 
acknowledges the
             // revocation of the partitions.
             return new ConsumerGroupMember.Builder(member)
                 .setState(MemberState.UNREVOKED_PARTITIONS)
                 .updateMemberEpoch(memberEpoch)
-                .setAssignedPartitions(newAssignedPartitions)
-                .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
+                .setAssignedPartitions(newAssignedPartitionsWithEpochs)
+                
.setPartitionsPendingRevocation(newPartitionsPendingRevocationWithEpochs)
                 .build();
         } else if (!newPartitionsPendingAssignment.isEmpty()) {
             // If there are partitions to be assigned, the member transitions 
to the
             // target epoch and requests the assignment of those partitions. 
Note that
             // the partitions are directly added to the assigned partitions 
set. The
             // member transitions to the STABLE state or to the 
UNRELEASED_PARTITIONS
             // state depending on whether there are unreleased partitions or 
not.
-            newPartitionsPendingAssignment.forEach((topicId, partitions) -> 
newAssignedPartitions
-                .computeIfAbsent(topicId, __ -> new HashSet<>())
-                .addAll(partitions));
+            // Add newly assigned partitions, preserving original epoch if 
partition was pending revocation
+            newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
+                Map<Integer, Integer> topicEpochs = 
newAssignedPartitionsWithEpochs
+                    .computeIfAbsent(topicId, __ -> new HashMap<>());
+                Map<Integer, Integer> pendingRevocationEpochs = 
member.partitionsPendingRevocation()
+                    .getOrDefault(topicId, Map.of());
+                for (Integer partitionId : partitions) {
+                    topicEpochs.put(partitionId, 
pendingRevocationEpochs.getOrDefault(partitionId, targetAssignmentEpoch));
+                }
+            });

Review Comment:
   We cannot reach `computeNextAssignment` unless the member has revoked all of 
its partitions pending revocation. I don't think it's reasonable for it to 
perform an offset commit with an older epoch after revocation?
   
   ```suggestion
               newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
                   Map<Integer, Integer> topicEpochs = 
newAssignedPartitionsWithEpochs
                       .computeIfAbsent(topicId, __ -> new HashMap<>());
                   for (Integer partitionId : partitions) {
                       topicEpochs.put(partitionId, targetAssignmentEpoch);
                   }
               });
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -404,47 +406,64 @@ private ConsumerGroupMember computeNextAssignment(
                 // Don't consider a partition unreleased if it is owned by the 
current member
                 // because it is pending revocation. This is safe to do since 
only a single member
                 // can own a partition at a time.
-                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Set.of()).contains(partitionId)
+                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Map.of()).containsKey(partitionId)
             ) || hasUnreleasedPartitions;
 
+            // Build epochs map for assigned partitions, preserve existing 
epochs
             if (!assignedPartitions.isEmpty()) {
-                newAssignedPartitions.put(topicId, assignedPartitions);
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : assignedPartitions) {
+                    partitionEpochs.put(partitionId, 
currentAssignedPartitionsWithEpochs.get(partitionId));
+                }
+                newAssignedPartitionsWithEpochs.put(topicId, partitionEpochs);
             }

Review Comment:
   ```suggestion
               if (!assignedPartitions.isEmpty()) {
                   newAssignedPartitions.put(topicId, assignedPartitions);
               }
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -404,47 +406,64 @@ private ConsumerGroupMember computeNextAssignment(
                 // Don't consider a partition unreleased if it is owned by the 
current member
                 // because it is pending revocation. This is safe to do since 
only a single member
                 // can own a partition at a time.
-                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Set.of()).contains(partitionId)
+                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Map.of()).containsKey(partitionId)
             ) || hasUnreleasedPartitions;
 
+            // Build epochs map for assigned partitions, preserve existing 
epochs
             if (!assignedPartitions.isEmpty()) {
-                newAssignedPartitions.put(topicId, assignedPartitions);
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : assignedPartitions) {
+                    partitionEpochs.put(partitionId, 
currentAssignedPartitionsWithEpochs.get(partitionId));
+                }
+                newAssignedPartitionsWithEpochs.put(topicId, partitionEpochs);
             }
 
+            // Build epochs map for partitions pending revocation, preserve 
existing epochs
             if (!partitionsPendingRevocation.isEmpty()) {
-                newPartitionsPendingRevocation.put(topicId, 
partitionsPendingRevocation);
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : partitionsPendingRevocation) {
+                    partitionEpochs.put(partitionId, 
currentAssignedPartitionsWithEpochs.get(partitionId));
+                }
+                newPartitionsPendingRevocationWithEpochs.put(topicId, 
partitionEpochs);
             }

Review Comment:
   ```suggestion
               if (!partitionsPendingRevocation.isEmpty()) {
                   newPartitionsPendingRevocation.put(topicId, 
partitionsPendingRevocation);
               }
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
      * This method is a lot faster than running the full reconciliation logic 
in computeNextAssignment.
      *
      * @param memberEpoch               The epoch of the member to use.
-     * @param memberAssignedPartitions  The assigned partitions of the member 
to use.
+     * @param memberAssignedPartitionsWithEpochs  The assigned partitions with 
epochs of the member to use.

Review Comment:
   "epochs of the member" reads too much like "member epoch"
   
   ```suggestion
        * @param memberAssignedPartitionsWithEpochs  The assigned partitions of 
the member to use and their assignment epochs.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -360,28 +361,29 @@ private ConsumerGroupMember updateCurrentAssignment(
      *
      * @param memberEpoch               The epoch of the member to use. This 
may be different
      *                                  from the epoch in {@link 
CurrentAssignmentBuilder#member}.
-     * @param memberAssignedPartitions  The assigned partitions of the member 
to use.
+     * @param memberAssignedPartitionsWithEpochs  The assigned partitions with 
epochs of the member to use.

Review Comment:
   ```suggestion
        * @param memberAssignedPartitionsWithEpochs  The assigned partitions of 
the member to use and their assignment epochs.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static 
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
     }
 
     /**
-     * Creates a map of topic id and partition set from a list of consumer 
group TopicPartitions.
+     * Creates a map of topic id and partition-epoch map from a list of 
consumer group TopicPartitions.
      *
-     * @param topicPartitionsList   The list of TopicPartitions.
-     * @return a map of topic id and partition set.
+     * @param topicPartitions The list of TopicPartitions.
+     * @param defaultEpoch The default epoch to use when the epoch information 
is not available for a partition.
+     * @return a map of topic id and partition-epoch map.

Review Comment:
   ```suggestion
        * @return a map of topic id and partitions with assignment epochs.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static 
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
     }
 
     /**
-     * Creates a map of topic id and partition set from a list of consumer 
group TopicPartitions.
+     * Creates a map of topic id and partition-epoch map from a list of 
consumer group TopicPartitions.

Review Comment:
   ```suggestion
        * Creates a map of topic id and partition with assignment epochs from a 
list of consumer group TopicPartitions.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,21 +1414,21 @@ private void removeGroup(
      * it owns any other partitions.
      *
      * @param ownedTopicPartitions  The partitions provided by the consumer in 
the request.
-     * @param target                The partitions that the member should have.
+     * @param target                The partitions with epochs that the member 
should have.

Review Comment:
   ```suggestion
        * @param target                The partitions that the member should 
have with assignment epochs.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3561,14 +3573,18 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
             result.response()
         );
 
+        // member2: partition 3 (fooTopicId) and 2 (barTopicId) were retained 
from epoch 10,
+        // partition 2 (fooTopicId) is newly assigned at epoch 11
+        Map<Uuid, Map<Integer, Integer>> member2ExpectedAssignment = Map.of(
+            fooTopicId, new TreeMap<>(Map.of(2, 11, 3, 10)),

Review Comment:
   Why do we need `TreeMap`s now? They weren't there before.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -199,10 +200,10 @@ public void testUpdatingMemberUpdatesPartitionEpoch() {
 
         member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
-            .setAssignedPartitions(mkAssignment(
-                mkTopicAssignment(fooTopicId, 1, 2, 3)))
-            .setPartitionsPendingRevocation(mkAssignment(
-                mkTopicAssignment(barTopicId, 4, 5, 6)))
+            .setAssignedPartitions(toEpochsAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1, 2, 3)), 10))
+            .setPartitionsPendingRevocation(toEpochsAssignment(mkAssignment(
+                mkTopicAssignment(barTopicId, 4, 5, 6)), 10))

Review Comment:
   For these partition epoch tests, can we choose an older assignment epoch? 
They should fail if the implementation uses the assignment epoch instead of the 
member epoch.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -57,9 +58,9 @@ public void testStableToStable() {
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
             .setSubscribedTopicNames(List.of(topic1, topic2))
-            .setAssignedPartitions(mkAssignment(
+            .setAssignedPartitions(toEpochsAssignment(mkAssignment(
                 mkTopicAssignment(topicId1, 1, 2, 3),
-                mkTopicAssignment(topicId2, 4, 5, 6)))

Review Comment:
   For these tests, could we pick an older assignment epoch, to confirm that 
the implementation is not resetting them to the latest member epoch?



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java:
##########


Review Comment:
   ```
               .setMemberEpoch(memberEpoch)
               .setPreviousMemberEpoch(memberEpoch)
   ```



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java:
##########
@@ -131,12 +131,16 @@ public static GroupSpec createConsumerGroupSpec(
         for (Map.Entry<String, ConsumerGroupMember> memberEntry : 
members.entrySet()) {
             String memberId = memberEntry.getKey();
             ConsumerGroupMember member = memberEntry.getValue();
+            Map<Uuid, Set<Integer>> partitions = new HashMap<>();
+            member.assignedPartitions().forEach((topicId, partitionEpochMap) ->
+                partitions.put(topicId, partitionEpochMap.keySet())
+            );
 
             memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
                 Optional.ofNullable(member.rackId()),
                 Optional.ofNullable(member.instanceId()),
                 new TopicIds(member.subscribedTopicNames(), topicResolver),
-                new Assignment(member.assignedPartitions())
+                new Assignment(partitions)

Review Comment:
   ```suggestion
                   new 
Assignment(AssignmentTestUtil.toPartitionMap(member.assignedPartitions()))
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -62,6 +62,37 @@ public static Map<Uuid, Set<Integer>> 
mkAssignment(Map.Entry<Uuid, Set<Integer>>
         return Collections.unmodifiableMap(assignment);
     }
 
+    /**
+     * Converts a regular assignment to an epochs-based assignment using the 
given epoch.
+     */
+    public static Map<Uuid, Map<Integer, Integer>> toEpochsAssignment(

Review Comment:
   ```suggestion
       /**
        * Adds the given assignment epoch to an assignment without epochs.
        */
       public static Map<Uuid, Map<Integer, Integer>> toAssignmentWithEpochs(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to