This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1697a7b8fe3 KAFKA-20316 Rewrite AddPartitionsTest with ClusterInstance (#21903)
1697a7b8fe3 is described below

commit 1697a7b8fe330db6452ff74739a4ecd93bb444e2
Author: Lan Ding <[email protected]>
AuthorDate: Tue Mar 31 16:25:38 2026 +0800

    KAFKA-20316 Rewrite AddPartitionsTest with ClusterInstance (#21903)
    
    Rewrite `AddPartitionsTest` with `ClusterInstance`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/AddPartitionsTest.java     | 212 ++++++++++++++++++++
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 216 ---------------------
 2 files changed, 212 insertions(+), 216 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
new file mode 100644
index 00000000000..afaec380867
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(types = {Type.KRAFT}, brokers = 4)
+public class AddPartitionsTest {
+
+    @ClusterTest
+    public void testWrongReplicaCount(ClusterInstance cluster) throws 
Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic1", Map.of(0, List.of(0, 
1)));
+
+            ExecutionException exception = 
assertThrows(ExecutionException.class, () ->
+                admin.createPartitions(Map.of("topic1",
+                    NewPartitions.increaseTo(2, List.of(List.of(0, 1, 
2))))).all().get());
+            assertInstanceOf(InvalidReplicaAssignmentException.class, 
exception.getCause());
+        }
+    }
+
+    @ClusterTest
+    public void testMissingPartitionsInCreateTopics(ClusterInstance cluster) {
+        try (Admin admin = cluster.admin()) {
+            Map<Integer, List<Integer>> topic6Placements = Map.of(
+                1, List.of(0, 1),
+                2, List.of(1, 0));
+            Map<Integer, List<Integer>> topic7Placements = Map.of(
+                2, List.of(0, 1),
+                3, List.of(1, 0));
+
+            CreateTopicsResult result = admin.createTopics(List.of(
+                new NewTopic("new-topic6", topic6Placements),
+                new NewTopic("new-topic7", topic7Placements)));
+
+            Throwable topic6Cause = assertThrows(ExecutionException.class,
+                () -> result.values().get("new-topic6").get()).getCause();
+            assertInstanceOf(InvalidReplicaAssignmentException.class, 
topic6Cause);
+            assertTrue(topic6Cause.getMessage().contains("partitions should be 
a consecutive 0-based integer sequence"),
+                "Unexpected error message: " + topic6Cause.getMessage());
+
+            Throwable topic7Cause = assertThrows(ExecutionException.class,
+                () -> result.values().get("new-topic7").get()).getCause();
+            assertInstanceOf(InvalidReplicaAssignmentException.class, 
topic7Cause);
+            assertTrue(topic7Cause.getMessage().contains("partitions should be 
a consecutive 0-based integer sequence"),
+                "Unexpected error message: " + topic7Cause.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testMissingPartitionsInCreatePartitions(ClusterInstance 
cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic1", Map.of(0, List.of(0, 
1)));
+
+            Throwable cause = assertThrows(ExecutionException.class, () ->
+                admin.createPartitions(Map.of("topic1",
+                    NewPartitions.increaseTo(3, List.of(List.of(0, 1, 
2))))).all().get()).getCause();
+            assertInstanceOf(InvalidReplicaAssignmentException.class, cause);
+            assertTrue(cause.getMessage().contains(
+                "Attempted to add 2 additional partition(s), but only 1 
assignment(s) were specified."),
+                "Unexpected error message: " + cause.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testIncrementPartitions(ClusterInstance cluster) throws 
Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic1", Map.of(0, List.of(0, 
1)));
+
+            admin.createPartitions(Map.of("topic1", 
NewPartitions.increaseTo(3))).all().get();
+
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic1", 1, 30000);
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic1", 2, 30000);
+
+            cluster.waitTopicCreation("topic1", 3);
+
+            Map<String, TopicDescription> descriptions = 
admin.describeTopics(List.of("topic1")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic1");
+            assertEquals(3, topicDescription.partitions().size());
+
+            for (TopicPartitionInfo partition : topicDescription.partitions()) 
{
+                assertEquals(2, partition.replicas().size());
+                assertNotNull(partition.leader());
+                assertTrue(partition.replicas().contains(partition.leader()));
+            }
+        }
+    }
+
+    @ClusterTest
+    public void testManualAssignmentOfReplicas(ClusterInstance cluster) throws 
Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic2", Map.of(0, List.of(1, 
2)));
+
+            admin.createPartitions(Map.of("topic2", NewPartitions.increaseTo(3,
+                List.of(List.of(0, 1), List.of(2, 3))))).all().get();
+
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic2", 1, 30000);
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic2", 2, 30000);
+
+            cluster.waitTopicCreation("topic2", 3);
+
+            Map<String, TopicDescription> descriptions = 
admin.describeTopics(List.of("topic2")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic2");
+            assertEquals(3, topicDescription.partitions().size());
+
+            List<TopicPartitionInfo> partitions = 
topicDescription.partitions().stream()
+                .sorted(Comparator.comparingInt(TopicPartitionInfo::partition))
+                .toList();
+
+            assertEquals(0, partitions.get(0).partition());
+            assertEquals(1, partitions.get(1).partition());
+            assertEquals(2, partitions.get(2).partition());
+
+            Set<Integer> partition1Replicas = 
partitions.get(1).replicas().stream()
+                .map(Node::id)
+                .collect(Collectors.toSet());
+            assertEquals(2, partition1Replicas.size());
+            assertEquals(Set.of(0, 1), partition1Replicas);
+        }
+    }
+
+    @ClusterTest
+    public void testReplicaPlacementAllServers(ClusterInstance cluster) throws 
Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic3", Map.of(0, List.of(2, 
3, 0, 1)));
+
+            admin.createPartitions(Map.of("topic3", 
NewPartitions.increaseTo(7))).all().get();
+
+            cluster.waitTopicCreation("topic3", 7);
+
+            Map<String, TopicDescription> descriptions = 
admin.describeTopics(List.of("topic3")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic3");
+            assertEquals(7, topicDescription.partitions().size());
+
+            for (TopicPartitionInfo partition : topicDescription.partitions()) 
{
+                Set<Integer> replicaIds = partition.replicas().stream()
+                    .map(Node::id)
+                    .collect(Collectors.toSet());
+                assertEquals(4, replicaIds.size(),
+                    "Partition " + partition.partition() + " should have 4 
replicas");
+                assertTrue(replicaIds.stream().allMatch(id -> id >= 0 && id <= 
3),
+                    "Replicas should only include brokers 0-3");
+                assertNotNull(partition.leader(),
+                    "Partition " + partition.partition() + " should have a 
leader");
+                assertTrue(replicaIds.contains(partition.leader().id()),
+                    "Leader should be one of the replicas");
+            }
+        }
+    }
+
+    @ClusterTest
+    public void testReplicaPlacementPartialServers(ClusterInstance cluster) 
throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic2", Map.of(0, List.of(1, 
2)));
+
+            admin.createPartitions(Map.of("topic2", 
NewPartitions.increaseTo(3))).all().get();
+
+            cluster.waitTopicCreation("topic2", 3);
+
+            Map<String, TopicDescription> descriptions = 
admin.describeTopics(List.of("topic2")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic2");
+            assertEquals(3, topicDescription.partitions().size());
+
+            for (TopicPartitionInfo partition : topicDescription.partitions()) 
{
+                Set<Integer> replicaIds = partition.replicas().stream()
+                    .map(Node::id)
+                    .collect(Collectors.toSet());
+                assertEquals(2, replicaIds.size(),
+                    "Partition " + partition.partition() + " should have 2 
replicas");
+                assertTrue(replicaIds.stream().allMatch(id -> id >= 0 && id <= 
3),
+                    "Replicas should only include brokers 0-3");
+                assertNotNull(partition.leader(),
+                    "Partition " + partition.partition() + " should have a 
leader");
+                assertTrue(replicaIds.contains(partition.leader().id()),
+                    "Leader should be one of the replicas");
+            }
+        }
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
deleted file mode 100755
index bba5278d7a6..00000000000
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import kafka.server.{BaseRequestTest, BrokerServer}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
-import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
-import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
-import java.util
-import java.util.concurrent.ExecutionException
-import scala.jdk.CollectionConverters._
-
-class AddPartitionsTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 4
-
-  val partitionId = 0
-
-  val topic1 = "new-topic1"
-  val topic1Assignment = Map(0 -> Seq(0,1))
-  val topic2 = "new-topic2"
-  val topic2Assignment = Map(0 -> Seq(1,2))
-  val topic3 = "new-topic3"
-  val topic3Assignment = Map(0 -> Seq(2,3,0,1))
-  val topic4 = "new-topic4"
-  val topic4Assignment = Map(0 -> Seq(0,3))
-  val topic5 = "new-topic5"
-  val topic5Assignment = Map(1 -> Seq(0,1))
-  var admin: Admin = _
-
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    brokers.foreach(broker => 
broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
-    createTopicWithAssignment(topic1, partitionReplicaAssignment = 
topic1Assignment)
-    createTopicWithAssignment(topic2, partitionReplicaAssignment = 
topic2Assignment)
-    createTopicWithAssignment(topic3, partitionReplicaAssignment = 
topic3Assignment)
-    createTopicWithAssignment(topic4, partitionReplicaAssignment = 
topic4Assignment)
-    admin = createAdminClient()
-  }
-
-  @Test
-  def testWrongReplicaCount(): Unit = {
-    assertEquals(classOf[InvalidReplicaAssignmentException], 
assertThrows(classOf[ExecutionException], () => {
-        admin.createPartitions(util.Map.of(topic1,
-          NewPartitions.increaseTo(2, util.List.of(util.List.of[Integer](0, 1, 
2))))).all().get()
-      }).getCause.getClass)
-  }
-
-  /**
-   * Test that when we supply a manual partition assignment to createTopics, 
it must be 0-based
-   * and consecutive.
-   */
-  @Test
-  def testMissingPartitionsInCreateTopics(): Unit = {
-    val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
-    topic6Placements.put(1, util.List.of(0, 1))
-    topic6Placements.put(2, util.List.of(1, 0))
-    val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
-    topic7Placements.put(2, util.List.of(0, 1))
-    topic7Placements.put(3, util.List.of(1, 0))
-    val futures = admin.createTopics(util.List.of(
-      new NewTopic("new-topic6", topic6Placements),
-      new NewTopic("new-topic7", topic7Placements))).values()
-    val topic6Cause = assertThrows(classOf[ExecutionException], () => 
futures.get("new-topic6").get()).getCause
-    assertEquals(classOf[InvalidReplicaAssignmentException], 
topic6Cause.getClass)
-    assertTrue(topic6Cause.getMessage.contains("partitions should be a 
consecutive 0-based integer sequence"),
-      "Unexpected error message: " + topic6Cause.getMessage)
-    val topic7Cause = assertThrows(classOf[ExecutionException], () => 
futures.get("new-topic7").get()).getCause
-    assertEquals(classOf[InvalidReplicaAssignmentException], 
topic7Cause.getClass)
-    assertTrue(topic7Cause.getMessage.contains("partitions should be a 
consecutive 0-based integer sequence"),
-      "Unexpected error message: " + topic7Cause.getMessage)
-  }
-
-  /**
-   * Test that when we supply a manual partition assignment to 
createPartitions, it must contain
-   * enough partitions.
-   */
-  @Test
-  def testMissingPartitionsInCreatePartitions(): Unit = {
-    val cause = assertThrows(classOf[ExecutionException], () =>
-      admin.createPartitions(util.Map.of(topic1,
-        NewPartitions.increaseTo(3, util.List.of(util.List.of[Integer](0, 1, 
2))))).all().get()).getCause
-    assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
-    assertTrue(cause.getMessage.contains("Attempted to add 2 additional 
partition(s), but only 1 assignment(s) " +
-      "were specified."), "Unexpected error message: " + cause.getMessage)
-  }
-
-  @Test
-  def testIncrementPartitions(): Unit = {
-    admin.createPartitions(util.Map.of(topic1, 
NewPartitions.increaseTo(3))).all().get()
-
-    // wait until leader is elected
-    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
-    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 2)
-
-    // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
-    TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic1), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val partitions = 
response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition)
-    assertEquals(partitions.size, 3)
-    assertEquals(1, partitions(1).partition)
-    assertEquals(2, partitions(2).partition)
-
-    for (partition <- partitions) {
-      val replicas = partition.replicaIds
-      assertEquals(2, replicas.size)
-      assertTrue(partition.leaderId.isPresent)
-      val leaderId = partition.leaderId.get
-      assertTrue(replicas.contains(leaderId))
-    }
-  }
-
-  @Test
-  def testManualAssignmentOfReplicas(): Unit = {
-    // Add 2 partitions
-    admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3,
-      util.List.of(util.List.of[Integer](0, 1), util.List.of[Integer](2, 
3))))).all().get()
-    // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
-
-    // read metadata from a broker and verify the new topic partitions exist
-    val partition1Metadata = TestUtils.waitForPartitionMetadata(brokers, 
topic2, 1)
-    assertEquals(leader1, partition1Metadata.leader())
-    val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, 
topic2, 2)
-    assertEquals(leader2, partition2Metadata.leader())
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic2), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val topicMetadata = response.topicMetadata.asScala.head
-    val partitionMetadata = 
topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
-    assertEquals(3, topicMetadata.partitionMetadata.size)
-    assertEquals(0, partitionMetadata(0).partition)
-    assertEquals(1, partitionMetadata(1).partition)
-    assertEquals(2, partitionMetadata(2).partition)
-    val replicas = partitionMetadata(1).replicaIds
-    assertEquals(2, replicas.size)
-    assertEquals(Set(0, 1), replicas.asScala.toSet)
-  }
-
-  @Test
-  def testReplicaPlacementAllServers(): Unit = {
-    admin.createPartitions(util.Map.of(topic3, 
NewPartitions.increaseTo(7))).all().get()
-
-    // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 2)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 3)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 4)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 5)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
-
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic3), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val topicMetadata = response.topicMetadata.asScala.head
-
-    assertEquals(7, topicMetadata.partitionMetadata.size)
-    for (partition <- topicMetadata.partitionMetadata.asScala) {
-      val replicas = partition.replicaIds.asScala.toSet
-      assertEquals(4, replicas.size, s"Partition ${partition.partition} should 
have 4 replicas")
-      assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only 
include brokers 0-3")
-      assertTrue(partition.leaderId.isPresent, s"Partition 
${partition.partition} should have a leader")
-      assertTrue(replicas.contains(partition.leaderId.get), "Leader should be 
one of the replicas")
-    }
-  }
-
-  @Test
-  def testReplicaPlacementPartialServers(): Unit = {
-    admin.createPartitions(util.Map.of(topic2, 
NewPartitions.increaseTo(3))).all().get()
-
-    // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
-
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic2), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val topicMetadata = response.topicMetadata.asScala.head
-
-    assertEquals(3, topicMetadata.partitionMetadata.size)
-    for (partition <- topicMetadata.partitionMetadata.asScala) {
-      val replicas = partition.replicaIds.asScala.toSet
-      assertEquals(2, replicas.size, s"Partition ${partition.partition} should 
have 2 replicas")
-      assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only 
include brokers 0-3")
-      assertTrue(partition.leaderId.isPresent, s"Partition 
${partition.partition} should have a leader")
-      assertTrue(replicas.contains(partition.leaderId.get), "Leader should be 
one of the replicas")
-    }
-  }
-
-}

Reply via email to