This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 90a4c939790 MINOR: Fix testRackAwareAssignment flake (#22154)
90a4c939790 is described below

commit 90a4c9397902044935ac566e2cf4b828bdb1d36d
Author: Sean Quah <[email protected]>
AuthorDate: Tue Apr 28 15:16:28 2026 +0100

    MINOR: Fix testRackAwareAssignment flake (#22154)
    
    The last part of testRackAwareAssignment was found to be flaky. This
    part moves all topic partitions to different racks and waits for
    consumer assignments to settle. Each of the three consumers is expected
    to revoke all its partitions and be assigned partitions previously held
    by another within a 15 second timeout.
    
    This timeout is not always sufficient. The consumer heartbeat interval
    is left at the default of 5,000 ms and each consumer polls every
    3,000 ms. In the worst case, it takes a consumer around 7,000 ms to
    reconcile an assignment change. An additional 3,000 ms round of polling
    may be required when a consumer needs to auto-commit offsets. Two rounds
    of reconciliation (first revoking the old partitions, then receiving the
    new ones) must happen within 15,000 ms. In the worst case this takes
    roughly 2 x 7,000 ms + 3,000 ms = 17,000 ms, which exceeds the timeout.
    
    The timeline of an example failing run looks like:
    
    -02.956 Group coordinator computes target assignment at epoch=6
            consumer0=[0] consumer1=[1, 2] consumer2=[3, 4, 5]
    +00.000 15 second timeout starts
    
    +03.179 consumer0 heartbeats
            This is the first heartbeat since the rack reassignments.
    +03.179 Group coordinator computes target assignment at epoch=7
            consumer0=[5] consumer1=[3, 4] consumer2=[0, 1, 2]
    +03.186 consumer0 heartbeat receives assignment []

    +04.151 consumer1 starts poll()
    +04.877 consumer1 heartbeats
    +04.878 consumer1 heartbeat receives assignment []
    +05.155 consumer1 ends poll()

    +07.259 consumer1 starts poll()
    +07.259     consumer1 sends auto-commit with offsets for [1, 2]
    +07.288     consumer1 receives auto-commit response
    +08.263 consumer1 ends poll()

    +10.379 consumer1 starts poll()
    +10.379     consumer1 calls onPartitionsRevoked with [1, 2]
    +10.379     consumer1 calls onPartitionsAssigned with []
    +10.382     consumer1 heartbeats with owned partitions []
    +10.387     consumer1 heartbeat receives assignment [3, 4]
    +10.483     consumer1 calls onPartitionsAssigned [3, 4]
    +10.483     consumer1 heartbeats with owned partitions [3, 4]
    +11.384 consumer1 ends poll()

    +15.000 15 second timeout elapses and the test fails
    +15.300 consumer2 heartbeat receives assignment [0, 1, 2]
    
    To fix the test we:
    * Make config changes to reduce the reconciliation time. This also
      reduces the test duration from 60 seconds to 20 seconds.
      * Disable auto-commit, since the consumers do not consume any records.
      * Reduce the heartbeat interval to 1,000 ms.
      * Reduce the poll timeouts to 100 ms, so that polls happen every
        300 ms.
    * Raise the final timeout to 30 seconds, since under heavy CI load the
      reduced intervals above may not be sufficient on their own.
    
    Reviewers: Lianet Magrans <[email protected]>, David Jacot
    <[email protected]>
---
 .../clients/consumer/ConsumerIntegrationTest.java  | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 16e38967817..d10dddde472 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -240,6 +240,8 @@ public class ConsumerIntegrationTest {
                 @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
                 @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
                 @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
             }
@@ -251,6 +253,8 @@ public class ConsumerIntegrationTest {
                 @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
                 @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
                 @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"1000")
             }
@@ -263,16 +267,19 @@ public class ConsumerIntegrationTest {
              Consumer<byte[], byte[]> consumer0 = 
clusterInstance.consumer(Map.of(
                  ConsumerConfig.GROUP_ID_CONFIG, "group0",
                  ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
+                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
                  ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()
              ));
              Consumer<byte[], byte[]> consumer1 = 
clusterInstance.consumer(Map.of(
                  ConsumerConfig.GROUP_ID_CONFIG, "group0",
                  ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
+                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
                  ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()
              ));
              Consumer<byte[], byte[]> consumer2 = 
clusterInstance.consumer(Map.of(
                  ConsumerConfig.GROUP_ID_CONFIG, "group0",
                  ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
+                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
                  ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()
              ))
         ) {
@@ -288,9 +295,9 @@ public class ConsumerIntegrationTest {
             consumer2.subscribe(List.of(topic));
 
             TestUtils.waitForCondition(() -> {
-                consumer0.poll(Duration.ofMillis(1000));
-                consumer1.poll(Duration.ofMillis(1000));
-                consumer2.poll(Duration.ofMillis(1000));
+                consumer0.poll(Duration.ofMillis(100));
+                consumer1.poll(Duration.ofMillis(100));
+                consumer2.poll(Duration.ofMillis(100));
                 return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 0))) &&
                     consumer1.assignment().isEmpty() &&
                     consumer2.assignment().isEmpty();
@@ -305,9 +312,9 @@ public class ConsumerIntegrationTest {
             );
             clusterInstance.waitTopicCreation(topic, 3);
             TestUtils.waitForCondition(() -> {
-                consumer0.poll(Duration.ofMillis(1000));
-                consumer1.poll(Duration.ofMillis(1000));
-                consumer2.poll(Duration.ofMillis(1000));
+                consumer0.poll(Duration.ofMillis(100));
+                consumer1.poll(Duration.ofMillis(100));
+                consumer2.poll(Duration.ofMillis(100));
                 return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 0))) &&
                     consumer1.assignment().equals(Set.of(new 
TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
                     consumer2.assignment().isEmpty();
@@ -322,9 +329,9 @@ public class ConsumerIntegrationTest {
             );
             clusterInstance.waitTopicCreation(topic, 6);
             TestUtils.waitForCondition(() -> {
-                consumer0.poll(Duration.ofMillis(1000));
-                consumer1.poll(Duration.ofMillis(1000));
-                consumer2.poll(Duration.ofMillis(1000));
+                consumer0.poll(Duration.ofMillis(100));
+                consumer1.poll(Duration.ofMillis(100));
+                consumer2.poll(Duration.ofMillis(100));
                 return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 0))) &&
                     consumer1.assignment().equals(Set.of(new 
TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
                     consumer2.assignment().equals(Set.of(new 
TopicPartition(topic, 3), new TopicPartition(topic, 4), new 
TopicPartition(topic, 5)));
@@ -346,13 +353,13 @@ public class ConsumerIntegrationTest {
                 new TopicPartition(topic, 5), Optional.of(new 
NewPartitionReassignment(List.of(0)))
             )).all().get();
             TestUtils.waitForCondition(() -> {
-                consumer0.poll(Duration.ofMillis(1000));
-                consumer1.poll(Duration.ofMillis(1000));
-                consumer2.poll(Duration.ofMillis(1000));
+                consumer0.poll(Duration.ofMillis(100));
+                consumer1.poll(Duration.ofMillis(100));
+                consumer2.poll(Duration.ofMillis(100));
                 return consumer0.assignment().equals(Set.of(new 
TopicPartition(topic, 5))) &&
                     consumer1.assignment().equals(Set.of(new 
TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
                     consumer2.assignment().equals(Set.of(new 
TopicPartition(topic, 0), new TopicPartition(topic, 1), new 
TopicPartition(topic, 2)));
-            }, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 
3, 4 | 2 -> 0, 1, 2");
+            }, 30000, "Consumer with topic partition mapping should be 0 -> 5 
| 1 -> 3, 4 | 2 -> 0, 1, 2");
         }
     }
 

Reply via email to