squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2853281635


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
      * This method is a lot faster than running the full reconciliation logic in computeNextAssignment.
      *
      * @param memberEpoch               The epoch of the member to use.
-     * @param memberAssignedPartitions  The assigned partitions of the member to use.
+     * @param memberAssignedPartitionsWithEpochs  The assigned partitions with epochs of the member to use.
      * @return A new ConsumerGroupMember.
      */
     private ConsumerGroupMember updateCurrentAssignment(
         int memberEpoch,
-        Map<Uuid, Set<Integer>> memberAssignedPartitions
+        Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
 
         // Reuse the original map if no topics need to be removed.
-        Map<Uuid, Set<Integer>> newAssignedPartitions;
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocationWithEpochs;
+
         if (subscribedTopicIds.isEmpty() && member.partitionsPendingRevocation().isEmpty()) {
-            newAssignedPartitions = Map.of();
-            newPartitionsPendingRevocation = memberAssignedPartitions;
+            newAssignedPartitionsWithEpochs = Map.of();
+            // Move all assigned to pending revocation with their epochs
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.assignedPartitions());
         } else {
-            newAssignedPartitions = memberAssignedPartitions;
-            newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
-            for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) {
+            newAssignedPartitionsWithEpochs = new HashMap<>(member.assignedPartitions());
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.partitionsPendingRevocation());
+            for (Map.Entry<Uuid, Map<Integer, Integer>> entry : memberAssignedPartitionsWithEpochs.entrySet()) {
                 if (!subscribedTopicIds.contains(entry.getKey())) {
-                    if (newAssignedPartitions == memberAssignedPartitions) {
-                        newAssignedPartitions = new HashMap<>(memberAssignedPartitions);
-                        newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());

Review Comment:
   I can't remember. It looks like an oversight to me. Let's fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to