This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28402a3e458 MINOR: Move CreateTopicsRequestTest to
clients-integration-tests and rewrite in Java (#22105)
28402a3e458 is described below
commit 28402a3e4589487f12685b5ebaf37a742acfd20c
Author: majialong <[email protected]>
AuthorDate: Thu Apr 23 00:29:29 2026 +0800
MINOR: Move CreateTopicsRequestTest to clients-integration-tests and
rewrite in Java (#22105)
Rewrite `CreateTopicsRequestTest` in Java and move it from core to
`clients-integration-tests`.
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/clients/CreateTopicsRequestTest.java | 375 +++++++++++++++++++++
.../server/AbstractCreateTopicsRequestTest.scala | 195 -----------
.../kafka/server/CreateTopicsRequestTest.scala | 159 ---------
3 files changed, 375 insertions(+), 354 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestTest.java
new file mode 100644
index 00000000000..25005e27d76
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/CreateTopicsRequestTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
+import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.IntegrationTestUtils;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ types = {Type.KRAFT},
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key =
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+ }
+)
+public class CreateTopicsRequestTest {
+
+ @ClusterTest
+ public void testValidCreateTopicsRequests(ClusterInstance cluster) throws
Exception {
+ // Generated assignments
+ validateValidCreateTopicsRequests(cluster,
topicsReq(topicReq("topic1")));
+ validateValidCreateTopicsRequests(cluster,
topicsReq(topicReq("topic2", null, 3)));
+ validateValidCreateTopicsRequests(cluster, topicsReq(
+ topicReq("topic3", 5, 2, Map.of("min.insync.replicas", "2"),
null)));
+
+ // Manual assignments
+ validateValidCreateTopicsRequests(cluster, topicsReq(
+ topicReq("topic4", null, null, null, Map.of(0, List.of(0)))));
+ validateValidCreateTopicsRequests(cluster, topicsReq(
+ topicReq("topic5", null, null, Map.of("min.insync.replicas", "2"),
+ Map.of(0, List.of(0, 1), 1, List.of(1, 0), 2, List.of(1,
2)))));
+
+ // Mixed
+ validateValidCreateTopicsRequests(cluster, topicsReq(
+ topicReq("topic6"),
+ topicReq("topic7", 5, 2),
+ topicReq("topic8", null, null, null,
+ Map.of(0, List.of(0, 1), 1, List.of(1, 0), 2, List.of(1,
2)))));
+ validateValidCreateTopicsRequests(cluster, topicsReq(true,
+ topicReq("topic9"),
+ topicReq("topic10", 5, 2),
+ topicReq("topic11", null, null, null,
+ Map.of(0, List.of(0, 1), 1, List.of(1, 0), 2, List.of(1,
2)))));
+
+ // Defaults
+ validateValidCreateTopicsRequests(cluster,
topicsReq(topicReq("topic12", -1, -1)));
+ validateValidCreateTopicsRequests(cluster,
topicsReq(topicReq("topic13", -1, 2)));
+ validateValidCreateTopicsRequests(cluster,
topicsReq(topicReq("topic14", 2, -1)));
+ }
+
+ /**
+  * Checks the per-topic error codes returned for rejected CreateTopics requests,
+  * first one failing topic per request and then a single request that mixes four
+  * failing topics with one that is expected to succeed.
+  */
+ @ClusterTest
+ public void testErrorCreateTopicsRequests(ClusterInstance cluster) throws Exception {
+     String existingTopic = "existing-topic";
+     cluster.createTopic(existingTopic, 1, (short) 1);
+
+     // One replica more than the number of brokers in the cluster
+     int excessiveReplication = cluster.brokers().size() + 1;
+     // Partition 0 lists two replicas while partition 1 lists only one
+     Map<Integer, List<Integer>> unevenAssignment = Map.of(0, List.of(0, 1), 1, List.of(0));
+
+     // Basic
+     ApiError alreadyExists =
+         new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic 'existing-topic' already exists.");
+     validateErrorCreateTopicsRequests(cluster,
+         topicsReq(topicReq(existingTopic)),
+         Map.of(existingTopic, alreadyExists),
+         true);
+     validateErrorCreateTopicsRequests(cluster,
+         topicsReq(topicReq("error-partitions", -2, null)),
+         Map.of("error-partitions", new ApiError(Errors.INVALID_PARTITIONS)),
+         false);
+     validateErrorCreateTopicsRequests(cluster,
+         topicsReq(topicReq("error-replication", null, excessiveReplication)),
+         Map.of("error-replication", new ApiError(Errors.INVALID_REPLICATION_FACTOR)),
+         false);
+     validateErrorCreateTopicsRequests(cluster,
+         topicsReq(topicReq("error-config", null, null, Map.of("not.a.property", "error"), null)),
+         Map.of("error-config", new ApiError(Errors.INVALID_CONFIG)),
+         false);
+     validateErrorCreateTopicsRequests(cluster,
+         topicsReq(topicReq("error-assignment", null, null, null, unevenAssignment)),
+         Map.of("error-assignment", new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT)),
+         false);
+
+     // Partial
+     CreateTopicsRequest partialRequest = topicsReq(
+         topicReq(existingTopic),
+         topicReq("partial-partitions", -2, null),
+         topicReq("partial-replication", null, excessiveReplication),
+         topicReq("partial-assignment", null, null, null, unevenAssignment),
+         topicReq("partial-none"));
+     Map<String, ApiError> partialExpectations = Map.of(
+         existingTopic, new ApiError(Errors.TOPIC_ALREADY_EXISTS),
+         "partial-partitions", new ApiError(Errors.INVALID_PARTITIONS),
+         "partial-replication", new ApiError(Errors.INVALID_REPLICATION_FACTOR),
+         "partial-assignment", new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT),
+         "partial-none", new ApiError(Errors.NONE));
+     validateErrorCreateTopicsRequests(cluster, partialRequest, partialExpectations, false);
+     validateTopicExists(cluster, "partial-none", 1);
+ }
+
+ /**
+  * Expects INVALID_REQUEST when a topic specifies partitions/replication factor together
+  * with a manual replica assignment, both for a normal and for a validate-only request.
+  */
+ @ClusterTest
+ public void testInvalidCreateTopicsRequests(ClusterInstance cluster) throws Exception {
+     // Partitions/ReplicationFactor and ReplicaAssignment should not both be specified
+     for (boolean validateOnly : new boolean[] {false, true}) {
+         // A fresh topic object per request; each one is placed into its own collection
+         CreatableTopic conflictingArgs =
+             topicReq("bad-args-topic", 10, 3, null, Map.of(0, List.of(0)));
+         validateErrorCreateTopicsRequests(cluster,
+             topicsReq(validateOnly, conflictingArgs),
+             Map.of("bad-args-topic", new ApiError(Errors.INVALID_REQUEST)),
+             false);
+     }
+ }
+
+ @ClusterTest
+ public void testCreateTopicsRequestVersions(ClusterInstance cluster)
throws Exception {
+ for (short version = ApiKeys.CREATE_TOPICS.oldestVersion(); version <=
ApiKeys.CREATE_TOPICS.latestVersion(); version++) {
+ String topic = "topic_" + version;
+ CreateTopicsRequestData data = new CreateTopicsRequestData()
+ .setTimeoutMs(10000)
+ .setValidateOnly(false)
+ .setTopics(new CreatableTopicCollection(List.of(
+ topicReq(topic, 1, 1, Map.of("min.insync.replicas", "2"),
null)
+ ).iterator()));
+
+ CreateTopicsRequest request = new
CreateTopicsRequest.Builder(data).build(version);
+ CreateTopicsResponse response = sendCreateTopicRequest(cluster,
request);
+
+ CreatableTopicResult topicResponse =
response.data().topics().find(topic);
+ assertNotNull(topicResponse);
+ assertEquals(topic, topicResponse.name());
+ assertEquals(Errors.NONE.code(), topicResponse.errorCode());
+
+ if (version >= 5) {
+ assertEquals(1, topicResponse.numPartitions());
+ assertEquals(1, topicResponse.replicationFactor());
+ var config = topicResponse.configs().stream()
+ .filter(c ->
"min.insync.replicas".equals(c.name())).findFirst();
+ assertTrue(config.isPresent());
+ assertEquals("2", config.get().value());
+ } else {
+ assertEquals(-1, topicResponse.numPartitions());
+ assertEquals(-1, topicResponse.replicationFactor());
+ assertTrue(topicResponse.configs().isEmpty());
+ }
+
+ if (version >= 7) {
+ assertNotEquals(Uuid.ZERO_UUID, topicResponse.topicId());
+ } else {
+ assertEquals(Uuid.ZERO_UUID, topicResponse.topicId());
+ }
+ }
+ }
+
+ @ClusterTest
+ public void testCreateClusterMetadataTopic(ClusterInstance cluster) throws
Exception {
+ validateErrorCreateTopicsRequests(cluster,
+ topicsReq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME)),
+ Map.of(Topic.CLUSTER_METADATA_TOPIC_NAME,
+ new ApiError(Errors.INVALID_REQUEST,
+ "Creation of internal topic " +
Topic.CLUSTER_METADATA_TOPIC_NAME + " is prohibited.")),
+ true);
+ }
+
+ /** Builds a CreateTopics request for the given topics with validateOnly set to false. */
+ private static CreateTopicsRequest topicsReq(CreatableTopic... topics) {
+     boolean validateOnly = false;
+     return topicsReq(validateOnly, topics);
+ }
+
+ /**
+  * Builds a CreateTopics request with a 10000 ms timeout for the given topics.
+  *
+  * @param validateOnly value of the request's validateOnly flag
+  * @param topics topics to put into the request, in the given order
+  * @return the request as produced by the builder's no-argument {@code build()}
+  */
+ private static CreateTopicsRequest topicsReq(boolean validateOnly, CreatableTopic... topics) {
+     CreatableTopicCollection requestedTopics =
+         new CreatableTopicCollection(List.of(topics).iterator());
+     CreateTopicsRequestData data = new CreateTopicsRequestData()
+         .setValidateOnly(validateOnly)
+         .setTopics(requestedTopics)
+         .setTimeoutMs(10000);
+     return new CreateTopicsRequest.Builder(data).build();
+ }
+
+ /** Builds a topic entry with only a name; all other arguments are passed as null. */
+ private static CreatableTopic topicReq(String name) {
+     return topicReq(name, null, null, null, null);
+ }
+
+ /** Builds a topic entry with a name and optional partition count and replication factor. */
+ private static CreatableTopic topicReq(String name, Integer numPartitions, Integer replicationFactor) {
+     return topicReq(name, numPartitions, replicationFactor, null, null);
+ }
+
+ /**
+  * Builds a topic entry for a CreateTopics request.
+  *
+  * @param name topic name
+  * @param numPartitions partition count; when null, -1 is used if an assignment is given, otherwise 1
+  * @param replicationFactor replication factor; when null, -1 is used if an assignment is given, otherwise 1
+  * @param config topic configs to attach, or null to leave the configs untouched
+  * @param assignment partition index to broker ids, or null to leave the assignments untouched
+  * @return the populated topic entry
+  */
+ private static CreatableTopic topicReq(
+     String name,
+     Integer numPartitions,
+     Integer replicationFactor,
+     Map<String, String> config,
+     Map<Integer, List<Integer>> assignment
+ ) {
+     // Value used for a count that was not supplied by the caller
+     int fallbackCount = assignment != null ? -1 : 1;
+
+     CreatableTopic topic = new CreatableTopic();
+     topic.setName(name);
+     topic.setNumPartitions(numPartitions != null ? numPartitions : fallbackCount);
+     topic.setReplicationFactor(
+         replicationFactor != null ? replicationFactor.shortValue() : (short) fallbackCount);
+
+     if (config != null) {
+         CreatableTopicConfigCollection topicConfigs = new CreatableTopicConfigCollection();
+         for (Map.Entry<String, String> configEntry : config.entrySet()) {
+             topicConfigs.add(new CreatableTopicConfig()
+                 .setName(configEntry.getKey())
+                 .setValue(configEntry.getValue()));
+         }
+         topic.setConfigs(topicConfigs);
+     }
+
+     if (assignment != null) {
+         CreatableReplicaAssignmentCollection replicaAssignments =
+             new CreatableReplicaAssignmentCollection();
+         for (Map.Entry<Integer, List<Integer>> partitionEntry : assignment.entrySet()) {
+             // Copy the broker ids so the request does not hold the caller's list
+             replicaAssignments.add(new CreatableReplicaAssignment()
+                 .setPartitionIndex(partitionEntry.getKey())
+                 .setBrokerIds(new ArrayList<>(partitionEntry.getValue())));
+         }
+         topic.setAssignments(replicaAssignments);
+     }
+     return topic;
+ }
+
+ /**
+  * Sends the request, asserts that the response reports no error with a positive code,
+  * and verifies the metadata of every topic in the request. For requests that are not
+  * validate-only, it first waits for each topic to be created with the expected
+  * number of partitions.
+  */
+ private static void validateValidCreateTopicsRequests(ClusterInstance cluster, CreateTopicsRequest request) throws Exception {
+     CreateTopicsResponse response = sendCreateTopicRequest(cluster, request);
+
+     var reportedErrors = response.errorCounts().keySet();
+     assertFalse(reportedErrors.stream().anyMatch(e -> e.code() > 0),
+         "There should be no errors, found " + reportedErrors);
+
+     boolean validateOnly = request.data().validateOnly();
+     for (CreatableTopic topic : request.data().topics()) {
+         if (!validateOnly) {
+             int partitions;
+             if (!topic.assignments().isEmpty()) {
+                 partitions = topic.assignments().size();
+             } else if (topic.numPartitions() == -1) {
+                 partitions = defaultNumPartitions(cluster);
+             } else {
+                 partitions = topic.numPartitions();
+             }
+             cluster.waitTopicCreation(topic.name(), partitions);
+         }
+         verifyMetadata(cluster, topic, validateOnly);
+     }
+ }
+
+ private static void verifyMetadata(ClusterInstance cluster, CreatableTopic
topic,
+ boolean validateOnly) throws Exception {
+ MetadataResponse metadataResponse = sendMetadataRequest(cluster,
+ new MetadataRequest.Builder(List.of(topic.name()), false).build());
+ MetadataResponse.TopicMetadata metadataForTopic =
metadataResponse.topicMetadata().stream()
+ .filter(t ->
topic.name().equals(t.topic())).findFirst().orElse(null);
+
+ int partitions = !topic.assignments().isEmpty() ?
topic.assignments().size() : topic.numPartitions();
+ int replication = !topic.assignments().isEmpty() ?
topic.assignments().iterator().next().brokerIds().size() :
topic.replicationFactor();
+
+ if (validateOnly) {
+ assertNotNull(metadataForTopic);
+ assertNotEquals(Errors.NONE, metadataForTopic.error(), "Topic " +
topic + " should not be created");
+ assertTrue(metadataForTopic.partitionMetadata().isEmpty(), "The
topic should have no partitions");
+ } else {
+ assertNotNull(metadataForTopic, "The topic should be created");
+ assertEquals(Errors.NONE, metadataForTopic.error());
+ if (partitions == -1) {
+ assertEquals(defaultNumPartitions(cluster),
metadataForTopic.partitionMetadata().size(),
+ "The topic should have the default number of partitions");
+ } else {
+ assertEquals(partitions,
metadataForTopic.partitionMetadata().size(),
+ "The topic should have the correct number of partitions");
+ }
+
+ if (replication == -1) {
+ assertEquals(defaultReplicationFactor(cluster),
metadataForTopic.partitionMetadata().get(0).replicaIds.size(),
+ "The topic should have the default replication factor");
+ } else {
+ assertEquals(replication,
metadataForTopic.partitionMetadata().get(0).replicaIds.size(),
+ "The topic should have the correct replication factor");
+ }
+ }
+ }
+
+ private static void validateErrorCreateTopicsRequests(
+ ClusterInstance cluster,
+ CreateTopicsRequest request,
+ Map<String, ApiError> expectedResponse,
+ boolean checkErrorMessage
+ ) throws Exception {
+ CreateTopicsResponse response = sendCreateTopicRequest(cluster,
request);
+ assertEquals(expectedResponse.size(), response.data().topics().size(),
"The response size should match");
+
+ for (var entry : expectedResponse.entrySet()) {
+ String topicName = entry.getKey();
+ ApiError expectedError = entry.getValue();
+
+ CreatableTopicResult actual =
response.data().topics().find(topicName);
+ if (actual == null) {
+ throw new RuntimeException("No response data found for topic "
+ topicName);
+ }
+ assertEquals(expectedError.error().code(), actual.errorCode(),
"The response error code should match");
+ if (checkErrorMessage) {
+ assertEquals(expectedError.message(), actual.errorMessage(),
"The response error message should match");
+ }
+ // If no error, validate topic exists
+ if (expectedError.isSuccess() && !request.data().validateOnly()) {
+ CreatableTopic topic = request.data().topics().find(topicName);
+ int partitions = !topic.assignments().isEmpty()
+ ? topic.assignments().size()
+ : (topic.numPartitions() == -1 ?
defaultNumPartitions(cluster) : topic.numPartitions());
+ validateTopicExists(cluster, topicName, partitions);
+ }
+ }
+ }
+
+ /**
+  * Waits for the topic to be created with the given number of partitions, then asserts
+  * that a metadata request returns an entry for it with no error.
+  */
+ private static void validateTopicExists(ClusterInstance cluster, String topic, int partitions) throws Exception {
+     cluster.waitTopicCreation(topic, partitions);
+
+     MetadataRequest lookup = new MetadataRequest.Builder(List.of(topic), true).build();
+     boolean created = false;
+     for (MetadataResponse.TopicMetadata metadata : sendMetadataRequest(cluster, lookup).topicMetadata()) {
+         if (topic.equals(metadata.topic()) && metadata.error() == Errors.NONE) {
+             created = true;
+             break;
+         }
+     }
+     assertTrue(created, "The topic should be created");
+ }
+
+ /** Sends the CreateTopics request to the first port in {@code cluster.brokerBoundPorts()}. */
+ private static CreateTopicsResponse sendCreateTopicRequest(ClusterInstance cluster, CreateTopicsRequest request) throws Exception {
+     int firstBrokerPort = cluster.brokerBoundPorts().get(0);
+     return IntegrationTestUtils.connectAndReceive(request, firstBrokerPort);
+ }
+
+ /** Sends the Metadata request to the first port in {@code cluster.brokerBoundPorts()}. */
+ private static MetadataResponse sendMetadataRequest(ClusterInstance cluster, MetadataRequest request) throws Exception {
+     int firstBrokerPort = cluster.brokerBoundPorts().get(0);
+     return IntegrationTestUtils.connectAndReceive(request, firstBrokerPort);
+ }
+
+ /**
+  * Returns the partition count configured in the cluster's server properties,
+  * or 1 when the property is not set there.
+  */
+ private static int defaultNumPartitions(ClusterInstance cluster) {
+     String configured = cluster.config().serverProperties()
+         .getOrDefault(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1");
+     return Integer.parseInt(configured);
+ }
+
+ /**
+  * Returns the default replication factor configured in the cluster's server
+  * properties, or 1 when the property is not set there.
+  */
+ private static int defaultReplicationFactor(ClusterInstance cluster) {
+     String configured = cluster.config().serverProperties()
+         .getOrDefault(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1");
+     return Integer.parseInt(configured);
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
deleted file mode 100644
index 374da81a8a1..00000000000
---
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util
-import java.util.Properties
-
-import kafka.network.SocketServer
-import kafka.utils.TestUtils
-import org.apache.kafka.common.message.CreateTopicsRequestData
-import org.apache.kafka.common.message.CreateTopicsRequestData._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests._
-import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue}
-
-import scala.jdk.CollectionConverters._
-
-abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
-
- override def brokerPropertyOverrides(properties: Properties): Unit =
- properties.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false.toString)
-
- def topicsReq(topics: Seq[CreatableTopic],
- timeout: Integer = 10000,
- validateOnly: Boolean = false) = {
- val req = new CreateTopicsRequestData()
- req.setTimeoutMs(timeout)
- req.setTopics(new CreatableTopicCollection(topics.asJava))
- req.setValidateOnly(validateOnly)
- new CreateTopicsRequest.Builder(req).build()
- }
-
- def topicReq(name: String,
- numPartitions: Integer = null,
- replicationFactor: Integer = null,
- config: Map[String, String] = null,
- assignment: Map[Int, Seq[Int]] = null): CreatableTopic = {
- val topic = new CreatableTopic()
- topic.setName(name)
- if (numPartitions != null) {
- topic.setNumPartitions(numPartitions)
- } else if (assignment != null) {
- topic.setNumPartitions(-1)
- } else {
- topic.setNumPartitions(1)
- }
- if (replicationFactor != null) {
- topic.setReplicationFactor(replicationFactor.toShort)
- } else if (assignment != null) {
- topic.setReplicationFactor((-1).toShort)
- } else {
- topic.setReplicationFactor(1.toShort)
- }
- if (config != null) {
- val effectiveConfigs = new CreatableTopicConfigCollection()
- config.foreach {
- case (name, value) =>
- effectiveConfigs.add(new
CreatableTopicConfig().setName(name).setValue(value))
- }
- topic.setConfigs(effectiveConfigs)
- }
- if (assignment != null) {
- val effectiveAssignments = new CreatableReplicaAssignmentCollection()
- assignment.foreach {
- case (partitionIndex, brokerIdList) => {
- val effectiveAssignment = new CreatableReplicaAssignment()
- effectiveAssignment.setPartitionIndex(partitionIndex)
- val brokerIds = new util.ArrayList[java.lang.Integer]()
- brokerIdList.foreach(brokerId => brokerIds.add(brokerId))
- effectiveAssignment.setBrokerIds(brokerIds)
- effectiveAssignments.add(effectiveAssignment)
- }
- }
- topic.setAssignments(effectiveAssignments)
- }
- topic
- }
-
- protected def validateValidCreateTopicsRequests(request:
CreateTopicsRequest): Unit = {
- val response = sendCreateTopicRequest(request, adminSocketServer)
-
- assertFalse(response.errorCounts().keySet().asScala.exists(_.code() > 0),
- s"There should be no errors, found
${response.errorCounts().keySet().asScala.mkString(", ")},")
-
- request.data.topics.forEach { topic =>
- def verifyMetadata(socketServer: SocketServer): Unit = {
- val metadata = sendMetadataRequest(
- new MetadataRequest.Builder(List(topic.name()).asJava,
false).build(), socketServer).topicMetadata.asScala
- val metadataForTopic = metadata.filter(_.topic == topic.name()).head
-
- val partitions = if (!topic.assignments().isEmpty)
- topic.assignments().size
- else
- topic.numPartitions
-
- val replication = if (!topic.assignments().isEmpty)
- topic.assignments().iterator().next().brokerIds().size()
- else
- topic.replicationFactor
-
- if (request.data.validateOnly) {
- assertNotNull(metadataForTopic)
- assertNotEquals(Errors.NONE, metadataForTopic.error, s"Topic $topic
should not be created")
- assertTrue(metadataForTopic.partitionMetadata.isEmpty, "The topic
should have no partitions")
- }
- else {
- assertNotNull(metadataForTopic, "The topic should be created")
- assertEquals(Errors.NONE, metadataForTopic.error)
- if (partitions == -1) {
- assertEquals(configs.head.numPartitions,
metadataForTopic.partitionMetadata.size, "The topic should have the default
number of partitions")
- } else {
- assertEquals(partitions, metadataForTopic.partitionMetadata.size,
"The topic should have the correct number of partitions")
- }
-
- if (replication == -1) {
- assertEquals(configs.head.defaultReplicationFactor,
- metadataForTopic.partitionMetadata.asScala.head.replicaIds.size,
"The topic should have the default replication factor")
- } else {
- assertEquals(replication,
metadataForTopic.partitionMetadata.asScala.head.replicaIds.size, "The topic
should have the correct replication factor")
- }
- }
- }
-
- if (!request.data.validateOnly) {
- // Wait until metadata is propagated and validate non-controller
broker has the correct metadata
- TestUtils.waitForPartitionMetadata(brokers, topic.name(), 0)
- }
- verifyMetadata(notControllerSocketServer)
- }
- }
-
- protected def error(error: Errors, errorMessage: Option[String] = None):
ApiError =
- new ApiError(error, errorMessage.orNull)
-
- protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
- expectedResponse:
Map[String, ApiError],
- checkErrorMessage: Boolean =
true): Unit = {
- val response = sendCreateTopicRequest(request, adminSocketServer)
- assertEquals(expectedResponse.size, response.data().topics().size, "The
response size should match")
-
- expectedResponse.foreach { case (topicName, expectedError) =>
- val expected = expectedResponse(topicName)
- val actual = response.data().topics().find(topicName)
- if (actual == null) {
- throw new RuntimeException(s"No response data found for topic
$topicName")
- }
- assertEquals(expected.error.code(), actual.errorCode(), "The response
error code should match")
- if (checkErrorMessage) {
- assertEquals(expected.message, actual.errorMessage(), "The response
error message should match")
- }
- // If no error validate topic exists
- if (expectedError.isSuccess && !request.data.validateOnly) {
- validateTopicExists(topicName)
- }
- }
- }
-
- protected def validateTopicExists(topic: String): Unit = {
- TestUtils.waitForPartitionMetadata(brokers, topic, 0)
- val metadata = sendMetadataRequest(
- new MetadataRequest.Builder(List(topic).asJava,
true).build()).topicMetadata.asScala
- assertTrue(metadata.exists(p => p.topic.equals(topic) && p.error ==
Errors.NONE), "The topic should be created")
- }
-
- protected def sendCreateTopicRequest(
- request: CreateTopicsRequest,
- socketServer: SocketServer = controllerSocketServer
- ): CreateTopicsResponse = {
- connectAndReceive[CreateTopicsResponse](request, socketServer)
- }
-
- protected def sendMetadataRequest(
- request: MetadataRequest,
- socketServer: SocketServer = anySocketServer
- ): MetadataResponse = {
- connectAndReceive[MetadataResponse](request, socketServer)
- }
-
-}
diff --git
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
deleted file mode 100644
index a6ec21d782f..00000000000
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.CreateTopicsRequestData
-import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.CreateTopicsRequest
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
-
- @Test
- def testValidCreateTopicsRequests(): Unit = {
- // Generated assignments
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1"))))
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2",
replicationFactor = 3))))
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic3",
- numPartitions = 5, replicationFactor = 2, config =
Map("min.insync.replicas" -> "2")))))
- // Manual assignments
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic4",
assignment = Map(0 -> List(0))))))
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic5",
- assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)),
- config = Map("min.insync.replicas" -> "2")))))
- // Mixed
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic6"),
- topicReq("topic7", numPartitions = 5, replicationFactor = 2),
- topicReq("topic8", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2
-> List(1, 2))))))
- validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic9"),
- topicReq("topic10", numPartitions = 5, replicationFactor = 2),
- topicReq("topic11", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2
-> List(1, 2)))),
- validateOnly = true))
- // Defaults
- validateValidCreateTopicsRequests(topicsReq(Seq(
- topicReq("topic12", replicationFactor = -1, numPartitions = -1))))
- validateValidCreateTopicsRequests(topicsReq(Seq(
- topicReq("topic13", replicationFactor = 2, numPartitions = -1))))
- validateValidCreateTopicsRequests(topicsReq(Seq(
- topicReq("topic14", replicationFactor = -1, numPartitions = 2))))
- }
-
- @Test
- def testErrorCreateTopicsRequests(): Unit = {
- val existingTopic = "existing-topic"
- createTopic(existingTopic)
- // Basic
- validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq(existingTopic))),
- Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic
'existing-topic' already exists."))))
-
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
numPartitions = -2))),
- Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)),
checkErrorMessage = false)
-
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
- replicationFactor = brokerCount + 1))),
- Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)),
checkErrorMessage = false)
- validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config",
- config=Map("not.a.property" -> "error")))),
- Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage =
false)
-
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-assignment",
- assignment=Map(0 -> List(0, 1), 1 -> List(0))))),
- Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)),
checkErrorMessage = false)
-
- // Partial
- validateErrorCreateTopicsRequests(topicsReq(Seq(
- topicReq(existingTopic),
- topicReq("partial-partitions", numPartitions = -2),
- topicReq("partial-replication", replicationFactor=brokerCount + 1),
- topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 ->
List(0))),
- topicReq("partial-none"))),
- Map(
- existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS),
- "partial-partitions" -> error(Errors.INVALID_PARTITIONS),
- "partial-replication" -> error(Errors.INVALID_REPLICATION_FACTOR),
- "partial-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT),
- "partial-none" -> error(Errors.NONE)
- ), checkErrorMessage = false
- )
- validateTopicExists("partial-none")
- }
-
- @Test
- def testInvalidCreateTopicsRequests(): Unit = {
- // Partitions/ReplicationFactor and ReplicaAssignment
- validateErrorCreateTopicsRequests(topicsReq(Seq(
- topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
- assignment = Map(0 -> List(0))))),
- Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
checkErrorMessage = false)
-
- validateErrorCreateTopicsRequests(topicsReq(Seq(
- topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
- assignment = Map(0 -> List(0)))), validateOnly = true),
- Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
checkErrorMessage = false)
- }
-
- @Test
- def testCreateTopicsRequestVersions(): Unit = {
- for (version <- ApiKeys.CREATE_TOPICS.oldestVersion to
ApiKeys.CREATE_TOPICS.latestVersion) {
- val topic = s"topic_$version"
- val data = new CreateTopicsRequestData()
- data.setTimeoutMs(10000)
- data.setValidateOnly(false)
- data.setTopics(new CreatableTopicCollection(List(
- topicReq(topic, numPartitions = 1, replicationFactor = 1,
- config = Map("min.insync.replicas" -> "2"))
- ).asJava))
-
- val request = new
CreateTopicsRequest.Builder(data).build(version.asInstanceOf[Short])
- val response = sendCreateTopicRequest(request, adminSocketServer)
-
- val topicResponse = response.data.topics.find(topic)
- assertNotNull(topicResponse)
- assertEquals(topic, topicResponse.name)
- assertEquals(Errors.NONE.code, topicResponse.errorCode)
- if (version >= 5) {
- assertEquals(1, topicResponse.numPartitions)
- assertEquals(1, topicResponse.replicationFactor)
- val config = topicResponse.configs().asScala.find(_.name ==
"min.insync.replicas")
- assertTrue(config.isDefined)
- assertEquals("2", config.get.value)
- } else {
- assertEquals(-1, topicResponse.numPartitions)
- assertEquals(-1, topicResponse.replicationFactor)
- assertTrue(topicResponse.configs.isEmpty)
- }
-
- if (version >= 7)
- assertNotEquals(Uuid.ZERO_UUID, topicResponse.topicId())
- else
- assertEquals(Uuid.ZERO_UUID, topicResponse.topicId())
- }
- }
-
- @Test
- def testCreateClusterMetadataTopic(): Unit = {
- validateErrorCreateTopicsRequests(
- topicsReq(Seq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME))),
- Map(Topic.CLUSTER_METADATA_TOPIC_NAME ->
- error(Errors.INVALID_REQUEST, Some(s"Creation of internal topic
${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited.")))
- )
- }
-}