This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c75e10d2292 KAFKA-20424: clients: Update KafkaConsumerTest 
comments, tests with relevant protocol (#22144)
c75e10d2292 is described below

commit c75e10d2292a4749787f4359457c9671da1d3ca3
Author: Murali Basani <[email protected]>
AuthorDate: Tue Apr 28 17:45:56 2026 +0200

    KAFKA-20424: clients: Update KafkaConsumerTest comments, tests with 
relevant protocol (#22144)
    
    Ref : https://issues.apache.org/jira/browse/KAFKA-20424
    
    - 18 tests remain CLASSIC-only; their outdated TODO comments are
    replaced with notes explaining why.
    - 1 test (testAutoCommitSentBeforePositionUpdate) now runs for both
    group protocols.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 59 ++++++++--------------
 1 file changed, 20 insertions(+), 39 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index da4a799fa5e..89e79eda5c6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -485,8 +485,7 @@ public class KafkaConsumerTest {
         assertInstanceOf(ClientTelemetryReporter.class, 
consumer.metricsRegistry().reporters().get(0));
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     @SuppressWarnings("unchecked")
@@ -502,8 +501,7 @@ public class KafkaConsumerTest {
         assertEquals(new OffsetAndMetadata(5), records.nextOffsets().get(tp0));
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     @SuppressWarnings("unchecked")
@@ -790,6 +788,7 @@ public class KafkaConsumerTest {
             () -> consumer.subscribe(Pattern.compile("")));
     }
 
+    // NOTE: this test configures partition.assignment.strategy, which only 
applies to the CLASSIC group protocol.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol 
groupProtocol) {
@@ -968,8 +967,7 @@ public class KafkaConsumerTest {
         return newConsumer(propsToMap(props), keyDeserializer, 
valueDeserializer);
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: this test exercises the Heartbeat RPC, which does not exist in 
the CONSUMER group protocol.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws 
Exception {
@@ -1001,8 +999,7 @@ public class KafkaConsumerTest {
         assertTrue(heartbeatReceived.get());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: this test exercises the Heartbeat RPC, which does not exist in 
the CONSUMER group protocol.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol 
groupProtocol) throws Exception {
@@ -1410,10 +1407,8 @@ public class KafkaConsumerTest {
         assertNull(committed.get(tp1));
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testAutoCommitSentBeforePositionUpdate(GroupProtocol 
groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -1444,8 +1439,7 @@ public class KafkaConsumerTest {
         assertTrue(commitReceived.get());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testRegexSubscription(GroupProtocol groupProtocol) {
@@ -1473,8 +1467,7 @@ public class KafkaConsumerTest {
         assertEquals(Set.of(tp0), consumer.assignment());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
@@ -1510,8 +1503,7 @@ public class KafkaConsumerTest {
         assertEquals(Set.of(otherTopic), consumer.subscription());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) 
throws Exception {
@@ -1614,8 +1606,7 @@ public class KafkaConsumerTest {
      * Upon unsubscribing from subscribed topics the consumer subscription and 
assignment
      * are both updated right away but its consumed offsets are not auto 
committed.
      */
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     @SuppressWarnings("unchecked")
@@ -1733,8 +1724,7 @@ public class KafkaConsumerTest {
      * Upon unsubscribing from subscribed topics, the assigned partitions 
immediately
      * change but if auto-commit is disabled the consumer offsets are not 
committed.
      */
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol 
groupProtocol) {
@@ -1852,8 +1842,7 @@ public class KafkaConsumerTest {
         client.requests().clear();
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void 
testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(GroupProtocol 
groupProtocol) {
@@ -1878,8 +1867,7 @@ public class KafkaConsumerTest {
         assertEquals(partitionRevoked + singleTopicPartition, 
unsubscribeException.getCause().getMessage());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void 
testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProtocol 
groupProtocol) throws Exception {
@@ -2138,16 +2126,14 @@ public class KafkaConsumerTest {
         consumerCloseTest(groupProtocol, closeTimeoutMs, 
serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: this test drives consumerCloseTest, whose close/rebalance mock 
setup is Classic-specific.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testCloseTimeout(GroupProtocol groupProtocol) throws Exception 
{
         consumerCloseTest(groupProtocol, 5000, Collections.emptyList(), 5000, 
false);
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: this test drives consumerCloseTest with an OffsetCommit + 
LeaveGroup flow that is Classic-specific.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws 
Exception {
@@ -2157,16 +2143,14 @@ public class KafkaConsumerTest {
         consumerCloseTest(groupProtocol, 5000, List.of(commitResponse), 5000, 
false);
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: this test drives consumerCloseTest, whose close/rebalance mock 
setup is Classic-specific.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testCloseNoWait(GroupProtocol groupProtocol) throws Exception {
         consumerCloseTest(groupProtocol, 0, Collections.emptyList(), 0, false);
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: this test drives consumerCloseTest, whose close/rebalance mock 
setup is Classic-specific.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testCloseInterrupt(GroupProtocol groupProtocol) throws 
Exception {
@@ -2600,8 +2584,7 @@ public class KafkaConsumerTest {
         assertTrue((Double) metric.metricValue() >= 
Duration.ofMillis(999).toNanos());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testRebalanceException(GroupProtocol groupProtocol) {
@@ -2637,8 +2620,7 @@ public class KafkaConsumerTest {
         assertTrue(subscription.assignedPartitions().isEmpty());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) 
throws InterruptedException {
@@ -2779,8 +2761,7 @@ public class KafkaConsumerTest {
         consumer.close(CloseOptions.timeout(Duration.ZERO));
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
+    // NOTE: the assertion path is specific to the CLASSIC consumer.
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testGetGroupMetadata(GroupProtocol groupProtocol) {

Reply via email to