This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55260e9835a KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest
to clients-integration-tests module (#20339)
55260e9835a is described below
commit 55260e9835ae74c7eb1e1c54e90b7f00f6e278d2
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Fri Aug 15 17:44:47 2025 +0800
KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest to
clients-integration-tests module (#20339)
This PR does the following:
- Rewrite the test on the new test infra (`@ClusterTest` /
`ClusterInstance`).
- Rewrite it from Scala to Java.
- Move it to the `clients-integration-tests` module.
- Add an `ensureConsistentMetadata` method to `ClusterInstance`,
similar to `ensureConsistentKRaftMetadata` in the old infra, and
refactor the related code to use it.
Reviewers: TengYao Chi <[email protected]>, Ken Huang
<[email protected]>
---
.../AdminClientWithPoliciesIntegrationTest.java | 201 +++++++++++++++++
.../AdminClientWithPoliciesIntegrationTest.scala | 247 ---------------------
.../apache/kafka/common/test/ClusterInstance.java | 17 +-
3 files changed, 211 insertions(+), 254 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java
new file mode 100644
index 00000000000..8bad3b1c900
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.policy.AlterConfigPolicy;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests AdminClient calls when the broker is configured with policies -
AlterConfigPolicy.
+ */
+
+@ClusterTestDefaults(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key =
ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, value =
"org.apache.kafka.clients.admin.AdminClientWithPoliciesIntegrationTest$Policy"),
+ }
+)
+public class AdminClientWithPoliciesIntegrationTest {
+ private final ClusterInstance clusterInstance;
+ private static List<AlterConfigPolicy.RequestMetadata> validations = new
ArrayList<>();
+
+ AdminClientWithPoliciesIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @BeforeEach
+ public void setup() throws InterruptedException {
+ clusterInstance.waitForReadyBrokers();
+ }
+
+ @ClusterTest
+ public void testInvalidAlterConfigsDueToPolicy() throws Exception {
+ try (final Admin adminClient = clusterInstance.admin()) {
+ // Create topics
+ String topic1 = "invalid-alter-configs-due-to-policy-topic-1";
+ String topic2 = "invalid-alter-configs-due-to-policy-topic-2";
+ String topic3 = "invalid-alter-configs-due-to-policy-topic-3";
+ clusterInstance.createTopic(topic1, 1, (short) 1);
+ clusterInstance.createTopic(topic2, 1, (short) 1);
+ clusterInstance.createTopic(topic3, 1, (short) 1);
+
+ ConfigResource topicResource1 = new
ConfigResource(ConfigResource.Type.TOPIC, topic1);
+ ConfigResource topicResource2 = new
ConfigResource(ConfigResource.Type.TOPIC, topic2);
+ ConfigResource topicResource3 = new
ConfigResource(ConfigResource.Type.TOPIC, topic3);
+
+ // Set a mutable broker config
+ ConfigResource brokerResource = new
ConfigResource(ConfigResource.Type.BROKER, "0"); // "0" represents the broker ID
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(
+ brokerResource, List.of(new AlterConfigOp(new
ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET))
+ );
+ adminClient.incrementalAlterConfigs(configOps).all().get();
+ assertEquals(Set.of(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG),
validationsForResource(brokerResource).get(0).configs().keySet());
+ validations.clear();
+
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new
HashMap<>();
+ alterConfigs.put(topicResource1, List.of(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET)
+ ));
+ alterConfigs.put(topicResource2, List.of(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET)));
+ alterConfigs.put(topicResource3, List.of(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET)));
+ alterConfigs.put(brokerResource, List.of(new AlterConfigOp(new
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET)));
+
+ // Alter configs: second is valid, the others are invalid
+ AlterConfigsResult alterResult =
adminClient.incrementalAlterConfigs(alterConfigs);
+ assertEquals(Set.of(topicResource1, topicResource2,
topicResource3, brokerResource), alterResult.values().keySet());
+ assertFutureThrows(PolicyViolationException.class,
alterResult.values().get(topicResource1));
+ alterResult.values().get(topicResource2).get();
+ assertFutureThrows(InvalidConfigurationException.class,
alterResult.values().get(topicResource3));
+ assertFutureThrows(InvalidRequestException.class,
alterResult.values().get(brokerResource));
+ assertTrue(validationsForResource(brokerResource).isEmpty(),
+ "Should not see the broker resource in the AlterConfig
policy when the broker configs are not being updated.");
+ validations.clear();
+
+ // Verify that the second resource was updated and the others were
not
+ clusterInstance.ensureConsistentMetadata();
+ DescribeConfigsResult describeResult =
adminClient.describeConfigs(List.of(topicResource1, topicResource2,
topicResource3, brokerResource));
+ var configs = describeResult.all().get();
+ assertEquals(4, configs.size());
+
+
assertEquals(String.valueOf(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO),
configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+
assertEquals(String.valueOf(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT),
configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
+
+ assertEquals("0.8",
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+
+
assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value());
+
+ // Alter configs with validateOnly = true: only second is valid
+ alterConfigs.put(topicResource2, List.of(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET)));
+ alterResult = adminClient.incrementalAlterConfigs(alterConfigs,
new AlterConfigsOptions().validateOnly(true));
+
+ assertFutureThrows(PolicyViolationException.class,
alterResult.values().get(topicResource1));
+ alterResult.values().get(topicResource2).get();
+ assertFutureThrows(InvalidConfigurationException.class,
alterResult.values().get(topicResource3));
+ assertFutureThrows(InvalidRequestException.class,
alterResult.values().get(brokerResource));
+ assertTrue(validationsForResource(brokerResource).isEmpty(),
+ "Should not see the broker resource in the AlterConfig
policy when the broker configs are not being updated.");
+ validations.clear();
+
+ // Verify that no resources are updated since validate_only = true
+ clusterInstance.ensureConsistentMetadata();
+ describeResult =
adminClient.describeConfigs(List.of(topicResource1, topicResource2,
topicResource3, brokerResource));
+ configs = describeResult.all().get();
+ assertEquals(4, configs.size());
+
+
assertEquals(String.valueOf(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO),
configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+
assertEquals(String.valueOf(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT),
configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
+
+ assertEquals("0.8",
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+
+
assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value());
+
+ // Do an incremental alter config on the broker, ensure we don't
see the broker config we set earlier in the policy
+ alterResult = adminClient.incrementalAlterConfigs(Map.of(
+ brokerResource, List.of(new AlterConfigOp(new
ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "9999"), OpType.SET))
+ ));
+ alterResult.all().get();
+ assertEquals(Set.of(SocketServerConfigs.MAX_CONNECTIONS_CONFIG),
validationsForResource(brokerResource).get(0).configs().keySet());
+ }
+ }
+
+ private static List<AlterConfigPolicy.RequestMetadata>
validationsForResource(ConfigResource resource) {
+ return validations.stream().filter(req ->
req.resource().equals(resource)).toList();
+ }
+
+ /**
+ * Used in @ClusterTestDefaults serverProperties, so it may appear unused
in the IDE.
+ */
+ public static class Policy implements AlterConfigPolicy {
+ private Map<String, ?> configs;
+ private boolean closed = false;
+
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ validations.clear();
+ this.configs = configs;
+ }
+
+ @Override
+ public void validate(AlterConfigPolicy.RequestMetadata
requestMetadata) {
+ validations.add(requestMetadata);
+ assertFalse(closed, "Policy should not be closed");
+ assertFalse(configs.isEmpty(), "configure should have been called
with non empty configs");
+ assertFalse(requestMetadata.configs().isEmpty(), "request configs
should not be empty");
+ assertFalse(requestMetadata.resource().name().isEmpty(), "resource
name should not be empty");
+ if
(requestMetadata.configs().containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
{
+ throw new PolicyViolationException("Min in sync replicas
cannot be updated");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.closed = true;
+ }
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
deleted file mode 100644
index e13e2655e83..00000000000
---
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.util
-import java.util.Properties
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsOptions, ConfigEntry}
-import org.apache.kafka.common.config.{ConfigResource, SslConfigs, TopicConfig}
-import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException, PolicyViolationException}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
-import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.apache.kafka.storage.internals.log.LogConfig
-import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-/**
- * Tests AdminClient calls when the broker is configured with policies like
AlterConfigPolicy, CreateTopicPolicy, etc.
- */
-@Timeout(120)
-class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness
with Logging {
-
- import AdminClientWithPoliciesIntegrationTest._
-
- var client: Admin = _
- val brokerCount = 3
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- TestUtils.waitUntilBrokerMetadataIsPropagated(brokers)
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- if (client != null)
- Utils.closeQuietly(client, "AdminClient")
- super.tearDown()
- }
-
- def createConfig: util.Map[String, Object] =
- util.Map.of[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers())
-
- override def generateConfigs: collection.Seq[KafkaConfig] = {
- val configs = TestUtils.createBrokerConfigs(brokerCount)
- configs.foreach(overrideNodeConfigs)
- configs.map(KafkaConfig.fromProps)
- }
-
- override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
- val props = new Properties()
- overrideNodeConfigs(props)
- Seq(props)
- }
-
- private def overrideNodeConfigs(props: Properties): Unit = {
- props.put(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
classOf[Policy])
- }
-
- @Test
- def testValidAlterConfigs(): Unit = {
- client = Admin.create(createConfig)
- // Create topics
- val topic1 = "describe-alter-configs-topic-1"
- val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- val topicConfig1 = new Properties
- val maxMessageBytes = "500000"
- val retentionMs = "60000000"
- topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
maxMessageBytes)
- topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs)
- createTopic(topic1, 1, 1, topicConfig1)
-
- val topic2 = "describe-alter-configs-topic-2"
- val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- createTopic(topic2)
-
- PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this,
topicResource1, topicResource2, maxMessageBytes, retentionMs)
- }
-
- @Test
- def testInvalidAlterConfigs(): Unit = {
- client = Admin.create(createConfig)
- PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
- }
-
- @Test
- def testInvalidAlterConfigsDueToPolicy(): Unit = {
- client = Admin.create(createConfig)
-
- // Create topics
- val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
- val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- createTopic(topic1)
-
- val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
- val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- createTopic(topic2)
-
- val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
- val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
- createTopic(topic3)
-
- // Set a mutable broker config
- val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
brokers.head.config.brokerId.toString)
- var alterResult =
client.incrementalAlterConfigs(util.Map.of(brokerResource,
- util.List.of(new AlterConfigOp(new
ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET))))
- alterResult.all.get
- assertEquals(Set(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG),
validationsForResource(brokerResource).head.configs().keySet().asScala)
- validations.clear()
-
- val alterConfigs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
- alterConfigs.put(topicResource1, util.List.of(
- new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
- new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET)
- ))
-
- alterConfigs.put(topicResource2, util.List.of(
- new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET),
- ))
-
- alterConfigs.put(topicResource3, util.List.of(
- new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET),
- ))
-
- alterConfigs.put(brokerResource, util.List.of(
- new AlterConfigOp(new
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET),
- ))
-
- // Alter configs: second is valid, the others are invalid
- alterResult = client.incrementalAlterConfigs(alterConfigs)
-
- assertEquals(util.Set.of(topicResource1, topicResource2, topicResource3,
brokerResource), alterResult.values.keySet)
- assertFutureThrows(classOf[PolicyViolationException],
alterResult.values.get(topicResource1))
- alterResult.values.get(topicResource2).get
- assertFutureThrows(classOf[InvalidConfigurationException],
alterResult.values.get(topicResource3))
- assertFutureThrows(classOf[InvalidRequestException],
alterResult.values.get(brokerResource))
- assertTrue(validationsForResource(brokerResource).isEmpty,
- "Should not see the broker resource in the AlterConfig policy when the
broker configs are not being updated.")
- validations.clear()
-
- // Verify that the second resource was updated and the others were not
- ensureConsistentKRaftMetadata()
- var describeResult = client.describeConfigs(util.List.of(topicResource1,
topicResource2, topicResource3, brokerResource))
- var configs = describeResult.all.get
- assertEquals(4, configs.size)
-
- assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString,
configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
- assertEquals(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT.toString,
configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value)
-
- assertEquals("0.8",
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-
-
assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
-
- // Alter configs with validateOnly = true: only second is valid
- alterConfigs.put(topicResource2, util.List.of(
- new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET),
- ))
-
- alterResult = client.incrementalAlterConfigs(alterConfigs, new
AlterConfigsOptions().validateOnly(true))
-
- assertEquals(util.Set.of(topicResource1, topicResource2, topicResource3,
brokerResource), alterResult.values.keySet)
- assertFutureThrows(classOf[PolicyViolationException],
alterResult.values.get(topicResource1))
- alterResult.values.get(topicResource2).get
- assertFutureThrows(classOf[InvalidConfigurationException],
alterResult.values.get(topicResource3))
- assertFutureThrows(classOf[InvalidRequestException],
alterResult.values.get(brokerResource))
- assertTrue(validationsForResource(brokerResource).isEmpty,
- "Should not see the broker resource in the AlterConfig policy when the
broker configs are not being updated.")
- validations.clear()
-
- // Verify that no resources are updated since validate_only = true
- ensureConsistentKRaftMetadata()
- describeResult = client.describeConfigs(util.List.of(topicResource1,
topicResource2, topicResource3, brokerResource))
- configs = describeResult.all.get
- assertEquals(4, configs.size)
-
- assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString,
configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
- assertEquals(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT.toString,
configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value)
-
- assertEquals("0.8",
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-
-
assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
-
- // Do an incremental alter config on the broker, ensure we don't see the
broker config we set earlier in the policy
- alterResult = client.incrementalAlterConfigs(util.Map.of(
- brokerResource ,
- util.List.of(new AlterConfigOp(
- new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "9999"),
OpType.SET)
- )
- ))
- alterResult.all.get
- assertEquals(Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG),
validationsForResource(brokerResource).head.configs().keySet().asScala)
- }
-
-}
-
-object AdminClientWithPoliciesIntegrationTest {
-
- val validations = new mutable.ListBuffer[AlterConfigPolicy.RequestMetadata]()
-
- def validationsForResource(resource: ConfigResource):
Seq[AlterConfigPolicy.RequestMetadata] = {
- validations.filter { req => req.resource().equals(resource) }.toSeq
- }
-
- class Policy extends AlterConfigPolicy {
-
- var configs: Map[String, _] = _
- var closed = false
-
- def configure(configs: util.Map[String, _]): Unit = {
- validations.clear()
- this.configs = configs.asScala.toMap
- }
-
- def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
- validations.append(requestMetadata)
- require(!closed, "Policy should not be closed")
- require(configs.nonEmpty, "configure should have been called with non
empty configs")
- require(!requestMetadata.configs.isEmpty, "request configs should not be
empty")
- require(requestMetadata.resource.name.nonEmpty, "resource name should
not be empty")
- if
(requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
- throw new PolicyViolationException("Min in sync replicas cannot be
updated")
- }
-
- def close(): Unit = closed = true
-
- }
-}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 7662eeda7a3..471692d0016 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -273,12 +273,7 @@ public interface ClusterInstance {
broker ->
broker.metadataCache().numPartitions(topic).isEmpty()),
60000L, topic + " metadata not propagated after 60000 ms");
- for (ControllerServer controller : controllers().values()) {
- long controllerOffset =
controller.raftManager().replicatedLog().endOffset().offset() - 1;
- TestUtils.waitForCondition(
- () -> brokers.stream().allMatch(broker -> ((BrokerServer)
broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
- 60000L, "Timeout waiting for controller metadata propagating
to brokers");
- }
+ ensureConsistentMetadata(brokers, controllers().values());
TopicPartition topicPartition = new TopicPartition(topic, 0);
@@ -358,7 +353,15 @@ public interface ClusterInstance {
() -> brokers.stream().allMatch(broker ->
broker.metadataCache().numPartitions(topic).filter(p -> p ==
partitions).isPresent()),
60000L, topic + " metadata not propagated after 60000 ms");
- for (ControllerServer controller : controllers().values()) {
+ ensureConsistentMetadata(brokers, controllers().values());
+ }
+
+ default void ensureConsistentMetadata() throws InterruptedException {
+ ensureConsistentMetadata(aliveBrokers().values(),
controllers().values());
+ }
+
+ default void ensureConsistentMetadata(Collection<KafkaBroker> brokers,
Collection<ControllerServer> controllers) throws InterruptedException {
+ for (ControllerServer controller : controllers) {
long controllerOffset =
controller.raftManager().replicatedLog().endOffset().offset() - 1;
TestUtils.waitForCondition(
() -> brokers.stream().allMatch(broker -> ((BrokerServer)
broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),