lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526716168


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
         assertEquals(toTopicIdPartitionMap(assignment1), 
membershipManager.currentAssignment().partitions);
     }
 
+    /**
+     * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+     * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+     * one topic is not resolvable.
+     *
+     * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+     * In this case, no rebalance listeners are run.
+     */
+    @Test
+    public void testSameAssignmentReconciledAgainWithMissingTopic() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topic1 = Uuid.randomUuid();
+        Uuid topic2 = Uuid.randomUuid();
+        final Assignment assignment1 = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new 
TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+                new 
TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        final Assignment assignment2 = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new 
TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+                new 
TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, 
"topic1"));
+
+        // Receive assignment - full reconciliation triggered
+        // stay in RECONCILING state, since an unresolved topic is assigned
+        
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        membershipManager.poll(time.milliseconds());
+        verifyReconciliationTriggeredAndCompleted(membershipManager,
+            Collections.singletonList(new TopicIdPartition(topic1, new 
TopicPartition("topic1", 0)))
+        );
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        clearInvocations(membershipManager);
+
+        // Receive extended assignment - assignment received but no 
reconciliation triggered
+        
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        verifyReconciliationNotTriggered(membershipManager);

Review Comment:
   Exactly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to