This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 27e132bbb6c [feat] PIP-468: Filter scalable topic listing by multiple 
properties (AND) (#25639)
27e132bbb6c is described below

commit 27e132bbb6c900e4d492a66c91036eb182349cf7
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 06:15:06 2026 -0700

    [feat] PIP-468: Filter scalable topic listing by multiple properties (AND) 
(#25639)
---
 .../broker/resources/ScalableTopicResources.java   | 63 ++++++++++++++------
 .../pulsar/broker/admin/v2/ScalableTopics.java     | 43 ++++++++++----
 .../admin/ScalableTopicsListByPropertyTest.java    | 31 ++++++----
 .../resources/ScalableTopicPropertyIndexTest.java  | 67 +++++++++++++++++-----
 .../apache/pulsar/client/admin/ScalableTopics.java | 26 ++++-----
 .../client/admin/internal/ScalableTopicsImpl.java  | 20 ++++---
 .../apache/pulsar/admin/cli/CmdScalableTopics.java | 22 +++----
 7 files changed, 184 insertions(+), 88 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 263abe50439..3e015808476 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -118,32 +118,57 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     }
 
     /**
-     * List scalable topics in a namespace whose {@code properties} map 
contains the given
-     * key/value pair. On stores with native secondary index support (Oxia) 
this is served
-     * by the index registered at create/update time; otherwise it falls back 
to a children
-     * scan + per-record property check.
+     * List scalable topics in a namespace whose {@code properties} map 
contains every
+     * key/value pair in {@code propertyFilters} (AND semantics).
      *
-     * @param ns            the namespace to scope the query to
-     * @param propertyKey   property name to filter on
-     * @param propertyValue exact property value to match
-     * @return fully qualified scalable topic names matching the property
+     * <p>Stores with native secondary-index support (Oxia) serve the lookup
+     * via the index for one of the filters, then a record-level check rejects
+     * anything that doesn't satisfy the rest. Stores without native index 
support fall
+     * through to a children scan + the same predicate. An empty {@code 
propertyFilters}
+     * map degenerates to {@link #listScalableTopicsAsync}.
+     *
+     * @param ns              the namespace to scope the query to
+     * @param propertyFilters property name/value pairs that all must match 
(AND)
+     * @return fully qualified scalable topic names matching every filter
      */
-    public CompletableFuture<List<String>> findScalableTopicsByPropertyAsync(
-            NamespaceName ns, String propertyKey, String propertyValue) {
+    public CompletableFuture<List<String>> findScalableTopicsByPropertiesAsync(
+            NamespaceName ns, Map<String, String> propertyFilters) {
+        if (propertyFilters == null || propertyFilters.isEmpty()) {
+            return listScalableTopicsAsync(ns);
+        }
         String scanPathPrefix = joinPath(SCALABLE_TOPIC_PATH, ns.toString());
         ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
-        return getStore().findByIndex(scanPathPrefix, propertyKey, 
propertyValue, result -> {
-                    // Fallback path (no native index): re-check the property 
on the loaded record.
-                    try {
-                        ScalableTopicMetadata md =
-                                mapper.readValue(result.getValue(), 
ScalableTopicMetadata.class);
-                        return md.getProperties() != null
-                                && 
propertyValue.equals(md.getProperties().get(propertyKey));
-                    } catch (IOException e) {
+
+        // Pick any single filter to drive the index lookup (native stores 
will use it
+        // to narrow the candidate set; iteration order is acceptable since we 
don't
+        // know index cardinalities up front). The predicate then enforces AND 
across
+        // every filter on the loaded record.
+        Map.Entry<String, String> indexFilter = 
propertyFilters.entrySet().iterator().next();
+        java.util.function.Predicate<org.apache.pulsar.metadata.api.GetResult> 
matchesAll = result -> {
+            try {
+                ScalableTopicMetadata md =
+                        mapper.readValue(result.getValue(), 
ScalableTopicMetadata.class);
+                Map<String, String> props = md.getProperties();
+                if (props == null) {
+                    return false;
+                }
+                for (Map.Entry<String, String> e : propertyFilters.entrySet()) 
{
+                    if (!e.getValue().equals(props.get(e.getKey()))) {
                         return false;
                     }
-                })
+                }
+                return true;
+            } catch (IOException e) {
+                return false;
+            }
+        };
+        return getStore().findByIndex(scanPathPrefix,
+                        indexFilter.getKey(), indexFilter.getValue(), 
matchesAll)
+                // Native-index implementations don't apply the fallback 
predicate, so
+                // re-check here. On the fallback path this is a no-op 
(predicate already
+                // applied) but cheap.
                 .thenApply(results -> results.stream()
+                        .filter(matchesAll)
                         .map(r -> {
                             String path = r.getStat().getPath();
                             String encoded = 
path.substring(path.lastIndexOf('/') + 1);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 1994fd8de34..01b2320f99a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -94,18 +95,16 @@ public class ScalableTopics extends AdminResource {
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace,
-            @ApiParam(value = "Filter by topic property name (must be paired 
with propertyValue)")
-            @QueryParam("propertyKey") String propertyKey,
-            @ApiParam(value = "Filter by topic property value (must be paired 
with propertyKey)")
-            @QueryParam("propertyValue") String propertyValue) {
+            @ApiParam(value = "Filter to topics whose properties contain every 
key=value pair."
+                    + " Each repetition of the parameter adds one filter (AND 
semantics).")
+            @QueryParam("property") List<String> properties) {
         validateNamespaceName(tenant, namespace);
-        boolean filterByProperty = propertyKey != null && 
!propertyKey.isEmpty()
-                && propertyValue != null;
+        Map<String, String> propertyFilters = parseKeyValuePairs(properties);
         validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.GET_TOPICS)
-                .thenCompose(__ -> filterByProperty
-                        ? resources().findScalableTopicsByPropertyAsync(
-                                namespaceName, propertyKey, propertyValue)
-                        : resources().listScalableTopicsAsync(namespaceName))
+                .thenCompose(__ -> propertyFilters.isEmpty()
+                        ? resources().listScalableTopicsAsync(namespaceName)
+                        : resources().findScalableTopicsByPropertiesAsync(
+                                namespaceName, propertyFilters))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error().attr("clientAppId", 
clientAppId()).attr("namespace", namespaceName)
@@ -115,6 +114,30 @@ public class ScalableTopics extends AdminResource {
                 });
     }
 
+    /**
+     * Parse {@code key=value} entries from a list of query parameter values
+     * into a map. Accepts {@code null} / empty input and skips empty entries.
+     * Rejects malformed entries (no {@code =}, empty key or value) with a 412.
+     */
+    private static Map<String, String> parseKeyValuePairs(List<String> 
entries) {
+        if (entries == null || entries.isEmpty()) {
+            return Map.of();
+        }
+        Map<String, String> result = new 
java.util.LinkedHashMap<>(entries.size());
+        for (String entry : entries) {
+            if (entry == null || entry.isEmpty()) {
+                continue;
+            }
+            int eq = entry.indexOf('=');
+            if (eq <= 0 || eq == entry.length() - 1) {
+                throw new RestException(Response.Status.fromStatusCode(412),
+                        "property filter must be in the form key=value, got: " 
+ entry);
+            }
+            result.put(entry.substring(0, eq), entry.substring(eq + 1));
+        }
+        return result;
+    }
+
     // --- Create ---
 
     @PUT
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
index f775fe3f2df..515c1992c51 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
@@ -51,28 +51,37 @@ public class ScalableTopicsListByPropertyTest extends 
SharedPulsarBaseTest {
         String bobTopic = topicName("bob");
         String carolTopic = topicName("carol");
 
-        // Each topic gets a different owner; alice and bob share a team. The 
filter
-        // should be able to surface either subset on demand.
+        // alice and bob share team=platform; alice and carol share
+        // owner=alice. The filter combinations below isolate each subset.
         admin.scalableTopics().createScalableTopic(aliceTopic, 1,
                 Map.of("owner", "alice", "team", "platform"));
         admin.scalableTopics().createScalableTopic(bobTopic, 1,
                 Map.of("owner", "bob", "team", "platform"));
         admin.scalableTopics().createScalableTopic(carolTopic, 1,
-                Map.of("owner", "carol", "team", "data"));
+                Map.of("owner", "alice", "team", "data"));
 
-        // Filter by owner=alice — single match.
-        List<String> alice = admin.scalableTopics()
-                .listScalableTopicsByProperty(namespace(), "owner", "alice");
-        assertEquals(alice, List.of(aliceTopic));
+        // Single-property filter: owner=bob — single match.
+        List<String> bob = admin.scalableTopics()
+                .listScalableTopicsByProperties(namespace(), Map.of("owner", 
"bob"));
+        assertEquals(bob, List.of(bobTopic));
 
-        // Filter by team=platform — alice + bob.
+        // Single-property filter: team=platform — alice + bob.
         Set<String> platform = new HashSet<>(admin.scalableTopics()
-                .listScalableTopicsByProperty(namespace(), "team", 
"platform"));
+                .listScalableTopicsByProperties(namespace(), Map.of("team", 
"platform")));
         assertEquals(platform, Set.of(aliceTopic, bobTopic));
 
-        // Unmatched value — empty result.
+        // Multi-property AND filter: owner=alice AND team=platform — narrows 
to
+        // exactly aliceTopic, even though carol also has owner=alice and bob 
also
+        // has team=platform.
+        List<String> aliceOnPlatform = admin.scalableTopics()
+                .listScalableTopicsByProperties(namespace(),
+                        Map.of("owner", "alice", "team", "platform"));
+        assertEquals(aliceOnPlatform, List.of(aliceTopic));
+
+        // Unmatched combination — empty result.
         assertTrue(admin.scalableTopics()
-                .listScalableTopicsByProperty(namespace(), "owner", 
"nonexistent")
+                .listScalableTopicsByProperties(namespace(),
+                        Map.of("owner", "alice", "team", "ops"))
                 .isEmpty());
 
         // Sanity-check: the un-filtered listing still returns every topic in 
the namespace.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
index f4cd6d1f648..77b7fa9ca18 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
@@ -35,10 +35,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
- * Coverage for the {@code findScalableTopicsByPropertyAsync} entry point on
+ * Coverage for the {@code findScalableTopicsByPropertiesAsync} entry point on
  * {@link ScalableTopicResources}: verifies that topic properties registered at
  * create/update time are queryable via the secondary-index API, that updates
- * refresh the index, and that the filter is correctly scoped to a namespace.
+ * refresh the index, that the filter is correctly scoped to a namespace, and
+ * that multi-property filters AND the conditions together.
  *
  * <p>Uses {@link LocalMemoryMetadataStore} which does not implement native
  * secondary indexes, so this exercises the fallback scan + per-record property
@@ -87,19 +88,57 @@ public class ScalableTopicPropertyIndexTest {
         resources.createScalableTopicAsync(topicIn(ns, "t-carol"),
                 metaWithProps(Map.of("owner", "carol", "team", "data"))).get();
 
-        // Filter by owner=alice — only t-alice matches.
+        // Single-property filter: owner=alice — only t-alice matches.
         List<String> aliceOwned = resources
-                .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+                .findScalableTopicsByPropertiesAsync(ns, Map.of("owner", 
"alice")).get();
         assertEquals(aliceOwned, List.of("topic://tenant/ns-a/t-alice"));
 
-        // Filter by team=platform — both alice and bob.
+        // Single-property filter: team=platform — both alice and bob.
         Set<String> platform = new HashSet<>(resources
-                .findScalableTopicsByPropertyAsync(ns, "team", 
"platform").get());
+                .findScalableTopicsByPropertiesAsync(ns, Map.of("team", 
"platform")).get());
         assertEquals(platform, Set.of(
                 "topic://tenant/ns-a/t-alice",
                 "topic://tenant/ns-a/t-bob"));
     }
 
+    @Test
+    public void andsMultiplePropertyFilters() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-and");
+
+        // alice/platform and bob/platform share team; alice/data and 
alice/platform
+        // share owner. The AND of (team=platform, owner=alice) must narrow to 
the
+        // single record that satisfies both.
+        resources.createScalableTopicAsync(topicIn(ns, "t-1"),
+                metaWithProps(Map.of("owner", "alice", "team", 
"platform"))).get();
+        resources.createScalableTopicAsync(topicIn(ns, "t-2"),
+                metaWithProps(Map.of("owner", "alice", "team", "data"))).get();
+        resources.createScalableTopicAsync(topicIn(ns, "t-3"),
+                metaWithProps(Map.of("owner", "bob", "team", 
"platform"))).get();
+
+        List<String> match = resources.findScalableTopicsByPropertiesAsync(ns,
+                Map.of("owner", "alice", "team", "platform")).get();
+        assertEquals(match, List.of("topic://tenant/ns-and/t-1"));
+
+        // No record satisfies both — empty result.
+        assertTrue(resources.findScalableTopicsByPropertiesAsync(ns,
+                Map.of("owner", "alice", "team", "ops")).get().isEmpty());
+    }
+
+    @Test
+    public void emptyFilterReturnsAllTopicsInNamespace() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-empty-filter");
+        resources.createScalableTopicAsync(topicIn(ns, "t-1"),
+                metaWithProps(Map.of("owner", "alice"))).get();
+        resources.createScalableTopicAsync(topicIn(ns, "t-2"),
+                metaWithProps(Map.of("owner", "bob"))).get();
+
+        Set<String> all = new HashSet<>(resources
+                .findScalableTopicsByPropertiesAsync(ns, Map.of()).get());
+        assertEquals(all, Set.of(
+                "topic://tenant/ns-empty-filter/t-1",
+                "topic://tenant/ns-empty-filter/t-2"));
+    }
+
     @Test
     public void findIsScopedToNamespace() throws Exception {
         NamespaceName nsA = NamespaceName.get("tenant/ns-a");
@@ -112,11 +151,11 @@ public class ScalableTopicPropertyIndexTest {
                 metaWithProps(Map.of("owner", "alice"))).get();
 
         List<String> inNsA = resources
-                .findScalableTopicsByPropertyAsync(nsA, "owner", 
"alice").get();
+                .findScalableTopicsByPropertiesAsync(nsA, Map.of("owner", 
"alice")).get();
         assertEquals(inNsA, List.of("topic://tenant/ns-a/t1"));
 
         List<String> inNsB = resources
-                .findScalableTopicsByPropertyAsync(nsB, "owner", 
"alice").get();
+                .findScalableTopicsByPropertiesAsync(nsB, Map.of("owner", 
"alice")).get();
         assertEquals(inNsB, List.of("topic://tenant/ns-b/t2"));
     }
 
@@ -127,11 +166,11 @@ public class ScalableTopicPropertyIndexTest {
                 metaWithProps(Map.of("owner", "alice"))).get();
 
         // Wrong value
-        assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"bob")
+        assertTrue(resources.findScalableTopicsByPropertiesAsync(ns, 
Map.of("owner", "bob"))
                 .get().isEmpty());
 
         // Wrong key
-        assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "team", 
"alice")
+        assertTrue(resources.findScalableTopicsByPropertiesAsync(ns, 
Map.of("team", "alice"))
                 .get().isEmpty());
     }
 
@@ -142,7 +181,7 @@ public class ScalableTopicPropertyIndexTest {
 
         resources.createScalableTopicAsync(tn,
                 metaWithProps(Map.of("owner", "alice"))).get();
-        assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"alice").get(),
+        assertEquals(resources.findScalableTopicsByPropertiesAsync(ns, 
Map.of("owner", "alice")).get(),
                 List.of(tn.toString()));
 
         // Reassign owner via update — the new owner must be queryable, and the
@@ -152,9 +191,9 @@ public class ScalableTopicPropertyIndexTest {
             return current;
         }).get();
 
-        assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"bob").get(),
+        assertEquals(resources.findScalableTopicsByPropertiesAsync(ns, 
Map.of("owner", "bob")).get(),
                 List.of(tn.toString()));
-        assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"alice")
+        assertTrue(resources.findScalableTopicsByPropertiesAsync(ns, 
Map.of("owner", "alice"))
                 .get().isEmpty());
     }
 
@@ -168,7 +207,7 @@ public class ScalableTopicPropertyIndexTest {
 
         // Filtering by any property must skip the un-tagged record.
         List<String> matches = resources
-                .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+                .findScalableTopicsByPropertiesAsync(ns, Map.of("owner", 
"alice")).get();
         assertEquals(matches, List.of("topic://tenant/ns-noprops/t-tagged"));
     }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index 74f8a223399..d04da28e35f 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -51,26 +51,26 @@ public interface ScalableTopics {
 
     /**
      * Get the list of scalable topics under a namespace whose properties 
contain
-     * the given key/value pair.
+     * every key/value pair in {@code propertyFilters} (AND semantics).
      *
-     * <p>Backed by a secondary index registered on the topic property at 
create/update
-     * time, so the lookup is efficient and does not scan every topic in the 
namespace.
+     * <p>Backed by the secondary index registered on the topic properties at
+     * create/update time. On stores with native index support the lookup uses 
one
+     * filter to narrow the candidate set and verifies the rest on the loaded 
record;
+     * stores without index support fall back to a per-record check.
      *
-     * @param namespace     Namespace name in the format "tenant/namespace"
-     * @param propertyKey   Property name to filter on
-     * @param propertyValue Exact property value to match
-     * @return list of matching scalable topic names
+     * @param namespace       Namespace name in the format "tenant/namespace"
+     * @param propertyFilters Property names and exact values that all must 
match
+     * @return list of matching scalable topic names; an empty filter returns 
the full
+     *         namespace listing
      */
-    List<String> listScalableTopicsByProperty(String namespace, String 
propertyKey, String propertyValue)
+    List<String> listScalableTopicsByProperties(String namespace, Map<String, 
String> propertyFilters)
             throws PulsarAdminException;
 
     /**
-     * Get the list of scalable topics under a namespace whose properties 
contain
-     * the given key/value pair, asynchronously.
+     * Async variant of {@link #listScalableTopicsByProperties(String, Map)}.
      */
-    CompletableFuture<List<String>> listScalableTopicsByPropertyAsync(String 
namespace,
-                                                                      String 
propertyKey,
-                                                                      String 
propertyValue);
+    CompletableFuture<List<String>> listScalableTopicsByPropertiesAsync(String 
namespace,
+                                                                         
Map<String, String> propertyFilters);
 
     /**
      * Create a new scalable topic.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index cda78328166..950ae667dca 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -65,19 +65,23 @@ public class ScalableTopicsImpl extends BaseResource 
implements ScalableTopics {
     }
 
     @Override
-    public List<String> listScalableTopicsByProperty(String namespace, String 
propertyKey, String propertyValue)
+    public List<String> listScalableTopicsByProperties(String namespace, 
Map<String, String> propertyFilters)
             throws PulsarAdminException {
-        return sync(() -> listScalableTopicsByPropertyAsync(namespace, 
propertyKey, propertyValue));
+        return sync(() -> listScalableTopicsByPropertiesAsync(namespace, 
propertyFilters));
     }
 
     @Override
-    public CompletableFuture<List<String>> 
listScalableTopicsByPropertyAsync(String namespace,
-                                                                              
String propertyKey,
-                                                                              
String propertyValue) {
+    public CompletableFuture<List<String>> 
listScalableTopicsByPropertiesAsync(String namespace,
+                                                                               
 Map<String, String> propertyFilters) {
         NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns)
-                .queryParam("propertyKey", propertyKey)
-                .queryParam("propertyValue", propertyValue);
+        WebTarget path = namespacePath(ns);
+        if (propertyFilters != null) {
+            // Each filter becomes a repeated `?property=k=v` query parameter; 
the broker
+            // collects them into a list and AND-merges before issuing the 
index lookup.
+            for (Map.Entry<String, String> e : propertyFilters.entrySet()) {
+                path = path.queryParam("property", e.getKey() + "=" + 
e.getValue());
+            }
+        }
         return asyncGetRequest(path, new GenericType<List<String>>() {});
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index 8991632a694..c4a308cbdce 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -35,30 +35,26 @@ public class CmdScalableTopics extends CmdBase {
     }
 
     @Command(description = "Get the list of scalable topics under a namespace, 
optionally"
-            + " filtered to those whose properties contain a given key/value 
pair")
+            + " filtered to those whose properties contain every given 
key=value pair")
     private class ListCmd extends CliCommand {
         @Parameters(description = "tenant/namespace", arity = "1")
         private String namespace;
 
         @Option(names = {"-p", "--property"},
-                description = "Filter to topics whose properties contain this 
key=value pair")
-        private String property;
+                description = "Filter to topics whose properties contain this 
key=value pair."
+                        + " Repeat to AND multiple filters together.",
+                arity = "0..*")
+        private List<String> properties;
 
         @Override
         void run() throws Exception {
             String ns = validateNamespace(namespace);
-            if (property == null || property.isEmpty()) {
+            Map<String, String> filters = parseListKeyValueMap(properties);
+            if (filters == null || filters.isEmpty()) {
                 print(scalableTopics().listScalableTopics(ns));
-                return;
-            }
-            int eq = property.indexOf('=');
-            if (eq <= 0 || eq == property.length() - 1) {
-                throw new IllegalArgumentException(
-                        "--property must be in the form key=value, got: " + 
property);
+            } else {
+                print(scalableTopics().listScalableTopicsByProperties(ns, 
filters));
             }
-            String key = property.substring(0, eq);
-            String value = property.substring(eq + 1);
-            print(scalableTopics().listScalableTopicsByProperty(ns, key, 
value));
         }
     }
 

Reply via email to