This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a5549ca9bf MINOR: Rename waitForTopic to waitTopicCreation (#20216)
8a5549ca9bf is described below
commit 8a5549ca9bfc722b86fca406a3172f9f5357085d
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Tue Jul 22 21:02:57 2025 +0800
MINOR: Rename waitForTopic to waitTopicCreation (#20216)
Changes: Rename `waitForTopic` to `waitTopicCreation` for better clarity.
Reasons: To align with `waitTopicDeletion`.
Reference: https://github.com/apache/kafka/pull/20108/files#r2221659660
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>
---
.../kafka/clients/admin/ClientTelemetryTest.java | 2 +-
.../kafka/clients/admin/DeleteTopicTest.java | 2 +-
.../admin/DescribeAuthorizedOperationsTest.java | 6 +-
.../clients/admin/StaticBrokerConfigTest.java | 2 +-
.../clients/consumer/ConsumerIntegrationTest.java | 6 +-
...umerWithLegacyMessageFormatIntegrationTest.java | 2 +-
.../consumer/ShareConsumerRackAwareTest.java | 4 +-
.../security/GroupAuthorizerIntegrationTest.java | 2 +-
.../server/ShareGroupHeartbeatRequestTest.scala | 4 +-
.../kafka/server/LogManagerIntegrationTest.java | 4 +-
.../apache/kafka/server/log/LogAppendTimeTest.java | 2 +-
...ogMetadataManagerMultipleSubscriptionsTest.java | 2 +-
...icBasedRemoteLogMetadataManagerRestartTest.java | 4 +-
.../TopicBasedRemoteLogMetadataManagerTest.java | 6 +-
.../apache/kafka/common/test/ClusterInstance.java | 4 +-
.../test/junit/ClusterTestExtensionsTest.java | 6 +-
.../kafka/tools/ConfigCommandIntegrationTest.java | 2 +-
.../org/apache/kafka/tools/LogDirsCommandTest.java | 2 +-
.../org/apache/kafka/tools/TopicCommandTest.java | 82 +++++++++++-----------
.../reassign/ReassignPartitionsCommandTest.java | 6 +-
20 files changed, 75 insertions(+), 75 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ClientTelemetryTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ClientTelemetryTest.java
index 008b6096c3d..4629cb3b078 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ClientTelemetryTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ClientTelemetryTest.java
@@ -79,7 +79,7 @@ public class ClientTelemetryTest {
try (Admin admin = Admin.create(configs)) {
String testTopicName = "test_topic";
admin.createTopics(Collections.singletonList(new
NewTopic(testTopicName, 1, (short) 1)));
- clusterInstance.waitForTopic(testTopicName, 1);
+ clusterInstance.waitTopicCreation(testTopicName, 1);
Map<String, Object> producerConfigs = new HashMap<>();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers());
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
index 1e1639ec62c..a803b426f50 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
@@ -166,7 +166,7 @@ public class DeleteTopicTest {
"Follower " + followerBrokerId + " was not shutdown");
Map<String, NewPartitions> newPartitionSet = Map.of(DEFAULT_TOPIC,
NewPartitions.increaseTo(3));
admin.createPartitions(newPartitionSet);
- cluster.waitForTopic(DEFAULT_TOPIC, 3);
+ cluster.waitTopicCreation(DEFAULT_TOPIC, 3);
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
follower.startup();
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeAuthorizedOperationsTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeAuthorizedOperationsTest.java
index 32677ee22f0..1464fc06eb1 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeAuthorizedOperationsTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeAuthorizedOperationsTest.java
@@ -118,7 +118,7 @@ public class DescribeAuthorizedOperationsTest {
Admin user1 =
clusterInstance.admin(createAdminConfig(JaasUtils.KAFKA_PLAIN_USER1,
JaasUtils.KAFKA_PLAIN_USER1_PASSWORD))
) {
admin.createTopics(List.of(new NewTopic("topic1", 1, (short) 1)));
- clusterInstance.waitForTopic("topic1", 1);
+ clusterInstance.waitTopicCreation("topic1", 1);
// create consumers to avoid group not found error
TopicPartition tp = new TopicPartition("topic1", 0);
@@ -193,8 +193,8 @@ public class DescribeAuthorizedOperationsTest {
new NewTopic(topic1, 1, (short) 1),
new NewTopic(topic2, 1, (short) 1)
));
- clusterInstance.waitForTopic(topic1, 1);
- clusterInstance.waitForTopic(topic2, 1);
+ clusterInstance.waitTopicCreation(topic1, 1);
+ clusterInstance.waitTopicCreation(topic2, 1);
}
try (Admin admin =
clusterInstance.admin(createAdminConfig(JaasUtils.KAFKA_PLAIN_USER1,
JaasUtils.KAFKA_PLAIN_USER1_PASSWORD))) {
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
index 46c8206931b..8ba228dd252 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
@@ -133,7 +133,7 @@ public class StaticBrokerConfigTest {
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short)
1))).config(TOPIC).get();
// make sure the topic metadata exist
- cluster.waitForTopic(TOPIC, 1);
+ cluster.waitTopicCreation(TOPIC, 1);
Map<ConfigResource, Config> configResourceMap =
admin.describeConfigs(
List.of(brokerResource, topicResource, groupResource,
clientMetricsResource)).all().get();
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 03a9b159e23..220866c240f 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -257,7 +257,7 @@ public class ConsumerIntegrationTest {
) {
// Create a new topic with 1 partition on broker 0.
admin.createTopics(List.of(new NewTopic(topic, Map.of(0,
List.of(0)))));
- clusterInstance.waitForTopic(topic, 1);
+ clusterInstance.waitTopicCreation(topic, 1);
producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes()));
producer.flush();
@@ -282,7 +282,7 @@ public class ConsumerIntegrationTest {
NewPartitions.increaseTo(3, List.of(List.of(1),
List.of(1)))
)
);
- clusterInstance.waitForTopic(topic, 3);
+ clusterInstance.waitTopicCreation(topic, 3);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
@@ -299,7 +299,7 @@ public class ConsumerIntegrationTest {
NewPartitions.increaseTo(6, List.of(List.of(2),
List.of(2), List.of(2)))
)
);
- clusterInstance.waitForTopic(topic, 6);
+ clusterInstance.waitTopicCreation(topic, 6);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerWithLegacyMessageFormatIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerWithLegacyMessageFormatIntegrationTest.java
index fb35bad3a0b..755d9c89b7c 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerWithLegacyMessageFormatIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerWithLegacyMessageFormatIntegrationTest.java
@@ -112,7 +112,7 @@ public class ConsumerWithLegacyMessageFormatIntegrationTest
{
try (Admin admin = cluster.admin()) {
NewTopic newTopic = new NewTopic(topic, assignment);
admin.createTopics(List.of(newTopic));
- cluster.waitForTopic(topic, assignment.size());
+ cluster.waitTopicCreation(topic, assignment.size());
}
}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 8d6a8469694..4e60b0e12cc 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -76,7 +76,7 @@ public class ShareConsumerRackAwareTest {
) {
// Create a new topic with 1 partition on broker 0.
admin.createTopics(List.of(new NewTopic(topic, Map.of(0,
List.of(0)))));
- clusterInstance.waitForTopic(topic, 1);
+ clusterInstance.waitTopicCreation(topic, 1);
producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes()));
producer.flush();
@@ -105,7 +105,7 @@ public class ShareConsumerRackAwareTest {
NewPartitions.increaseTo(3, List.of(List.of(1),
List.of(1)))
)
);
- clusterInstance.waitForTopic(topic, 3);
+ clusterInstance.waitTopicCreation(topic, 3);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
index d899e4faea9..32069732db8 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
@@ -111,7 +111,7 @@ public class GroupAuthorizerIntegrationTest {
AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true))
) {
admin.createTopics(Collections.singleton(offsetTopic));
- clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1);
+ clusterInstance.waitTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME,
1);
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 390fb11be36..6ab0d40d432 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -472,8 +472,8 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
.setMemberEpoch(1)
).build()
- cluster.waitForTopic("foo", 2)
- cluster.waitForTopic("bar", 3)
+ cluster.waitTopicCreation("foo", 2)
+ cluster.waitTopicCreation("bar", 3)
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
diff --git
a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
index 15a57bbe22d..36f035c80f7 100644
---
a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java
@@ -64,7 +64,7 @@ public class LogManagerIntegrationTest {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic("foo", 1, (short)
1))).all().get();
}
- cluster.waitForTopic("foo", 1);
+ cluster.waitTopicCreation("foo", 1);
// Produce some data into the topic
Map<String, Object> producerConfigs = Map.of(
@@ -129,7 +129,7 @@ public class LogManagerIntegrationTest {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic("foo", 1, (short)
3))).all().get();
}
- cluster.waitForTopic("foo", 1);
+ cluster.waitTopicCreation("foo", 1);
Optional<PartitionMetadataFile> partitionMetadataFile =
cluster.brokers().get(0).logManager()
.getLog(new TopicPartition("foo", 0), false).get()
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
index 7269b2e4b5c..bd26f99848b 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
@@ -77,7 +77,7 @@ public class LogAppendTimeTest {
admin.createTopics(List.of(
new NewTopic(TOPIC, NUM_PARTITION, NUM_REPLICAS).
configs(Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"LogAppendTime"))));
- clusterInstance.waitForTopic(TOPIC, NUM_PARTITION);
+ clusterInstance.waitTopicCreation(TOPIC, NUM_PARTITION);
}
testProduceConsume(clusterInstance);
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index d8435bad2f5..c867b68635f 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -174,7 +174,7 @@ public class
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
private void createTopic(String topic, Map<Integer, List<Integer>>
replicasAssignments) {
try (Admin admin =
Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
admin.createTopics(List.of(new NewTopic(topic,
replicasAssignments)));
- assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic,
replicasAssignments.size()));
+ assertDoesNotThrow(() -> clusterInstance.waitTopicCreation(topic,
replicasAssignments.size()));
}
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 200fe7d7db6..908c9ef014c 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -66,8 +66,8 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
NewTopic newFollowerTopic = new NewTopic(followerTopic, Map.of(0,
List.of(1, 2, 0)));
admin.createTopics(List.of(newLeaderTopic,
newFollowerTopic)).all().get();
}
- clusterInstance.waitForTopic(leaderTopic, 1);
- clusterInstance.waitForTopic(followerTopic, 1);
+ clusterInstance.waitTopicCreation(leaderTopic, 1);
+ clusterInstance.waitTopicCreation(followerTopic, 1);
TopicIdPartition leaderTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
TopicIdPartition followerTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 7817c7ae014..4e40b7c7018 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -88,7 +88,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
try (Admin admin = clusterInstance.admin()) {
String topic = "test-topic-exist";
admin.createTopics(List.of(new NewTopic(topic, 1, (short)
1))).all().get();
- clusterInstance.waitForTopic(topic, 1);
+ clusterInstance.waitTopicCreation(topic, 1);
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin,
topic);
assertTrue(doesTopicExist);
}
@@ -137,9 +137,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
try (Admin admin = clusterInstance.admin()) {
// Set broker id 0 as the first entry which is taken as the leader.
admin.createTopics(List.of(new NewTopic(leaderTopic, Map.of(0,
List.of(0, 1, 2))))).all().get();
- clusterInstance.waitForTopic(leaderTopic, 1);
+ clusterInstance.waitTopicCreation(leaderTopic, 1);
admin.createTopics(List.of(new NewTopic(followerTopic, Map.of(0,
List.of(1, 2, 0))))).all().get();
- clusterInstance.waitForTopic(followerTopic, 1);
+ clusterInstance.waitTopicCreation(followerTopic, 1);
}
final TopicIdPartition newLeaderTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index ceb30af6e97..243ca5394d5 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -319,7 +319,7 @@ public interface ClusterInstance {
default void createTopic(String topicName, int partitions, short replicas,
Map<String, String> props) throws InterruptedException {
try (Admin admin = admin()) {
admin.createTopics(List.of(new NewTopic(topicName, partitions,
replicas).configs(props)));
- waitForTopic(topicName, partitions);
+ waitTopicCreation(topicName, partitions);
}
}
@@ -338,7 +338,7 @@ public interface ClusterInstance {
void waitForReadyBrokers() throws InterruptedException;
- default void waitForTopic(String topic, int partitions) throws
InterruptedException {
+ default void waitTopicCreation(String topic, int partitions) throws
InterruptedException {
if (partitions <= 0) {
throw new IllegalArgumentException("Partition count must be > 0,
but was " + partitions);
}
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index 73615455ee0..f7403cab91c 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -242,7 +242,7 @@ public class ClusterTestExtensionsTest {
short numReplicas = 3;
clusterInstance.createTopic(topicName, numPartition, numReplicas);
clusterInstance.shutdownBroker(0);
- clusterInstance.waitForTopic(topicName, numPartition);
+ clusterInstance.waitTopicCreation(topicName, numPartition);
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
@@ -273,7 +273,7 @@ public class ClusterTestExtensionsTest {
try (Admin admin = clusterInstance.admin()) {
String testTopic = "testTopic";
admin.createTopics(List.of(new NewTopic(testTopic, 1, (short) 1)));
- clusterInstance.waitForTopic(testTopic, 1);
+ clusterInstance.waitTopicCreation(testTopic, 1);
admin.deleteTopics(List.of(testTopic));
clusterInstance.waitTopicDeletion(testTopic);
Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch(
@@ -357,7 +357,7 @@ public class ClusterTestExtensionsTest {
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName()))) {
admin.createTopics(List.of(new NewTopic(topicName, 1, (short)
1))).all().get();
- cluster.waitForTopic(topicName, 1);
+ cluster.waitTopicCreation(topicName, 1);
cluster.brokers().values().forEach(broker -> {
broker.shutdown();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index d0407c23d6f..c494821a5b0 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -224,7 +224,7 @@ public class ConfigCommandIntegrationTest {
try (Admin client = cluster.admin()) {
NewTopic newTopic = new NewTopic("topic", 1, (short) 1);
client.createTopics(Set.of(newTopic)).all().get();
- cluster.waitForTopic("topic", 1);
+ cluster.waitTopicCreation("topic", 1);
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "topics",
"--entity-name", "topic",
diff --git a/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
index b17491cf582..f4c1f471cc8 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java
@@ -222,7 +222,7 @@ public class LogDirsCommandTest {
private void createTopic(ClusterInstance clusterInstance, String topic) {
try (Admin admin =
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
assertDoesNotThrow(() ->
admin.createTopics(Collections.singletonList(new NewTopic(topic,
Collections.singletonMap(0,
Collections.singletonList(0))))).topicId(topic).get());
- assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1));
+ assertDoesNotThrow(() -> clusterInstance.waitTopicCreation(topic,
1));
}
}
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index e5a786ce026..99e5e7be9c7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -387,7 +387,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
"Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
@@ -411,7 +411,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
"Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
@@ -443,7 +443,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, 2, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, 2);
+ clusterInstance.waitTopicCreation(testTopicName, 2);
List<TopicPartitionInfo> partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames()
@@ -461,7 +461,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, (short) 2)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
List<TopicPartitionInfo> partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames()
@@ -484,7 +484,7 @@ public class TopicCommandTest {
topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, 2, (short) 2).configs(topicConfig)));
- clusterInstance.waitForTopic(testTopicName, 2);
+ clusterInstance.waitTopicCreation(testTopicName, 2);
Config configs =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
@@ -503,7 +503,7 @@ public class TopicCommandTest {
"--topic", testTopicName);
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
// try to re-create the topic
assertThrows(TopicExistsException.class, () ->
topicService.createTopic(createOpts),
@@ -517,7 +517,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
TopicCommand.TopicCommandOptions createOpts =
buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--create", "--topic", testTopicName, "--if-not-exists");
@@ -540,7 +540,7 @@ public class TopicCommandTest {
replicaAssignmentMap.put(2, Arrays.asList(1, 0));
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, replicaAssignmentMap)));
- clusterInstance.waitForTopic(testTopicName, 3);
+ clusterInstance.waitTopicCreation(testTopicName, 3);
List<TopicPartitionInfo> partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
@@ -611,7 +611,7 @@ public class TopicCommandTest {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
String output = captureListTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list"));
assertTrue(output.contains(testTopicName), "Expected topic name to
be present in output: " + output);
@@ -629,9 +629,9 @@ public class TopicCommandTest {
adminClient.createTopics(Collections.singletonList(new
NewTopic(topic1, partition, replicationFactor)));
adminClient.createTopics(Collections.singletonList(new
NewTopic(topic2, partition, replicationFactor)));
adminClient.createTopics(Collections.singletonList(new
NewTopic(topic3, partition, replicationFactor)));
- clusterInstance.waitForTopic(topic1, partition);
- clusterInstance.waitForTopic(topic2, partition);
- clusterInstance.waitForTopic(topic3, partition);
+ clusterInstance.waitTopicCreation(topic1, partition);
+ clusterInstance.waitTopicCreation(topic2, partition);
+ clusterInstance.waitTopicCreation(topic3, partition);
String output = captureListTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--topic",
"kafka.*"));
assertTrue(output.contains(topic1), "Expected topic name " +
topic1 + " to be present in output: " + output);
@@ -648,7 +648,7 @@ public class TopicCommandTest {
int partition = 2;
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(topic1, partition, replicationFactor)));
- clusterInstance.waitForTopic(topic1, partition);
+ clusterInstance.waitTopicCreation(topic1, partition);
String output = captureListTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list",
"--exclude-internal"));
assertTrue(output.contains(topic1), "Expected topic name " +
topic1 + " to be present in output: " + output);
@@ -664,7 +664,7 @@ public class TopicCommandTest {
int partition = 2;
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partition);
+ clusterInstance.waitTopicCreation(testTopicName, partition);
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--alter", "--topic", testTopicName, "--partitions", "3"));
TestUtils.waitForCondition(
@@ -690,7 +690,7 @@ public class TopicCommandTest {
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partition);
+ clusterInstance.waitTopicCreation(testTopicName, partition);
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--alter",
"--topic", testTopicName, "--replica-assignment",
"5:3,3:1,4:2", "--partitions", "3"));
@@ -722,7 +722,7 @@ public class TopicCommandTest {
int partition = 2;
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partition);
+ clusterInstance.waitTopicCreation(testTopicName, partition);
assertThrows(ExecutionException.class,
() ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--alter",
@@ -740,7 +740,7 @@ public class TopicCommandTest {
int partition = 2;
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partition);
+ clusterInstance.waitTopicCreation(testTopicName, partition);
assertThrows(ExecutionException.class,
() ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--alter", "--topic", testTopicName,
@@ -757,7 +757,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
assertThrows(ExecutionException.class,
() ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--alter", "--partitions", "-1", "--topic", testTopicName)),
@@ -806,7 +806,7 @@ public class TopicCommandTest {
int numPartitions = 18;
int replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, numPartitions, (short) replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, numPartitions);
+ clusterInstance.waitTopicCreation(testTopicName, numPartitions);
Map<Integer, List<Integer>> assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions()
@@ -851,7 +851,7 @@ public class TopicCommandTest {
HashMap<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy);
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions,
defaultReplicationFactor).configs(topicConfig)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Config props =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
@@ -888,7 +888,7 @@ public class TopicCommandTest {
String testTopicName = TestUtils.randomString(10);
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
// delete the NormalTopic
TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic",
testTopicName);
@@ -914,7 +914,7 @@ public class TopicCommandTest {
// create the topic with colliding chars
String topicWithCollidingChar = "test.a";
adminClient.createTopics(Collections.singletonList(new
NewTopic(topicWithCollidingChar, defaultNumPartitions,
defaultReplicationFactor)));
- clusterInstance.waitForTopic(topicWithCollidingChar,
defaultNumPartitions);
+ clusterInstance.waitTopicCreation(topicWithCollidingChar,
defaultNumPartitions);
// delete the topic
TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic",
topicWithCollidingChar);
@@ -928,7 +928,7 @@ public class TopicCommandTest {
// recreate same topic
adminClient.createTopics(Collections.singletonList(new
NewTopic(topicWithCollidingChar, defaultNumPartitions,
defaultReplicationFactor)));
- clusterInstance.waitForTopic(topicWithCollidingChar,
defaultNumPartitions);
+ clusterInstance.waitTopicCreation(topicWithCollidingChar,
defaultNumPartitions);
}
}
@@ -945,7 +945,7 @@ public class TopicCommandTest {
// create the offset topic
adminClient.createTopics(Collections.singletonList(new
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions,
defaultReplicationFactor)));
- clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME,
defaultNumPartitions);
+ clusterInstance.waitTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME,
defaultNumPartitions);
// Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is
allowed by default.
// This is a difference between the new and the old command as the
old one didn't allow internal topic deletion.
@@ -1002,7 +1002,7 @@ public class TopicCommandTest {
int partition = 2;
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partition);
+ clusterInstance.waitTopicCreation(testTopicName, partition);
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic",
testTopicName));
String[] rows = output.split(System.lineSeparator());
@@ -1025,11 +1025,11 @@ public class TopicCommandTest {
topics.add(new NewTopic("test-5", 100, (short) 2));
adminClient.createTopics(topics);
- clusterInstance.waitForTopic(testTopicName, 20);
- clusterInstance.waitForTopic("test-2", 41);
- clusterInstance.waitForTopic("test-3", 5);
- clusterInstance.waitForTopic("test-4", 5);
- clusterInstance.waitForTopic("test-5", 100);
+ clusterInstance.waitTopicCreation(testTopicName, 20);
+ clusterInstance.waitTopicCreation("test-2", 41);
+ clusterInstance.waitTopicCreation("test-3", 5);
+ clusterInstance.waitTopicCreation("test-4", 5);
+ clusterInstance.waitTopicCreation("test-5", 100);
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--describe", "--partition-size-limit-per-response=20",
"--exclude-internal"));
@@ -1076,14 +1076,14 @@ public class TopicCommandTest {
short replicationFactor = 1;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partitions);
+ clusterInstance.waitTopicCreation(testTopicName, partitions);
// check which partition is on broker 0 which we'll kill
clusterInstance.shutdownBroker(0);
assertEquals(2, clusterInstance.aliveBrokers().size());
// wait until the topic metadata for the test topic is propagated
to each alive broker
- clusterInstance.waitForTopic(testTopicName, 3);
+ clusterInstance.waitTopicCreation(testTopicName, 3);
// grab the console output and assert
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic",
testTopicName, "--unavailable-partitions"));
@@ -1103,7 +1103,7 @@ public class TopicCommandTest {
int partitions = 1;
short replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
- clusterInstance.waitForTopic(testTopicName, partitions);
+ clusterInstance.waitTopicCreation(testTopicName, partitions);
clusterInstance.shutdownBroker(0);
Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 2);
@@ -1134,7 +1134,7 @@ public class TopicCommandTest {
int partitions = 1;
short replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
- clusterInstance.waitForTopic(testTopicName, partitions);
+ clusterInstance.waitTopicCreation(testTopicName, partitions);
clusterInstance.shutdownBroker(0);
assertEquals(2, clusterInstance.aliveBrokers().size());
@@ -1158,7 +1158,7 @@ public class TopicCommandTest {
try (Admin adminClient = clusterInstance.admin();
KafkaProducer<String, String> producer =
createProducer(clusterInstance)) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
TopicPartition tp = new TopicPartition(testTopicName, 0);
@@ -1236,7 +1236,7 @@ public class TopicCommandTest {
short replicationFactor = 6;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
- clusterInstance.waitForTopic(testTopicName, partitions);
+ clusterInstance.waitTopicCreation(testTopicName, partitions);
clusterInstance.shutdownBroker(0);
clusterInstance.shutdownBroker(1);
@@ -1293,7 +1293,7 @@ public class TopicCommandTest {
adminClient.createTopics(newTopics);
for (NewTopic topioc: newTopics) {
- clusterInstance.waitForTopic(topioc.name(), partitions);
+ clusterInstance.waitTopicCreation(topioc.name(), partitions);
}
clusterInstance.shutdownBroker(0);
@@ -1334,7 +1334,7 @@ public class TopicCommandTest {
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
- clusterInstance.waitForTopic(testTopicName, partitions);
+ clusterInstance.waitTopicCreation(testTopicName, partitions);
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe"));
assertTrue(output.contains(config), String.format("Describe output
should have contained %s", config));
@@ -1346,7 +1346,7 @@ public class TopicCommandTest {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName,
defaultNumPartitions);
// test describe
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe",
"--describe", "--exclude-internal"));
@@ -1377,7 +1377,7 @@ public class TopicCommandTest {
Collections.singleton(new TopicPartition(testTopicName, 0))
);
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
- clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(testTopicName, defaultNumPartitions);
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic",
testTopicName));
String[] rows = output.split(System.lineSeparator());
@@ -1396,7 +1396,7 @@ public class TopicCommandTest {
int partitions = 1;
short replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(topic, partitions, replicationFactor)));
- clusterInstance.waitForTopic(topic, defaultNumPartitions);
+ clusterInstance.waitTopicCreation(topic, defaultNumPartitions);
assertThrows(TopicExistsException.class,
() ->
topicService.createTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--create", "--topic", topic)));
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index c83d16f0a4d..381adc4699b 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -486,19 +486,19 @@ public class ReassignPartitionsCommandTest {
fooReplicasAssignments.put(0, List.of(0, 1, 2));
fooReplicasAssignments.put(1, List.of(1, 2, 3));
Assertions.assertDoesNotThrow(() -> admin.createTopics(List.of(new
NewTopic("foo", fooReplicasAssignments))).topicId("foo").get());
- Assertions.assertDoesNotThrow(() ->
clusterInstance.waitForTopic("foo", fooReplicasAssignments.size()));
+ Assertions.assertDoesNotThrow(() ->
clusterInstance.waitTopicCreation("foo", fooReplicasAssignments.size()));
Map<Integer, List<Integer>> barReplicasAssignments = new
HashMap<>();
barReplicasAssignments.put(0, List.of(3, 2, 1));
Assertions.assertDoesNotThrow(() -> admin.createTopics(List.of(new
NewTopic("bar", barReplicasAssignments))).topicId("bar").get());
- Assertions.assertDoesNotThrow(() ->
clusterInstance.waitForTopic("bar", barReplicasAssignments.size()));
+ Assertions.assertDoesNotThrow(() ->
clusterInstance.waitTopicCreation("bar", barReplicasAssignments.size()));
Map<Integer, List<Integer>> bazReplicasAssignments = new
HashMap<>();
bazReplicasAssignments.put(0, List.of(1, 0, 2));
bazReplicasAssignments.put(1, List.of(2, 0, 1));
bazReplicasAssignments.put(2, List.of(0, 2, 1));
Assertions.assertDoesNotThrow(() -> admin.createTopics(List.of(new
NewTopic("baz", bazReplicasAssignments))).topicId("baz").get());
- Assertions.assertDoesNotThrow(() ->
clusterInstance.waitForTopic("baz", bazReplicasAssignments.size()));
+ Assertions.assertDoesNotThrow(() ->
clusterInstance.waitTopicCreation("baz", bazReplicasAssignments.size()));
}
}