This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06d42a2baa4 MINOR: Optimize `describeConfigs` queries and address
cleanups for `ConfigCommand` (#22114)
06d42a2baa4 is described below
commit 06d42a2baa41166f77c6542b6da0d08644d5c7f9
Author: Nick Guo <[email protected]>
AuthorDate: Sat May 9 14:40:32 2026 +0800
MINOR: Optimize `describeConfigs` queries and address cleanups for
`ConfigCommand` (#22114)
See discussion:
https://github.com/apache/kafka/pull/22013#pullrequestreview-4133302711
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/tools/ConfigCommand.java | 115 +++++++++++++--------
.../org/apache/kafka/tools/ConfigCommandTest.java | 70 +++++++------
2 files changed, 114 insertions(+), 71 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
index 6bd5b82fd80..2a93bcdc557 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
@@ -34,6 +34,7 @@ import
org.apache.kafka.clients.admin.UserScramCredentialAlteration;
import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -71,6 +72,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -93,7 +96,7 @@ import joptsimple.OptionSpec;
* An entity described or altered by the command may be one of:
* <ul>
* <li> topic: {@code --topic <topic>} OR {@code --entity-type topics
--entity-name <topic>}
- * <li> client: {@code --client <client>} OR {@code --entity-type clients
--entity-name <client-id>}
+ * <li> client: {@code --client <client-id>} OR {@code --entity-type
clients --entity-name <client-id>}
* <li> user: {@code --user <user-principal>} OR {@code --entity-type
users --entity-name <user-principal>}
* <li> {@code <user, client>}: {@code --user <user-principal> --client
<client-id>} OR
* {@code --entity-type users --entity-name
<user-principal> --entity-type clients --entity-name <client-id>}
@@ -113,7 +116,6 @@ public class ConfigCommand {
private static final Logger LOG =
LoggerFactory.getLogger(ConfigCommand.class);
private static final String BROKER_DEFAULT_ENTITY_NAME = "";
- private static final List<String> BROKER_SUPPORTED_CONFIG_TYPES;
private static final int DEFAULT_SCRAM_ITERATIONS = 4096;
private static final String TOPIC_TYPE = ConfigType.TOPIC.value();
private static final String CLIENT_METRICS_TYPE =
ConfigType.CLIENT_METRICS.value();
@@ -124,13 +126,10 @@ public class ConfigCommand {
private static final String IP_TYPE = ConfigType.IP.value();
static final String BROKER_LOGGER_CONFIG_TYPE = "broker-loggers";
- static {
- BROKER_SUPPORTED_CONFIG_TYPES = new ArrayList<>();
- BROKER_SUPPORTED_CONFIG_TYPES.add(BROKER_LOGGER_CONFIG_TYPE);
- for (ConfigType configType : ConfigType.values()) {
- BROKER_SUPPORTED_CONFIG_TYPES.add(configType.value());
- }
- }
+ private static final List<String> BROKER_SUPPORTED_CONFIG_TYPES =
Stream.concat(
+ Stream.of(BROKER_LOGGER_CONFIG_TYPE),
+ Stream.of(ConfigType.values()).map(ConfigType::value)
+ ).toList();
public static void main(String[] args) {
try {
@@ -276,7 +275,7 @@ public class ConfigCommand {
throw new InvalidConfigurationException("Invalid broker
logger(s): " + String.join(",", invalidBrokerLoggers));
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName);
- AlterConfigsOptions alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+ AlterConfigsOptions alterOptions = new
AlterConfigsOptions().timeoutMs(30000);
List<AlterConfigOp> addEntries =
configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k,
AlterConfigOp.OpType.SET)).toList();
List<AlterConfigOp> deleteEntries =
configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""),
AlterConfigOp.OpType.DELETE)).toList();
Collection<AlterConfigOp> alterEntries =
Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
@@ -444,7 +443,7 @@ public class ConfigCommand {
}
ClientQuotaEntity entity = new ClientQuotaEntity(alterEntityMap);
- AlterClientQuotasOptions alterOptions = new
AlterClientQuotasOptions().validateOnly(false);
+ AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions();
List<ClientQuotaAlteration.Op> addOps =
configsToBeAddedMap.entrySet().stream()
.map(entry -> {
@@ -530,40 +529,58 @@ public class ConfigCommand {
}
}
- List<String> entities;
+ Set<String> entities;
if (entityName.isPresent()) {
- entities = List.of(entityName.get());
+ entities = Set.of(entityName.get());
} else {
if (TOPIC_TYPE.equals(entityType)) {
- entities = new ArrayList<>(adminClient.listTopics(new
ListTopicsOptions().listInternal(true)).names().get());
+ entities = new LinkedHashSet<>(adminClient.listTopics(new
ListTopicsOptions().listInternal(true)).names().get());
} else if (BROKER_TYPE.equals(entityType) ||
BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
- List<String> brokerIds = adminClient.describeCluster(new
DescribeClusterOptions()).nodes().get().stream()
+ Set<String> brokerIds = adminClient.describeCluster(new
DescribeClusterOptions()).nodes().get().stream()
.map(Node::idString)
- .collect(Collectors.toList());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
brokerIds.add(BROKER_DEFAULT_ENTITY_NAME);
entities = brokerIds;
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
entities =
adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new
ListConfigResourcesOptions()).all().get().stream()
.map(ConfigResource::name)
- .toList();
+ .collect(Collectors.toCollection(LinkedHashSet::new));
} else if (GROUP_TYPE.equals(entityType)) {
Set<String> groupIds =
adminClient.listGroups().all().get().stream()
.map(GroupListing::groupId)
- .collect(Collectors.toSet());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
Set<String> groupResources =
listGroupConfigResources(adminClient)
.map(resources -> resources.stream()
.map(ConfigResource::name)
- .collect(Collectors.toSet()))
- .orElse(Set.of());
- Set<String> combined = new HashSet<>(groupIds);
+
.collect(Collectors.toCollection(LinkedHashSet::new)))
+ .orElseGet(LinkedHashSet::new);
+ Set<String> combined = new LinkedHashSet<>(groupIds);
combined.addAll(groupResources);
- entities = new ArrayList<>(combined);
+ entities = combined;
} else {
throw new IllegalArgumentException("Invalid entity type: " +
entityType);
}
}
+ if (entities.isEmpty()) {
+ return;
+ }
+
+ Map<String, DescribeConfigContext> contextsByEntity = new
LinkedHashMap<>();
for (String entity : entities) {
+ contextsByEntity.put(entity, describeConfigContext(entityType,
entity));
+ }
+
+ DescribeConfigsOptions describeOptions = new
DescribeConfigsOptions().includeSynonyms(true);
+ Map<ConfigResource, KafkaFuture<Config>> configs =
adminClient.describeConfigs(
+ contextsByEntity.values().stream()
+ .map(DescribeConfigContext::configResource)
+ .toList(),
+ describeOptions
+ ).values();
+
+ for (String entity : entities) {
+ DescribeConfigContext context = contextsByEntity.get(entity);
if (BROKER_DEFAULT_ENTITY_NAME.equals(entity)) {
System.out.println("Default configs for " + entityType + " in
the cluster are:");
} else {
@@ -571,7 +588,12 @@ public class ConfigCommand {
String entityTypeSingular = entityType.substring(0,
entityType.length() - 1);
System.out.println(configSourceStr + " configs for " +
entityTypeSingular + " " + entity + " are:");
}
- getResourceConfig(adminClient, entityType, entity, true,
describeAll).forEach(entry -> {
+
+ Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll
+ ? Optional.empty()
+ : Optional.of(context.dynamicConfigSource());
+ Config config = configs.get(context.configResource()).get(30,
TimeUnit.SECONDS);
+ filterAndSortEntries(config, configSourceFilter).forEach(entry -> {
String synonyms = entry.synonyms().stream()
.map(synonym -> synonym.source() + ":" +
synonym.name() + "=" + synonym.value())
.collect(Collectors.joining(", ", "{", "}"));
@@ -582,7 +604,7 @@ public class ConfigCommand {
private static void alterResourceConfig(Admin adminClient, String
entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry>
configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException,
InterruptedException, TimeoutException {
ConfigResource configResource = new ConfigResource(resourceType,
entityNameHead);
- AlterConfigsOptions alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+ AlterConfigsOptions alterOptions = new
AlterConfigsOptions().timeoutMs(30000);
List<AlterConfigOp> addEntries =
configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k,
AlterConfigOp.OpType.SET)).toList();
List<AlterConfigOp> deleteEntries = configsToBeDeleted.stream().map(k
-> new AlterConfigOp(new ConfigEntry(k, ""),
AlterConfigOp.OpType.DELETE)).toList();
Collection<AlterConfigOp> alterEntries =
Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
@@ -597,54 +619,65 @@ public class ConfigCommand {
}
}
- private static List<ConfigEntry> getResourceConfig(Admin adminClient,
String entityType, String entityName, boolean includeSynonyms, boolean
describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+ private record DescribeConfigContext(ConfigResource configResource,
ConfigEntry.ConfigSource dynamicConfigSource) {
+ }
+
+ private static DescribeConfigContext describeConfigContext(String
entityType, String entityName) {
ConfigResource.Type configResourceType;
- Optional<ConfigEntry.ConfigSource> dynamicConfigSource;
+ ConfigEntry.ConfigSource dynamicConfigSource;
if (TOPIC_TYPE.equals(entityType)) {
if (!entityName.isEmpty()) {
Topic.validate(entityName);
}
configResourceType = ConfigResource.Type.TOPIC;
- dynamicConfigSource =
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
+ dynamicConfigSource =
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;
} else if (BROKER_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.BROKER;
if (BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
- configResourceType = ConfigResource.Type.BROKER;
- dynamicConfigSource =
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
+ dynamicConfigSource =
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
} else {
validateBrokerId(entityName, entityType);
- configResourceType = ConfigResource.Type.BROKER;
- dynamicConfigSource =
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG);
+ dynamicConfigSource =
ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
}
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
if (!entityName.isEmpty()) {
validateBrokerId(entityName, entityType);
}
configResourceType = ConfigResource.Type.BROKER_LOGGER;
- dynamicConfigSource = Optional.empty();
+ dynamicConfigSource =
ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG;
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
configResourceType = ConfigResource.Type.CLIENT_METRICS;
- dynamicConfigSource =
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG);
+ dynamicConfigSource =
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG;
} else if (GROUP_TYPE.equals(entityType)) {
configResourceType = ConfigResource.Type.GROUP;
- dynamicConfigSource =
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG);
+ dynamicConfigSource =
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG;
} else {
throw new IllegalArgumentException("Invalid entity type: " +
entityType);
}
- Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll ?
Optional.empty() : dynamicConfigSource;
-
- ConfigResource configResource = new ConfigResource(configResourceType,
entityName);
- DescribeConfigsOptions describeOptions = new
DescribeConfigsOptions().includeSynonyms(includeSynonyms);
- Map<ConfigResource, Config> configs =
adminClient.describeConfigs(Collections.singleton(configResource),
describeOptions)
- .all().get(30, TimeUnit.SECONDS);
+ return new DescribeConfigContext(new
ConfigResource(configResourceType, entityName), dynamicConfigSource);
+ }
- return configs.get(configResource).entries().stream()
+ private static List<ConfigEntry> filterAndSortEntries(Config config,
Optional<ConfigEntry.ConfigSource> configSourceFilter) {
+ return config.entries().stream()
.filter(entry -> configSourceFilter.isEmpty() ||
entry.source() == configSourceFilter.get())
.sorted(Comparator.comparing(ConfigEntry::name))
.toList();
}
+ private static List<ConfigEntry> getResourceConfig(Admin adminClient,
String entityType, String entityName, boolean includeSynonyms, boolean
describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+ DescribeConfigContext context = describeConfigContext(entityType,
entityName);
+ Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll
+ ? Optional.empty()
+ : Optional.of(context.dynamicConfigSource());
+ DescribeConfigsOptions describeOptions = new
DescribeConfigsOptions().includeSynonyms(includeSynonyms);
+ Map<ConfigResource, Config> configs =
adminClient.describeConfigs(Collections.singleton(context.configResource()),
describeOptions)
+ .all().get(30, TimeUnit.SECONDS);
+
+ return filterAndSortEntries(configs.get(context.configResource()),
configSourceFilter);
+ }
+
private static void describeQuotaConfigs(Admin adminClient, List<String>
entityTypes, List<String> entityNames) throws ExecutionException,
InterruptedException, TimeoutException {
Map<ClientQuotaEntity, Map<String, Double>> quotaConfigs =
getAllClientQuotasConfigs(adminClient, entityTypes, entityNames);
quotaConfigs.forEach((entity, entries) -> {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 29092e0c304..7e7787a824b 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -78,7 +78,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -846,6 +845,10 @@ public class ConfigCommandTest {
return ConfigTest.newConfigEntry(name, value,
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, List.of());
}
+ private ConfigEntry newBrokerLoggerConfigEntry(String name, String value) {
+ return ConfigTest.newConfigEntry(name, value,
ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, false, false, List.of());
+ }
+
@Test
public void shouldDescribeConfigSynonyms() throws Exception {
String resourceName = "my-topic";
@@ -856,10 +859,10 @@ public class ConfigCommandTest {
"--all"));
ConfigResource resource = new
ConfigResource(ConfigResource.Type.TOPIC, resourceName);
- KafkaFutureImpl<Map<ConfigResource, Config>> future = new
KafkaFutureImpl<>();
- future.complete(Map.of(resource, new Config(List.of())));
+ KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+ future.complete(new Config(List.of()));
DescribeConfigsResult describeResult =
mock(DescribeConfigsResult.class);
- when(describeResult.all()).thenReturn(future);
+ when(describeResult.values()).thenReturn(Map.of(resource, future));
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
@@ -871,16 +874,16 @@ public class ConfigCommandTest {
}
};
ConfigCommand.describeConfig(mockAdminClient, describeOpts);
- verify(describeResult).all();
+ verify(describeResult).values();
}
@Test
public void shouldAddBrokerLoggerConfig() throws Exception {
Node node = new Node(1, "localhost", 9092);
verifyAlterBrokerLoggerConfig(node, "1", "1", List.of(
- new ConfigEntry("kafka.log.LogCleaner", "INFO"),
- new ConfigEntry("kafka.server.ReplicaManager", "INFO"),
- new ConfigEntry("kafka.server.KafkaApi", "INFO")
+ newBrokerLoggerConfigEntry("kafka.log.LogCleaner", "INFO"),
+ newBrokerLoggerConfigEntry("kafka.server.ReplicaManager", "INFO"),
+ newBrokerLoggerConfigEntry("kafka.server.KafkaApi", "INFO")
));
}
@@ -944,9 +947,17 @@ public class ConfigCommandTest {
Node node = new Node(1, "localhost", 9092);
// verifyAlterBrokerLoggerConfig tries to alter kafka.log.LogCleaner,
kafka.server.ReplicaManager and kafka.server.KafkaApi
// yet, we make it so DescribeConfigs returns only one logger,
implying that kafka.server.ReplicaManager and kafka.log.LogCleaner are invalid
- assertThrows(InvalidConfigurationException.class, () ->
verifyAlterBrokerLoggerConfig(node, "1", "1", List.of(
- new ConfigEntry("kafka.server.KafkaApi", "INFO")
- )));
+ InvalidConfigurationException exception = assertThrows(
+ InvalidConfigurationException.class,
+ () -> verifyAlterBrokerLoggerConfig(node, "1", "1", List.of(
+ newBrokerLoggerConfigEntry("kafka.server.KafkaApi",
"INFO")
+ ))
+ );
+ assertEquals(
+ "Invalid broker logger(s):
kafka.server.ReplicaManager,kafka.log.LogCleaner",
+ exception.getMessage()
+ );
+
}
@Test
@@ -1005,30 +1016,29 @@ public class ConfigCommandTest {
String brokerDefaultEntityName = "";
ConfigResource resourceCustom = new
ConfigResource(ConfigResource.Type.BROKER, "1");
ConfigResource resourceDefault = new
ConfigResource(ConfigResource.Type.BROKER, brokerDefaultEntityName);
- KafkaFutureImpl<Map<ConfigResource, Config>> future = new
KafkaFutureImpl<>();
Config emptyConfig = new Config(List.of());
- Map<ConfigResource, Config> resultMap = new HashMap<>();
- resultMap.put(resourceCustom, emptyConfig);
- resultMap.put(resourceDefault, emptyConfig);
- future.complete(resultMap);
+ KafkaFutureImpl<Config> customFuture = new KafkaFutureImpl<>();
+ customFuture.complete(emptyConfig);
+ KafkaFutureImpl<Config> defaultFuture = new KafkaFutureImpl<>();
+ defaultFuture.complete(emptyConfig);
DescribeConfigsResult describeResult =
mock(DescribeConfigsResult.class);
// make sure it will be called 2 times: (1) for broker "1" (2) for
default broker ""
- when(describeResult.all()).thenReturn(future);
+ when(describeResult.values()).thenReturn(Map.of(
+ resourceCustom, customFuture,
+ resourceDefault, defaultFuture
+ ));
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
@Override
public synchronized DescribeConfigsResult
describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions
options) {
assertTrue(options.includeSynonyms(), "Synonyms not
requested");
- ConfigResource resource = resources.iterator().next();
- assertEquals(ConfigResource.Type.BROKER, resource.type());
- assertTrue(Objects.equals(resourceCustom.name(),
resource.name()) || Objects.equals(resourceDefault.name(), resource.name()));
- assertEquals(1, resources.size());
+ assertEquals(Set.of(resourceCustom, resourceDefault), new
HashSet<>(resources));
return describeResult;
}
};
ConfigCommand.describeConfig(mockAdminClient, describeOpts);
- verify(describeResult, times(2)).all();
+ verify(describeResult).values();
}
private void verifyAlterBrokerLoggerConfig(Node node, String resourceName,
String entityName,
@@ -1205,9 +1215,10 @@ public class ConfigCommandTest {
ConfigResource resourceCustom = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1");
ConfigEntry configEntry = new ConfigEntry("metrics", "*");
- KafkaFutureImpl<Map<ConfigResource, Config>> future = new
KafkaFutureImpl<>();
+ KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+ future.complete(new Config(List.of(configEntry)));
DescribeConfigsResult describeResult =
mock(DescribeConfigsResult.class);
- when(describeResult.all()).thenReturn(future);
+ when(describeResult.values()).thenReturn(Map.of(resourceCustom,
future));
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
@@ -1218,14 +1229,13 @@ public class ConfigCommandTest {
ConfigResource resource = resources.iterator().next();
assertEquals(ConfigResource.Type.CLIENT_METRICS,
resource.type());
assertEquals(resourceCustom.name(), resource.name());
- future.complete(Map.of(resourceCustom, new
Config(List.of(configEntry))));
return describeResult;
}
};
mockAdminClient.incrementalAlterConfigs(Map.of(resourceCustom,
List.of(new AlterConfigOp(configEntry,
AlterConfigOp.OpType.SET))), new AlterConfigsOptions());
ConfigCommand.describeConfig(mockAdminClient, describeOpts);
- verify(describeResult).all();
+ verify(describeResult).values();
}
@Test
@@ -1298,9 +1308,10 @@ public class ConfigCommandTest {
private void verifyDescribeGroupConfig(ConfigCommand.ConfigCommandOptions
describeOpts, String resourceName) throws Exception {
ConfigResource resourceCustom = new
ConfigResource(ConfigResource.Type.GROUP, resourceName);
ConfigEntry configEntry = new
ConfigEntry("consumer.heartbeat.interval.ms", "6000");
- KafkaFutureImpl<Map<ConfigResource, Config>> future = new
KafkaFutureImpl<>();
+ KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+ future.complete(new Config(List.of(configEntry)));
DescribeConfigsResult describeResult =
mock(DescribeConfigsResult.class);
- when(describeResult.all()).thenReturn(future);
+ when(describeResult.values()).thenReturn(Map.of(resourceCustom,
future));
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
@@ -1311,14 +1322,13 @@ public class ConfigCommandTest {
ConfigResource resource = resources.iterator().next();
assertEquals(ConfigResource.Type.GROUP, resource.type());
assertEquals(resourceCustom.name(), resource.name());
- future.complete(Map.of(resourceCustom, new
Config(List.of(configEntry))));
return describeResult;
}
};
mockAdminClient.incrementalAlterConfigs(Map.of(resourceCustom,
List.of(new AlterConfigOp(configEntry,
AlterConfigOp.OpType.SET))), new AlterConfigsOptions());
ConfigCommand.describeConfig(mockAdminClient, describeOpts);
- verify(describeResult).all();
+ verify(describeResult).values();
}
@Test