This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d8a90a47b5 KAFKA-20112: Fix flaky consumer callback/position test (#21746)
3d8a90a47b5 is described below

commit 3d8a90a47b5ed3c4305d84e5495eca66ed3060d0
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Mar 13 14:11:49 2026 -0400

    KAFKA-20112: Fix flaky consumer callback/position test (#21746)
    
    The test was flaky on trunk, most likely because it checks the
    position in the revocation callback but only waited for a rebalance
    to happen before moving on to the next rebalance or close. That is no
    guarantee that the positions will be initialized (e.g. a partition is
    assigned and then revoked again so quickly that it never gets a valid
    position).
    
    Fix by changing the test to wait until at least one record is
    consumed, which ensures that the assignment was received and that it
    has a position.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../clients/consumer/PlaintextConsumerPollTest.java    | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
index 3383049465e..ef5df01cf51 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java
@@ -195,6 +195,11 @@ public class PlaintextConsumerPollTest {
         var commitCompleted = new AtomicBoolean(false);
         var committedPosition = new AtomicLong(-1);
 
+        String otherTopic = "otherTopic";
+        TopicPartition tpOther = new TopicPartition(otherTopic, 0);
+        sendRecords(cluster, tp, 1);
+        sendRecords(cluster, tpOther, 1);
+
         try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
             var listener = new TestConsumerReassignmentListener() {
                 @Override
@@ -219,14 +224,17 @@ public class PlaintextConsumerPollTest {
             };
             consumer.subscribe(List.of(topic), listener);
 
-            // rebalance to get the initial assignment
-            awaitRebalance(consumer, listener);
+            // Consume records to ensure the rebalance completed and positions are initialized
+            // (position then used in callback, triggered on next rebalance)
+            awaitNonEmptyRecords(consumer, tp, 100);
 
             // force a rebalance to trigger an invocation of the revocation callback while in the group
-            consumer.subscribe(List.of("otherTopic"), listener);
-            awaitRebalance(consumer, listener);
+            consumer.subscribe(List.of(otherTopic), listener);
+            // Consume records to ensure positions for otherTopic are initialized
+            // (position then used in callback, triggered on close)
+            awaitNonEmptyRecords(consumer, tpOther, 100);
 
-            assertEquals(0, committedPosition.get());
+            assertEquals(1, committedPosition.get());
             assertTrue(commitCompleted.get());
         }
     }
