This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c75e10d2292 KAFKA-20424 : clients: Update KafkaConsumerTest
comments,tests with relevant protocol (#22144)
c75e10d2292 is described below
commit c75e10d2292a4749787f4359457c9671da1d3ca3
Author: Murali Basani <[email protected]>
AuthorDate: Tue Apr 28 17:45:56 2026 +0200
KAFKA-20424 : clients: Update KafkaConsumerTest comments,tests with
relevant protocol (#22144)
Ref : https://issues.apache.org/jira/browse/KAFKA-20424
- 18 tests with updated comments (classic only)
- 1 test (testAutoCommitSentBeforePositionUpdate) now runs for both
protocols.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumerTest.java | 59 ++++++++--------------
1 file changed, 20 insertions(+), 39 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index da4a799fa5e..89e79eda5c6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -485,8 +485,7 @@ public class KafkaConsumerTest {
assertInstanceOf(ClientTelemetryReporter.class,
consumer.metricsRegistry().reporters().get(0));
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
@SuppressWarnings("unchecked")
@@ -502,8 +501,7 @@ public class KafkaConsumerTest {
assertEquals(new OffsetAndMetadata(5), records.nextOffsets().get(tp0));
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
@SuppressWarnings("unchecked")
@@ -790,6 +788,7 @@ public class KafkaConsumerTest {
() -> consumer.subscribe(Pattern.compile("")));
}
+ // NOTE: this test configures partition.assignment.strategy, which only
applies to the CLASSIC group protocol.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol
groupProtocol) {
@@ -968,8 +967,7 @@ public class KafkaConsumerTest {
return newConsumer(propsToMap(props), keyDeserializer,
valueDeserializer);
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: this test exercises the Heartbeat RPC, which does not exist in
the CONSUMER group protocol.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws
Exception {
@@ -1001,8 +999,7 @@ public class KafkaConsumerTest {
assertTrue(heartbeatReceived.get());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: this test exercises the Heartbeat RPC, which does not exist in
the CONSUMER group protocol.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol
groupProtocol) throws Exception {
@@ -1410,10 +1407,8 @@ public class KafkaConsumerTest {
assertNull(committed.get(tp1));
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testAutoCommitSentBeforePositionUpdate(GroupProtocol
groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1444,8 +1439,7 @@ public class KafkaConsumerTest {
assertTrue(commitReceived.get());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testRegexSubscription(GroupProtocol groupProtocol) {
@@ -1473,8 +1467,7 @@ public class KafkaConsumerTest {
assertEquals(Set.of(tp0), consumer.assignment());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
@@ -1510,8 +1503,7 @@ public class KafkaConsumerTest {
assertEquals(Set.of(otherTopic), consumer.subscription());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol)
throws Exception {
@@ -1614,8 +1606,7 @@ public class KafkaConsumerTest {
* Upon unsubscribing from subscribed topics the consumer subscription and
assignment
* are both updated right away but its consumed offsets are not auto
committed.
*/
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
@SuppressWarnings("unchecked")
@@ -1733,8 +1724,7 @@ public class KafkaConsumerTest {
* Upon unsubscribing from subscribed topics, the assigned partitions
immediately
* change but if auto-commit is disabled the consumer offsets are not
committed.
*/
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol
groupProtocol) {
@@ -1852,8 +1842,7 @@ public class KafkaConsumerTest {
client.requests().clear();
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void
testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(GroupProtocol
groupProtocol) {
@@ -1878,8 +1867,7 @@ public class KafkaConsumerTest {
assertEquals(partitionRevoked + singleTopicPartition,
unsubscribeException.getCause().getMessage());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void
testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProtocol
groupProtocol) throws Exception {
@@ -2138,16 +2126,14 @@ public class KafkaConsumerTest {
consumerCloseTest(groupProtocol, closeTimeoutMs,
serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: this test drives consumerCloseTest, whose close/rebalance mock
setup is Classic-specific.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCloseTimeout(GroupProtocol groupProtocol) throws Exception
{
consumerCloseTest(groupProtocol, 5000, Collections.emptyList(), 5000,
false);
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: this test drives consumerCloseTest with an OffsetCommit +
LeaveGroup flow that is Classic-specific.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws
Exception {
@@ -2157,16 +2143,14 @@ public class KafkaConsumerTest {
consumerCloseTest(groupProtocol, 5000, List.of(commitResponse), 5000,
false);
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: this test drives consumerCloseTest, whose close/rebalance mock
setup is Classic-specific.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCloseNoWait(GroupProtocol groupProtocol) throws Exception {
consumerCloseTest(groupProtocol, 0, Collections.emptyList(), 0, false);
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: this test drives consumerCloseTest, whose close/rebalance mock
setup is Classic-specific.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCloseInterrupt(GroupProtocol groupProtocol) throws
Exception {
@@ -2600,8 +2584,7 @@ public class KafkaConsumerTest {
assertTrue((Double) metric.metricValue() >=
Duration.ofMillis(999).toNanos());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testRebalanceException(GroupProtocol groupProtocol) {
@@ -2637,8 +2620,7 @@ public class KafkaConsumerTest {
assertTrue(subscription.assignedPartitions().isEmpty());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol)
throws InterruptedException {
@@ -2779,8 +2761,7 @@ public class KafkaConsumerTest {
consumer.close(CloseOptions.timeout(Duration.ZERO));
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
+ // NOTE: the assertion path is specific to the CLASSIC consumer.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testGetGroupMetadata(GroupProtocol groupProtocol) {