This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1697a7b8fe3 KAFKA-20316 Rewrite AddPartitionsTest with ClusterInstance (#21903)
1697a7b8fe3 is described below
commit 1697a7b8fe330db6452ff74739a4ecd93bb444e2
Author: Lan Ding <[email protected]>
AuthorDate: Tue Mar 31 16:25:38 2026 +0800
KAFKA-20316 Rewrite AddPartitionsTest with ClusterInstance (#21903)
Rewrite `AddPartitionsTest` with `ClusterInstance`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/AddPartitionsTest.java | 212 ++++++++++++++++++++
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 216 ---------------------
2 files changed, 212 insertions(+), 216 deletions(-)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
new file mode 100644
index 00000000000..afaec380867
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(types = {Type.KRAFT}, brokers = 4)
+public class AddPartitionsTest {
+
+    @ClusterTest
+    public void testWrongReplicaCount(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic1", Map.of(0, List.of(0, 1)));
+
+            ExecutionException exception = assertThrows(ExecutionException.class, () ->
+                admin.createPartitions(Map.of("topic1",
+                    NewPartitions.increaseTo(2, List.of(List.of(0, 1, 2))))).all().get());
+            assertInstanceOf(InvalidReplicaAssignmentException.class, exception.getCause());
+        }
+    }
+
+    @ClusterTest
+    public void testMissingPartitionsInCreateTopics(ClusterInstance cluster) {
+        try (Admin admin = cluster.admin()) {
+            Map<Integer, List<Integer>> topic6Placements = Map.of(
+                1, List.of(0, 1),
+                2, List.of(1, 0));
+            Map<Integer, List<Integer>> topic7Placements = Map.of(
+                2, List.of(0, 1),
+                3, List.of(1, 0));
+
+            CreateTopicsResult result = admin.createTopics(List.of(
+                new NewTopic("new-topic6", topic6Placements),
+                new NewTopic("new-topic7", topic7Placements)));
+
+            Throwable topic6Cause = assertThrows(ExecutionException.class,
+                () -> result.values().get("new-topic6").get()).getCause();
+            assertInstanceOf(InvalidReplicaAssignmentException.class, topic6Cause);
+            assertTrue(topic6Cause.getMessage().contains("partitions should be a consecutive 0-based integer sequence"),
+                "Unexpected error message: " + topic6Cause.getMessage());
+
+            Throwable topic7Cause = assertThrows(ExecutionException.class,
+                () -> result.values().get("new-topic7").get()).getCause();
+            assertInstanceOf(InvalidReplicaAssignmentException.class, topic7Cause);
+            assertTrue(topic7Cause.getMessage().contains("partitions should be a consecutive 0-based integer sequence"),
+                "Unexpected error message: " + topic7Cause.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testMissingPartitionsInCreatePartitions(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic1", Map.of(0, List.of(0, 1)));
+
+            Throwable cause = assertThrows(ExecutionException.class, () ->
+                admin.createPartitions(Map.of("topic1",
+                    NewPartitions.increaseTo(3, List.of(List.of(0, 1, 2))))).all().get()).getCause();
+            assertInstanceOf(InvalidReplicaAssignmentException.class, cause);
+            assertTrue(cause.getMessage().contains(
+                "Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."),
+                "Unexpected error message: " + cause.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testIncrementPartitions(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic1", Map.of(0, List.of(0, 1)));
+
+            admin.createPartitions(Map.of("topic1", NewPartitions.increaseTo(3))).all().get();
+
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, "topic1", 1, 30000);
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, "topic1", 2, 30000);
+
+            cluster.waitTopicCreation("topic1", 3);
+
+            Map<String, TopicDescription> descriptions = admin.describeTopics(List.of("topic1")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic1");
+            assertEquals(3, topicDescription.partitions().size());
+
+            for (TopicPartitionInfo partition : topicDescription.partitions()) {
+                assertEquals(2, partition.replicas().size());
+                assertNotNull(partition.leader());
+                assertTrue(partition.replicas().contains(partition.leader()));
+            }
+        }
+    }
+
+    @ClusterTest
+    public void testManualAssignmentOfReplicas(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic2", Map.of(0, List.of(1, 2)));
+
+            admin.createPartitions(Map.of("topic2", NewPartitions.increaseTo(3,
+                List.of(List.of(0, 1), List.of(2, 3))))).all().get();
+
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, "topic2", 1, 30000);
+            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, "topic2", 2, 30000);
+
+            cluster.waitTopicCreation("topic2", 3);
+
+            Map<String, TopicDescription> descriptions = admin.describeTopics(List.of("topic2")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic2");
+            assertEquals(3, topicDescription.partitions().size());
+
+            List<TopicPartitionInfo> partitions = topicDescription.partitions().stream()
+                .sorted(Comparator.comparingInt(TopicPartitionInfo::partition))
+                .toList();
+
+            assertEquals(0, partitions.get(0).partition());
+            assertEquals(1, partitions.get(1).partition());
+            assertEquals(2, partitions.get(2).partition());
+
+            Set<Integer> partition1Replicas = partitions.get(1).replicas().stream()
+                .map(Node::id)
+                .collect(Collectors.toSet());
+            assertEquals(2, partition1Replicas.size());
+            assertEquals(Set.of(0, 1), partition1Replicas);
+        }
+    }
+
+    @ClusterTest
+    public void testReplicaPlacementAllServers(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic3", Map.of(0, List.of(2, 3, 0, 1)));
+
+            admin.createPartitions(Map.of("topic3", NewPartitions.increaseTo(7))).all().get();
+
+            cluster.waitTopicCreation("topic3", 7);
+
+            Map<String, TopicDescription> descriptions = admin.describeTopics(List.of("topic3")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic3");
+            assertEquals(7, topicDescription.partitions().size());
+
+            for (TopicPartitionInfo partition : topicDescription.partitions()) {
+                Set<Integer> replicaIds = partition.replicas().stream()
+                    .map(Node::id)
+                    .collect(Collectors.toSet());
+                assertEquals(4, replicaIds.size(),
+                    "Partition " + partition.partition() + " should have 4 replicas");
+                assertTrue(replicaIds.stream().allMatch(id -> id >= 0 && id <= 3),
+                    "Replicas should only include brokers 0-3");
+                assertNotNull(partition.leader(),
+                    "Partition " + partition.partition() + " should have a leader");
+                assertTrue(replicaIds.contains(partition.leader().id()),
+                    "Leader should be one of the replicas");
+            }
+        }
+    }
+
+    @ClusterTest
+    public void testReplicaPlacementPartialServers(ClusterInstance cluster) throws Exception {
+        try (Admin admin = cluster.admin()) {
+            cluster.createTopicWithAssignment("topic2", Map.of(0, List.of(1, 2)));
+
+            admin.createPartitions(Map.of("topic2", NewPartitions.increaseTo(3))).all().get();
+
+            cluster.waitTopicCreation("topic2", 3);
+
+            Map<String, TopicDescription> descriptions = admin.describeTopics(List.of("topic2")).allTopicNames().get();
+            TopicDescription topicDescription = descriptions.get("topic2");
+            assertEquals(3, topicDescription.partitions().size());
+
+            for (TopicPartitionInfo partition : topicDescription.partitions()) {
+                Set<Integer> replicaIds = partition.replicas().stream()
+                    .map(Node::id)
+                    .collect(Collectors.toSet());
+                assertEquals(2, replicaIds.size(),
+                    "Partition " + partition.partition() + " should have 2 replicas");
+                assertTrue(replicaIds.stream().allMatch(id -> id >= 0 && id <= 3),
+                    "Replicas should only include brokers 0-3");
+                assertNotNull(partition.leader(),
+                    "Partition " + partition.partition() + " should have a leader");
+                assertTrue(replicaIds.contains(partition.leader().id()),
+                    "Leader should be one of the replicas");
+            }
+        }
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
deleted file mode 100755
index bba5278d7a6..00000000000
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import kafka.server.{BaseRequestTest, BrokerServer}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
-import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
-import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
-import java.util
-import java.util.concurrent.ExecutionException
-import scala.jdk.CollectionConverters._
-
-class AddPartitionsTest extends BaseRequestTest {
-
- override def brokerCount: Int = 4
-
- val partitionId = 0
-
- val topic1 = "new-topic1"
- val topic1Assignment = Map(0 -> Seq(0,1))
- val topic2 = "new-topic2"
- val topic2Assignment = Map(0 -> Seq(1,2))
- val topic3 = "new-topic3"
- val topic3Assignment = Map(0 -> Seq(2,3,0,1))
- val topic4 = "new-topic4"
- val topic4Assignment = Map(0 -> Seq(0,3))
- val topic5 = "new-topic5"
- val topic5Assignment = Map(1 -> Seq(0,1))
- var admin: Admin = _
-
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
-    createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment)
-    createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment)
-    createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment)
-    createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment)
-    admin = createAdminClient()
-  }
-
-  @Test
-  def testWrongReplicaCount(): Unit = {
-    assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => {
-      admin.createPartitions(util.Map.of(topic1,
-        NewPartitions.increaseTo(2, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get()
-    }).getCause.getClass)
-  }
-
-  /**
-   * Test that when we supply a manual partition assignment to createTopics, it must be 0-based
-   * and consecutive.
-   */
-  @Test
-  def testMissingPartitionsInCreateTopics(): Unit = {
-    val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
-    topic6Placements.put(1, util.List.of(0, 1))
-    topic6Placements.put(2, util.List.of(1, 0))
-    val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
-    topic7Placements.put(2, util.List.of(0, 1))
-    topic7Placements.put(3, util.List.of(1, 0))
-    val futures = admin.createTopics(util.List.of(
-      new NewTopic("new-topic6", topic6Placements),
-      new NewTopic("new-topic7", topic7Placements))).values()
-    val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause
-    assertEquals(classOf[InvalidReplicaAssignmentException], topic6Cause.getClass)
-    assertTrue(topic6Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
-      "Unexpected error message: " + topic6Cause.getMessage)
-    val topic7Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic7").get()).getCause
-    assertEquals(classOf[InvalidReplicaAssignmentException], topic7Cause.getClass)
-    assertTrue(topic7Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
-      "Unexpected error message: " + topic7Cause.getMessage)
-  }
-
-  /**
-   * Test that when we supply a manual partition assignment to createPartitions, it must contain
-   * enough partitions.
-   */
-  @Test
-  def testMissingPartitionsInCreatePartitions(): Unit = {
-    val cause = assertThrows(classOf[ExecutionException], () =>
-      admin.createPartitions(util.Map.of(topic1,
-        NewPartitions.increaseTo(3, util.List.of(util.List.of[Integer](0, 1, 2))))).all().get()).getCause
-    assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
-    assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
-      "were specified."), "Unexpected error message: " + cause.getMessage)
-  }
-
-  @Test
-  def testIncrementPartitions(): Unit = {
-    admin.createPartitions(util.Map.of(topic1, NewPartitions.increaseTo(3))).all().get()
-
-    // wait until leader is elected
-    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
-    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 2)
-
-    // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
-    TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic1), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val partitions = response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition)
-    assertEquals(partitions.size, 3)
-    assertEquals(1, partitions(1).partition)
-    assertEquals(2, partitions(2).partition)
-
-    for (partition <- partitions) {
-      val replicas = partition.replicaIds
-      assertEquals(2, replicas.size)
-      assertTrue(partition.leaderId.isPresent)
-      val leaderId = partition.leaderId.get
-      assertTrue(replicas.contains(leaderId))
-    }
-  }
-
-  @Test
-  def testManualAssignmentOfReplicas(): Unit = {
-    // Add 2 partitions
-    admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3,
-      util.List.of(util.List.of[Integer](0, 1), util.List.of[Integer](2, 3))))).all().get()
-    // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
-
-    // read metadata from a broker and verify the new topic partitions exist
-    val partition1Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
-    assertEquals(leader1, partition1Metadata.leader())
-    val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
-    assertEquals(leader2, partition2Metadata.leader())
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic2), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val topicMetadata = response.topicMetadata.asScala.head
-    val partitionMetadata = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
-    assertEquals(3, topicMetadata.partitionMetadata.size)
-    assertEquals(0, partitionMetadata(0).partition)
-    assertEquals(1, partitionMetadata(1).partition)
-    assertEquals(2, partitionMetadata(2).partition)
-    val replicas = partitionMetadata(1).replicaIds
-    assertEquals(2, replicas.size)
-    assertEquals(Set(0, 1), replicas.asScala.toSet)
-  }
-
-  @Test
-  def testReplicaPlacementAllServers(): Unit = {
-    admin.createPartitions(util.Map.of(topic3, NewPartitions.increaseTo(7))).all().get()
-
-    // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 2)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 3)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 4)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 5)
-    TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
-
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic3), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val topicMetadata = response.topicMetadata.asScala.head
-
-    assertEquals(7, topicMetadata.partitionMetadata.size)
-    for (partition <- topicMetadata.partitionMetadata.asScala) {
-      val replicas = partition.replicaIds.asScala.toSet
-      assertEquals(4, replicas.size, s"Partition ${partition.partition} should have 4 replicas")
-      assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3")
-      assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader")
-      assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas")
-    }
-  }
-
-  @Test
-  def testReplicaPlacementPartialServers(): Unit = {
-    admin.createPartitions(util.Map.of(topic2, NewPartitions.increaseTo(3))).all().get()
-
-    // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
-
-    val response = connectAndReceive[MetadataResponse](
-      new MetadataRequest.Builder(util.List.of(topic2), false).build)
-    assertEquals(1, response.topicMetadata.size)
-    val topicMetadata = response.topicMetadata.asScala.head
-
-    assertEquals(3, topicMetadata.partitionMetadata.size)
-    for (partition <- topicMetadata.partitionMetadata.asScala) {
-      val replicas = partition.replicaIds.asScala.toSet
-      assertEquals(2, replicas.size, s"Partition ${partition.partition} should have 2 replicas")
-      assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3")
-      assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader")
-      assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas")
-    }
-  }
-
-}