This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d8a90a47b5 KAFKA-20112: Fix flaky consumer callback/position test
(#21746)
3d8a90a47b5 is described below
commit 3d8a90a47b5ed3c4305d84e5495eca66ed3060d0
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Mar 13 14:11:49 2026 -0400
KAFKA-20112: Fix flaky consumer callback/position test (#21746)
The test was flaky on trunk, most likely because it relies on checking the
position in the revocation callback but only waited for a rebalance to
happen before moving on to the next rebalance or to close. That is no
guarantee that the positions have been initialized (e.g. a partition may
be assigned and then revoked again so quickly that it never has time to
get a valid position).
Fix by changing the test to wait until at least one record is consumed (this
ensures that we got the assignment and that it has a valid position).
Reviewers: Andrew Schofield <[email protected]>
---
.../clients/consumer/PlaintextConsumerPollTest.java | 18 +++++++++++++-----
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
index 3383049465e..ef5df01cf51 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
@@ -195,6 +195,11 @@ public class PlaintextConsumerPollTest {
var commitCompleted = new AtomicBoolean(false);
var committedPosition = new AtomicLong(-1);
+ String otherTopic = "otherTopic";
+ TopicPartition tpOther = new TopicPartition(otherTopic, 0);
+ sendRecords(cluster, tp, 1);
+ sendRecords(cluster, tpOther, 1);
+
try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
var listener = new TestConsumerReassignmentListener() {
@Override
@@ -219,14 +224,17 @@ public class PlaintextConsumerPollTest {
};
consumer.subscribe(List.of(topic), listener);
- // rebalance to get the initial assignment
- awaitRebalance(consumer, listener);
+ // Consume records to ensure the rebalance completed and positions
are initialized
+ // (position then used in callback, triggered on next rebalance)
+ awaitNonEmptyRecords(consumer, tp, 100);
// force a rebalance to trigger an invocation of the revocation
callback while in the group
- consumer.subscribe(List.of("otherTopic"), listener);
- awaitRebalance(consumer, listener);
+ consumer.subscribe(List.of(otherTopic), listener);
+ // Consume records to ensure positions for otherTopic are
initialized
+ // (position then used in callback, triggered on close)
+ awaitNonEmptyRecords(consumer, tpOther, 100);
- assertEquals(0, committedPosition.get());
+ assertEquals(1, committedPosition.get());
assertTrue(commitCompleted.get());
}
}