This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06d42a2baa4 MINOR: Optimize `describeConfigs` queries and address cleanups for `ConfigCommand` (#22114)
06d42a2baa4 is described below

commit 06d42a2baa41166f77c6542b6da0d08644d5c7f9
Author: Nick Guo <[email protected]>
AuthorDate: Sat May 9 14:40:32 2026 +0800

    MINOR: Optimize `describeConfigs` queries and address cleanups for `ConfigCommand` (#22114)
    
    See discussion:
    https://github.com/apache/kafka/pull/22013#pullrequestreview-4133302711
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../java/org/apache/kafka/tools/ConfigCommand.java | 115 +++++++++++++--------
 .../org/apache/kafka/tools/ConfigCommandTest.java  |  70 +++++++------
 2 files changed, 114 insertions(+), 71 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
index 6bd5b82fd80..2a93bcdc557 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.clients.admin.UserScramCredentialAlteration;
 import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
 import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
 import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -71,6 +72,8 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -93,7 +96,7 @@ import joptsimple.OptionSpec;
  * An entity described or altered by the command may be one of:
  * <ul>
  *     <li> topic: {@code --topic <topic>} OR {@code --entity-type topics 
--entity-name <topic>}
- *     <li> client: {@code --client <client>} OR {@code --entity-type clients 
--entity-name <client-id>}
+ *     <li> client: {@code --client <client-id>} OR {@code --entity-type 
clients --entity-name <client-id>}
  *     <li> user: {@code --user <user-principal>} OR {@code --entity-type 
users --entity-name <user-principal>}
  *     <li> {@code <user, client>}: {@code --user <user-principal> --client 
<client-id>} OR
  *                          {@code --entity-type users --entity-name 
<user-principal> --entity-type clients --entity-name <client-id>}
@@ -113,7 +116,6 @@ public class ConfigCommand {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConfigCommand.class);
 
     private static final String BROKER_DEFAULT_ENTITY_NAME = "";
-    private static final List<String> BROKER_SUPPORTED_CONFIG_TYPES;
     private static final int DEFAULT_SCRAM_ITERATIONS = 4096;
     private static final String TOPIC_TYPE = ConfigType.TOPIC.value();
     private static final String CLIENT_METRICS_TYPE = 
ConfigType.CLIENT_METRICS.value();
@@ -124,13 +126,10 @@ public class ConfigCommand {
     private static final String IP_TYPE = ConfigType.IP.value();
 
     static final String BROKER_LOGGER_CONFIG_TYPE = "broker-loggers";
-    static {
-        BROKER_SUPPORTED_CONFIG_TYPES = new ArrayList<>();
-        BROKER_SUPPORTED_CONFIG_TYPES.add(BROKER_LOGGER_CONFIG_TYPE);
-        for (ConfigType configType : ConfigType.values()) {
-            BROKER_SUPPORTED_CONFIG_TYPES.add(configType.value());
-        }
-    }
+    private static final List<String> BROKER_SUPPORTED_CONFIG_TYPES = 
Stream.concat(
+            Stream.of(BROKER_LOGGER_CONFIG_TYPE),
+            Stream.of(ConfigType.values()).map(ConfigType::value)
+    ).toList();
 
     public static void main(String[] args) {
         try {
@@ -276,7 +275,7 @@ public class ConfigCommand {
                 throw new InvalidConfigurationException("Invalid broker 
logger(s): " + String.join(",", invalidBrokerLoggers));
 
             ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName);
-            AlterConfigsOptions alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+            AlterConfigsOptions alterOptions = new 
AlterConfigsOptions().timeoutMs(30000);
             List<AlterConfigOp> addEntries = 
configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, 
AlterConfigOp.OpType.SET)).toList();
             List<AlterConfigOp> deleteEntries = 
configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), 
AlterConfigOp.OpType.DELETE)).toList();
             Collection<AlterConfigOp> alterEntries = 
Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
@@ -444,7 +443,7 @@ public class ConfigCommand {
         }
         ClientQuotaEntity entity = new ClientQuotaEntity(alterEntityMap);
 
-        AlterClientQuotasOptions alterOptions = new 
AlterClientQuotasOptions().validateOnly(false);
+        AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions();
 
         List<ClientQuotaAlteration.Op> addOps = 
configsToBeAddedMap.entrySet().stream()
                 .map(entry -> {
@@ -530,40 +529,58 @@ public class ConfigCommand {
             }
         }
 
-        List<String> entities;
+        Set<String> entities;
         if (entityName.isPresent()) {
-            entities = List.of(entityName.get());
+            entities = Set.of(entityName.get());
         } else {
             if (TOPIC_TYPE.equals(entityType)) {
-                entities = new ArrayList<>(adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names().get());
+                entities = new LinkedHashSet<>(adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names().get());
             } else if (BROKER_TYPE.equals(entityType) || 
BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
-                List<String> brokerIds = adminClient.describeCluster(new 
DescribeClusterOptions()).nodes().get().stream()
+                Set<String> brokerIds = adminClient.describeCluster(new 
DescribeClusterOptions()).nodes().get().stream()
                         .map(Node::idString)
-                        .collect(Collectors.toList());
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
                 brokerIds.add(BROKER_DEFAULT_ENTITY_NAME);
                 entities = brokerIds;
             } else if (CLIENT_METRICS_TYPE.equals(entityType)) {
                 entities = 
adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new 
ListConfigResourcesOptions()).all().get().stream()
                         .map(ConfigResource::name)
-                        .toList();
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
             } else if (GROUP_TYPE.equals(entityType)) {
                 Set<String> groupIds = 
adminClient.listGroups().all().get().stream()
                         .map(GroupListing::groupId)
-                        .collect(Collectors.toSet());
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
                 Set<String> groupResources = 
listGroupConfigResources(adminClient)
                         .map(resources -> resources.stream()
                                 .map(ConfigResource::name)
-                                .collect(Collectors.toSet()))
-                        .orElse(Set.of());
-                Set<String> combined = new HashSet<>(groupIds);
+                                
.collect(Collectors.toCollection(LinkedHashSet::new)))
+                        .orElseGet(LinkedHashSet::new);
+                Set<String> combined = new LinkedHashSet<>(groupIds);
                 combined.addAll(groupResources);
-                entities = new ArrayList<>(combined);
+                entities = combined;
             } else {
                 throw new IllegalArgumentException("Invalid entity type: " + 
entityType);
             }
         }
 
+        if (entities.isEmpty()) {
+            return;
+        }
+
+        Map<String, DescribeConfigContext> contextsByEntity = new 
LinkedHashMap<>();
         for (String entity : entities) {
+            contextsByEntity.put(entity, describeConfigContext(entityType, 
entity));
+        }
+
+        DescribeConfigsOptions describeOptions = new 
DescribeConfigsOptions().includeSynonyms(true);
+        Map<ConfigResource, KafkaFuture<Config>> configs = 
adminClient.describeConfigs(
+                contextsByEntity.values().stream()
+                        .map(DescribeConfigContext::configResource)
+                        .toList(),
+                describeOptions
+        ).values();
+
+        for (String entity : entities) {
+            DescribeConfigContext context = contextsByEntity.get(entity);
             if (BROKER_DEFAULT_ENTITY_NAME.equals(entity)) {
                 System.out.println("Default configs for " + entityType + " in 
the cluster are:");
             } else {
@@ -571,7 +588,12 @@ public class ConfigCommand {
                 String entityTypeSingular = entityType.substring(0, 
entityType.length() - 1);
                 System.out.println(configSourceStr + " configs for " + 
entityTypeSingular + " " + entity + " are:");
             }
-            getResourceConfig(adminClient, entityType, entity, true, 
describeAll).forEach(entry -> {
+
+            Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll
+                    ? Optional.empty()
+                    : Optional.of(context.dynamicConfigSource());
+            Config config = configs.get(context.configResource()).get(30, 
TimeUnit.SECONDS);
+            filterAndSortEntries(config, configSourceFilter).forEach(entry -> {
                 String synonyms = entry.synonyms().stream()
                         .map(synonym -> synonym.source() + ":" + 
synonym.name() + "=" + synonym.value())
                         .collect(Collectors.joining(", ", "{", "}"));
@@ -582,7 +604,7 @@ public class ConfigCommand {
 
     private static void alterResourceConfig(Admin adminClient, String 
entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> 
configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, 
InterruptedException, TimeoutException {
         ConfigResource configResource = new ConfigResource(resourceType, 
entityNameHead);
-        AlterConfigsOptions alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+        AlterConfigsOptions alterOptions = new 
AlterConfigsOptions().timeoutMs(30000);
         List<AlterConfigOp> addEntries = 
configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, 
AlterConfigOp.OpType.SET)).toList();
         List<AlterConfigOp> deleteEntries = configsToBeDeleted.stream().map(k 
-> new AlterConfigOp(new ConfigEntry(k, ""), 
AlterConfigOp.OpType.DELETE)).toList();
         Collection<AlterConfigOp> alterEntries = 
Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
@@ -597,54 +619,65 @@ public class ConfigCommand {
         }
     }
 
-    private static List<ConfigEntry> getResourceConfig(Admin adminClient, 
String entityType, String entityName, boolean includeSynonyms, boolean 
describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+    private record DescribeConfigContext(ConfigResource configResource, 
ConfigEntry.ConfigSource dynamicConfigSource) {
+    }
+
+    private static DescribeConfigContext describeConfigContext(String 
entityType, String entityName) {
         ConfigResource.Type configResourceType;
-        Optional<ConfigEntry.ConfigSource> dynamicConfigSource;
+        ConfigEntry.ConfigSource dynamicConfigSource;
 
         if (TOPIC_TYPE.equals(entityType)) {
             if (!entityName.isEmpty()) {
                 Topic.validate(entityName);
             }
             configResourceType = ConfigResource.Type.TOPIC;
-            dynamicConfigSource = 
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
+            dynamicConfigSource = 
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;
         } else if (BROKER_TYPE.equals(entityType)) {
+            configResourceType = ConfigResource.Type.BROKER;
             if (BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
-                configResourceType = ConfigResource.Type.BROKER;
-                dynamicConfigSource = 
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
+                dynamicConfigSource = 
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
             } else {
                 validateBrokerId(entityName, entityType);
-                configResourceType = ConfigResource.Type.BROKER;
-                dynamicConfigSource = 
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG);
+                dynamicConfigSource = 
ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
             }
         } else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
             if (!entityName.isEmpty()) {
                 validateBrokerId(entityName, entityType);
             }
             configResourceType = ConfigResource.Type.BROKER_LOGGER;
-            dynamicConfigSource = Optional.empty();
+            dynamicConfigSource = 
ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG;
         } else if (CLIENT_METRICS_TYPE.equals(entityType)) {
             configResourceType = ConfigResource.Type.CLIENT_METRICS;
-            dynamicConfigSource = 
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG);
+            dynamicConfigSource = 
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG;
         } else if (GROUP_TYPE.equals(entityType)) {
             configResourceType = ConfigResource.Type.GROUP;
-            dynamicConfigSource = 
Optional.of(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG);
+            dynamicConfigSource = 
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG;
         } else {
             throw new IllegalArgumentException("Invalid entity type: " + 
entityType);
         }
 
-        Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll ? 
Optional.empty() : dynamicConfigSource;
-
-        ConfigResource configResource = new ConfigResource(configResourceType, 
entityName);
-        DescribeConfigsOptions describeOptions = new 
DescribeConfigsOptions().includeSynonyms(includeSynonyms);
-        Map<ConfigResource, Config> configs = 
adminClient.describeConfigs(Collections.singleton(configResource), 
describeOptions)
-                    .all().get(30, TimeUnit.SECONDS);
+        return new DescribeConfigContext(new 
ConfigResource(configResourceType, entityName), dynamicConfigSource);
+    }
 
-        return configs.get(configResource).entries().stream()
+    private static List<ConfigEntry> filterAndSortEntries(Config config, 
Optional<ConfigEntry.ConfigSource> configSourceFilter) {
+        return config.entries().stream()
                 .filter(entry -> configSourceFilter.isEmpty() || 
entry.source() == configSourceFilter.get())
                 .sorted(Comparator.comparing(ConfigEntry::name))
                 .toList();
     }
 
+    private static List<ConfigEntry> getResourceConfig(Admin adminClient, 
String entityType, String entityName, boolean includeSynonyms, boolean 
describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+        DescribeConfigContext context = describeConfigContext(entityType, 
entityName);
+        Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll
+                ? Optional.empty()
+                : Optional.of(context.dynamicConfigSource());
+        DescribeConfigsOptions describeOptions = new 
DescribeConfigsOptions().includeSynonyms(includeSynonyms);
+        Map<ConfigResource, Config> configs = 
adminClient.describeConfigs(Collections.singleton(context.configResource()), 
describeOptions)
+                    .all().get(30, TimeUnit.SECONDS);
+
+        return filterAndSortEntries(configs.get(context.configResource()), 
configSourceFilter);
+    }
+
     private static void describeQuotaConfigs(Admin adminClient, List<String> 
entityTypes, List<String> entityNames) throws ExecutionException, 
InterruptedException, TimeoutException {
         Map<ClientQuotaEntity, Map<String, Double>> quotaConfigs = 
getAllClientQuotasConfigs(adminClient, entityTypes, entityNames);
         quotaConfigs.forEach((entity, entries) -> {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 29092e0c304..7e7787a824b 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -78,7 +78,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -846,6 +845,10 @@ public class ConfigCommandTest {
         return ConfigTest.newConfigEntry(name, value, 
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, List.of());
     }
 
+    private ConfigEntry newBrokerLoggerConfigEntry(String name, String value) {
+        return ConfigTest.newConfigEntry(name, value, 
ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, false, false, List.of());
+    }
+
     @Test
     public void shouldDescribeConfigSynonyms() throws Exception {
         String resourceName = "my-topic";
@@ -856,10 +859,10 @@ public class ConfigCommandTest {
             "--all"));
 
         ConfigResource resource = new 
ConfigResource(ConfigResource.Type.TOPIC, resourceName);
-        KafkaFutureImpl<Map<ConfigResource, Config>> future = new 
KafkaFutureImpl<>();
-        future.complete(Map.of(resource, new Config(List.of())));
+        KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+        future.complete(new Config(List.of()));
         DescribeConfigsResult describeResult = 
mock(DescribeConfigsResult.class);
-        when(describeResult.all()).thenReturn(future);
+        when(describeResult.values()).thenReturn(Map.of(resource, future));
 
         Node node = new Node(1, "localhost", 9092);
         MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
@@ -871,16 +874,16 @@ public class ConfigCommandTest {
             }
         };
         ConfigCommand.describeConfig(mockAdminClient, describeOpts);
-        verify(describeResult).all();
+        verify(describeResult).values();
     }
 
     @Test
     public void shouldAddBrokerLoggerConfig() throws Exception {
         Node node = new Node(1, "localhost", 9092);
         verifyAlterBrokerLoggerConfig(node, "1", "1", List.of(
-            new ConfigEntry("kafka.log.LogCleaner", "INFO"),
-            new ConfigEntry("kafka.server.ReplicaManager", "INFO"),
-            new ConfigEntry("kafka.server.KafkaApi", "INFO")
+            newBrokerLoggerConfigEntry("kafka.log.LogCleaner", "INFO"),
+            newBrokerLoggerConfigEntry("kafka.server.ReplicaManager", "INFO"),
+            newBrokerLoggerConfigEntry("kafka.server.KafkaApi", "INFO")
         ));
     }
 
@@ -944,9 +947,17 @@ public class ConfigCommandTest {
         Node node = new Node(1, "localhost", 9092);
         // verifyAlterBrokerLoggerConfig tries to alter kafka.log.LogCleaner, 
kafka.server.ReplicaManager and kafka.server.KafkaApi
         // yet, we make it so DescribeConfigs returns only one logger, 
implying that kafka.server.ReplicaManager and kafka.log.LogCleaner are invalid
-        assertThrows(InvalidConfigurationException.class, () -> 
verifyAlterBrokerLoggerConfig(node, "1", "1", List.of(
-            new ConfigEntry("kafka.server.KafkaApi", "INFO")
-        )));
+        InvalidConfigurationException exception = assertThrows(
+                InvalidConfigurationException.class,
+                () -> verifyAlterBrokerLoggerConfig(node, "1", "1", List.of(
+                        newBrokerLoggerConfigEntry("kafka.server.KafkaApi", 
"INFO")
+                ))
+        );
+        assertEquals(
+                "Invalid broker logger(s): 
kafka.server.ReplicaManager,kafka.log.LogCleaner",
+                exception.getMessage()
+        );
+
     }
 
     @Test
@@ -1005,30 +1016,29 @@ public class ConfigCommandTest {
         String brokerDefaultEntityName = "";
         ConfigResource resourceCustom = new 
ConfigResource(ConfigResource.Type.BROKER, "1");
         ConfigResource resourceDefault = new 
ConfigResource(ConfigResource.Type.BROKER, brokerDefaultEntityName);
-        KafkaFutureImpl<Map<ConfigResource, Config>> future = new 
KafkaFutureImpl<>();
         Config emptyConfig = new Config(List.of());
-        Map<ConfigResource, Config> resultMap = new HashMap<>();
-        resultMap.put(resourceCustom, emptyConfig);
-        resultMap.put(resourceDefault, emptyConfig);
-        future.complete(resultMap);
+        KafkaFutureImpl<Config> customFuture = new KafkaFutureImpl<>();
+        customFuture.complete(emptyConfig);
+        KafkaFutureImpl<Config> defaultFuture = new KafkaFutureImpl<>();
+        defaultFuture.complete(emptyConfig);
         DescribeConfigsResult describeResult = 
mock(DescribeConfigsResult.class);
         // make sure it will be called 2 times: (1) for broker "1" (2) for 
default broker ""
-        when(describeResult.all()).thenReturn(future);
+        when(describeResult.values()).thenReturn(Map.of(
+            resourceCustom, customFuture,
+            resourceDefault, defaultFuture
+        ));
 
         Node node = new Node(1, "localhost", 9092);
         MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
             @Override
             public synchronized DescribeConfigsResult 
describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions 
options) {
                 assertTrue(options.includeSynonyms(), "Synonyms not 
requested");
-                ConfigResource resource = resources.iterator().next();
-                assertEquals(ConfigResource.Type.BROKER, resource.type());
-                assertTrue(Objects.equals(resourceCustom.name(), 
resource.name()) || Objects.equals(resourceDefault.name(), resource.name()));
-                assertEquals(1, resources.size());
+                assertEquals(Set.of(resourceCustom, resourceDefault), new 
HashSet<>(resources));
                 return describeResult;
             }
         };
         ConfigCommand.describeConfig(mockAdminClient, describeOpts);
-        verify(describeResult, times(2)).all();
+        verify(describeResult).values();
     }
 
     private void verifyAlterBrokerLoggerConfig(Node node, String resourceName, 
String entityName,
@@ -1205,9 +1215,10 @@ public class ConfigCommandTest {
 
         ConfigResource resourceCustom = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1");
         ConfigEntry configEntry = new ConfigEntry("metrics", "*");
-        KafkaFutureImpl<Map<ConfigResource, Config>> future = new 
KafkaFutureImpl<>();
+        KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+        future.complete(new Config(List.of(configEntry)));
         DescribeConfigsResult describeResult = 
mock(DescribeConfigsResult.class);
-        when(describeResult.all()).thenReturn(future);
+        when(describeResult.values()).thenReturn(Map.of(resourceCustom, 
future));
 
         Node node = new Node(1, "localhost", 9092);
         MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
@@ -1218,14 +1229,13 @@ public class ConfigCommandTest {
                 ConfigResource resource = resources.iterator().next();
                 assertEquals(ConfigResource.Type.CLIENT_METRICS, 
resource.type());
                 assertEquals(resourceCustom.name(), resource.name());
-                future.complete(Map.of(resourceCustom, new 
Config(List.of(configEntry))));
                 return describeResult;
             }
         };
         mockAdminClient.incrementalAlterConfigs(Map.of(resourceCustom,
             List.of(new AlterConfigOp(configEntry, 
AlterConfigOp.OpType.SET))), new AlterConfigsOptions());
         ConfigCommand.describeConfig(mockAdminClient, describeOpts);
-        verify(describeResult).all();
+        verify(describeResult).values();
     }
 
     @Test
@@ -1298,9 +1308,10 @@ public class ConfigCommandTest {
     private void verifyDescribeGroupConfig(ConfigCommand.ConfigCommandOptions 
describeOpts, String resourceName) throws Exception {
         ConfigResource resourceCustom = new 
ConfigResource(ConfigResource.Type.GROUP, resourceName);
         ConfigEntry configEntry = new 
ConfigEntry("consumer.heartbeat.interval.ms", "6000");
-        KafkaFutureImpl<Map<ConfigResource, Config>> future = new 
KafkaFutureImpl<>();
+        KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+        future.complete(new Config(List.of(configEntry)));
         DescribeConfigsResult describeResult = 
mock(DescribeConfigsResult.class);
-        when(describeResult.all()).thenReturn(future);
+        when(describeResult.values()).thenReturn(Map.of(resourceCustom, 
future));
 
         Node node = new Node(1, "localhost", 9092);
         MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), 
node) {
@@ -1311,14 +1322,13 @@ public class ConfigCommandTest {
                 ConfigResource resource = resources.iterator().next();
                 assertEquals(ConfigResource.Type.GROUP, resource.type());
                 assertEquals(resourceCustom.name(), resource.name());
-                future.complete(Map.of(resourceCustom, new 
Config(List.of(configEntry))));
                 return describeResult;
             }
         };
         mockAdminClient.incrementalAlterConfigs(Map.of(resourceCustom,
             List.of(new AlterConfigOp(configEntry, 
AlterConfigOp.OpType.SET))), new AlterConfigsOptions());
         ConfigCommand.describeConfig(mockAdminClient, describeOpts);
-        verify(describeResult).all();
+        verify(describeResult).values();
     }
 
     @Test

Reply via email to