This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb96d7ccce4 KAFKA-20563: Fix flaky ShareConsumerRackAwareTest (#22268)
eb96d7ccce4 is described below

commit eb96d7ccce49ef99874cfd07d67c59a4a6370e8b
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu May 14 11:12:15 2026 +0100

    KAFKA-20563: Fix flaky ShareConsumerRackAwareTest (#22268)
    
    ShareConsumerRackAwareTest is very slightly flaky. This is caused by the
    fact that the assignor it uses is very particular about the combination
    of members, assigned partitions and rack IDs. Because the assignment can
    be performed at unpredictable points during the test, it is possible
    that the set of members is incomplete when the assignment is calculated.
    If any of the racks has no members, the assignor throws an exception and
    the test then fails.
    
    The PR makes the assignor tolerant of when there are no members for a
    rack, just not making an assignment for partitions in such a rack.
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>, Sushant Mahajan <[email protected]>
---
 .../kafka/clients/consumer/ConsumerIntegrationTest.java       |  4 ++--
 .../{RackAwareAssignor.java => RackAwareTestAssignor.java}    | 11 ++++++-----
 .../kafka/clients/consumer/ShareConsumerRackAwareTest.java    |  4 +++-
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index d10dddde472..fcfb493388d 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -242,7 +242,7 @@ public class ConsumerIntegrationTest {
                 @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
-                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareTestAssignor"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
             }
         ),
@@ -255,7 +255,7 @@ public class ConsumerIntegrationTest {
                 @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"1000"),
-                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareTestAssignor"),
                 @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"1000")
             }
         )
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareTestAssignor.java
similarity index 88%
rename from 
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
rename to 
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareTestAssignor.java
index 4b9ee6fd274..f230b3d1979 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareTestAssignor.java
@@ -33,11 +33,11 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * The RackAwareAssignor is a consumer group partition assignor that takes 
into account the rack
- * information of the members when assigning partitions to them.
- * It needs all brokers and members to have rack information available.
+ * The RackAwareTestAssignor is a partition assignor for consumer groups and 
share groups that takes
+ * into account the rack information of the members when assigning partitions 
to them.
+ * It leaves partitions unassigned if there are no members with the same rack 
information.
  */
-public class RackAwareAssignor implements ConsumerGroupPartitionAssignor, 
ShareGroupPartitionAssignor {
+public class RackAwareTestAssignor implements ConsumerGroupPartitionAssignor, 
ShareGroupPartitionAssignor {
     @Override
     public String name() {
         return "rack-aware-assignor";
@@ -81,7 +81,8 @@ public class RackAwareAssignor implements 
ConsumerGroupPartitionAssignor, ShareG
                 }
 
                 if (assignedRack == null) {
-                    throw new PartitionAssignorException("No member found for 
racks " + racks + " for partition " + partitionId + " of topic " + topicId);
+                    // No rack-local member found, which can be transiently 
true as membership changes. Just skip this partition for now.
+                    break;
                 }
 
                 Map<Uuid, Set<Integer>> assignment = 
assignments.computeIfAbsent(
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 668c84a0a06..3c9b07be26a 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -50,7 +50,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
         @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
         @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
         @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
-        @ClusterConfigProperty(key = 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor")
+        @ClusterConfigProperty(key = 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareTestAssignor")
     }
 )
 public class ShareConsumerRackAwareTest {
@@ -137,6 +137,8 @@ public class ShareConsumerRackAwareTest {
                     NewPartitions.increaseTo(6, List.of(List.of(2), 
List.of(2), List.of(2)))
                 )
             );
+            clusterInstance.waitTopicCreation(topic, 6);
+
             TestUtils.waitForCondition(() -> {
                 consumer0.poll(Duration.ofMillis(1000));
                 consumer1.poll(Duration.ofMillis(1000));

Reply via email to