This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 27e132bbb6c [feat] PIP-468: Filter scalable topic listing by multiple
properties (AND) (#25639)
27e132bbb6c is described below
commit 27e132bbb6c900e4d492a66c91036eb182349cf7
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 06:15:06 2026 -0700
[feat] PIP-468: Filter scalable topic listing by multiple properties (AND)
(#25639)
---
.../broker/resources/ScalableTopicResources.java | 63 ++++++++++++++------
.../pulsar/broker/admin/v2/ScalableTopics.java | 43 ++++++++++----
.../admin/ScalableTopicsListByPropertyTest.java | 31 ++++++----
.../resources/ScalableTopicPropertyIndexTest.java | 67 +++++++++++++++++-----
.../apache/pulsar/client/admin/ScalableTopics.java | 26 ++++-----
.../client/admin/internal/ScalableTopicsImpl.java | 20 ++++---
.../apache/pulsar/admin/cli/CmdScalableTopics.java | 22 +++----
7 files changed, 184 insertions(+), 88 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 263abe50439..3e015808476 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -118,32 +118,57 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
}
/**
- * List scalable topics in a namespace whose {@code properties} map
contains the given
- * key/value pair. On stores with native secondary index support (Oxia)
this is served
- * by the index registered at create/update time; otherwise it falls back
to a children
- * scan + per-record property check.
+ * List scalable topics in a namespace whose {@code properties} map
contains every
+ * key/value pair in {@code propertyFilters} (AND semantics).
*
- * @param ns the namespace to scope the query to
- * @param propertyKey property name to filter on
- * @param propertyValue exact property value to match
- * @return fully qualified scalable topic names matching the property
+ * <p>Stores with native secondary-index support (Oxia) serve the
initial
+ * lookup via the index for one arbitrarily chosen filter, then a record-level check
rejects
+ * anything that doesn't satisfy the rest. Stores without native index
support fall
+ * through to a children scan + the same predicate. An empty {@code
propertyFilters}
+ * map degenerates to {@link #listScalableTopicsAsync}.
+ *
+ * @param ns the namespace to scope the query to
+ * @param propertyFilters property name/value pairs that all must match
(AND)
+ * @return fully qualified scalable topic names matching every filter
*/
- public CompletableFuture<List<String>> findScalableTopicsByPropertyAsync(
- NamespaceName ns, String propertyKey, String propertyValue) {
+ public CompletableFuture<List<String>> findScalableTopicsByPropertiesAsync(
+ NamespaceName ns, Map<String, String> propertyFilters) {
+ if (propertyFilters == null || propertyFilters.isEmpty()) {
+ return listScalableTopicsAsync(ns);
+ }
String scanPathPrefix = joinPath(SCALABLE_TOPIC_PATH, ns.toString());
ObjectMapper mapper =
ObjectMapperFactory.getMapper().getObjectMapper();
- return getStore().findByIndex(scanPathPrefix, propertyKey,
propertyValue, result -> {
- // Fallback path (no native index): re-check the property
on the loaded record.
- try {
- ScalableTopicMetadata md =
- mapper.readValue(result.getValue(),
ScalableTopicMetadata.class);
- return md.getProperties() != null
- &&
propertyValue.equals(md.getProperties().get(propertyKey));
- } catch (IOException e) {
+
+ // Pick any single filter to drive the index lookup (native stores
will use it
+ // to narrow the candidate set; iteration order is acceptable since we
don't
+ // know index cardinalities up front). The predicate then enforces AND
across
+ // every filter on the loaded record.
+ Map.Entry<String, String> indexFilter =
propertyFilters.entrySet().iterator().next();
+ java.util.function.Predicate<org.apache.pulsar.metadata.api.GetResult>
matchesAll = result -> {
+ try {
+ ScalableTopicMetadata md =
+ mapper.readValue(result.getValue(),
ScalableTopicMetadata.class);
+ Map<String, String> props = md.getProperties();
+ if (props == null) {
+ return false;
+ }
+ for (Map.Entry<String, String> e : propertyFilters.entrySet())
{
+ if (!e.getValue().equals(props.get(e.getKey()))) {
return false;
}
- })
+ }
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ };
+ return getStore().findByIndex(scanPathPrefix,
+ indexFilter.getKey(), indexFilter.getValue(),
matchesAll)
+ // Native-index implementations don't apply the fallback
predicate, so
+ // re-check here. On the fallback path this is redundant
(predicate already
+ // applied) but cheap.
.thenApply(results -> results.stream()
+ .filter(matchesAll)
.map(r -> {
String path = r.getStat().getPath();
String encoded =
path.substring(path.lastIndexOf('/') + 1);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 1994fd8de34..01b2320f99a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -94,18 +95,16 @@ public class ScalableTopics extends AdminResource {
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
- @ApiParam(value = "Filter by topic property name (must be paired
with propertyValue)")
- @QueryParam("propertyKey") String propertyKey,
- @ApiParam(value = "Filter by topic property value (must be paired
with propertyKey)")
- @QueryParam("propertyValue") String propertyValue) {
+ @ApiParam(value = "Filter to topics whose properties contain every
key=value pair."
+ + " Each repetition of the parameter adds one filter (AND
semantics).")
+ @QueryParam("property") List<String> properties) {
validateNamespaceName(tenant, namespace);
- boolean filterByProperty = propertyKey != null &&
!propertyKey.isEmpty()
- && propertyValue != null;
+ Map<String, String> propertyFilters = parseKeyValuePairs(properties);
validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPICS)
- .thenCompose(__ -> filterByProperty
- ? resources().findScalableTopicsByPropertyAsync(
- namespaceName, propertyKey, propertyValue)
- : resources().listScalableTopicsAsync(namespaceName))
+ .thenCompose(__ -> propertyFilters.isEmpty()
+ ? resources().listScalableTopicsAsync(namespaceName)
+ : resources().findScalableTopicsByPropertiesAsync(
+ namespaceName, propertyFilters))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error().attr("clientAppId",
clientAppId()).attr("namespace", namespaceName)
@@ -115,6 +114,30 @@ public class ScalableTopics extends AdminResource {
});
}
+ /**
+ * Parse {@code key=value} entries from a list of query parameter values
into a map.
+ * Accepts {@code null} / empty input and skips {@code null} / empty entries.
Rejects malformed entries (no {@code =}, empty
+ * key, or empty value) with a 412; each entry is split at its first {@code =}.
+ */
+ private static Map<String, String> parseKeyValuePairs(List<String>
entries) {
+ if (entries == null || entries.isEmpty()) {
+ return Map.of();
+ }
+ Map<String, String> result = new
java.util.LinkedHashMap<>(entries.size());
+ for (String entry : entries) {
+ if (entry == null || entry.isEmpty()) {
+ continue;
+ }
+ int eq = entry.indexOf('=');
+ if (eq <= 0 || eq == entry.length() - 1) {
+ throw new RestException(Response.Status.fromStatusCode(412),
+ "property filter must be in the form key=value, got: "
+ entry);
+ }
+ result.put(entry.substring(0, eq), entry.substring(eq + 1));
+ }
+ return result;
+ }
+
// --- Create ---
@PUT
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
index f775fe3f2df..515c1992c51 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
@@ -51,28 +51,37 @@ public class ScalableTopicsListByPropertyTest extends
SharedPulsarBaseTest {
String bobTopic = topicName("bob");
String carolTopic = topicName("carol");
- // Each topic gets a different owner; alice and bob share a team. The
filter
- // should be able to surface either subset on demand.
+ // alice and bob share team=platform; alice and carol share
owner=alice. Each
+ // subset can be selected via the different filter
combinations below.
admin.scalableTopics().createScalableTopic(aliceTopic, 1,
Map.of("owner", "alice", "team", "platform"));
admin.scalableTopics().createScalableTopic(bobTopic, 1,
Map.of("owner", "bob", "team", "platform"));
admin.scalableTopics().createScalableTopic(carolTopic, 1,
- Map.of("owner", "carol", "team", "data"));
+ Map.of("owner", "alice", "team", "data"));
- // Filter by owner=alice — single match.
- List<String> alice = admin.scalableTopics()
- .listScalableTopicsByProperty(namespace(), "owner", "alice");
- assertEquals(alice, List.of(aliceTopic));
+ // Single-property filter: owner=bob — single match.
+ List<String> bob = admin.scalableTopics()
+ .listScalableTopicsByProperties(namespace(), Map.of("owner",
"bob"));
+ assertEquals(bob, List.of(bobTopic));
- // Filter by team=platform — alice + bob.
+ // Single-property filter: team=platform — alice + bob.
Set<String> platform = new HashSet<>(admin.scalableTopics()
- .listScalableTopicsByProperty(namespace(), "team",
"platform"));
+ .listScalableTopicsByProperties(namespace(), Map.of("team",
"platform")));
assertEquals(platform, Set.of(aliceTopic, bobTopic));
- // Unmatched value — empty result.
+ // Multi-property AND filter: owner=alice AND team=platform — narrows
to
+ // exactly aliceTopic, even though carol also has owner=alice and bob
also
+ // has team=platform.
+ List<String> aliceOnPlatform = admin.scalableTopics()
+ .listScalableTopicsByProperties(namespace(),
+ Map.of("owner", "alice", "team", "platform"));
+ assertEquals(aliceOnPlatform, List.of(aliceTopic));
+
+ // Unmatched combination — empty result.
assertTrue(admin.scalableTopics()
- .listScalableTopicsByProperty(namespace(), "owner",
"nonexistent")
+ .listScalableTopicsByProperties(namespace(),
+ Map.of("owner", "alice", "team", "ops"))
.isEmpty());
// Sanity-check: the un-filtered listing still returns every topic in
the namespace.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
index f4cd6d1f648..77b7fa9ca18 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
@@ -35,10 +35,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
- * Coverage for the {@code findScalableTopicsByPropertyAsync} entry point on
+ * Coverage for the {@code findScalableTopicsByPropertiesAsync} entry point on
* {@link ScalableTopicResources}: verifies that topic properties registered at
* create/update time are queryable via the secondary-index API, that updates
- * refresh the index, and that the filter is correctly scoped to a namespace.
+ * refresh the index, that the filter is correctly scoped to a namespace, and
+ * that multi-property filters AND the conditions together.
*
* <p>Uses {@link LocalMemoryMetadataStore} which does not implement native
* secondary indexes, so this exercises the fallback scan + per-record property
@@ -87,19 +88,57 @@ public class ScalableTopicPropertyIndexTest {
resources.createScalableTopicAsync(topicIn(ns, "t-carol"),
metaWithProps(Map.of("owner", "carol", "team", "data"))).get();
- // Filter by owner=alice — only t-alice matches.
+ // Single-property filter: owner=alice — only t-alice matches.
List<String> aliceOwned = resources
- .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+ .findScalableTopicsByPropertiesAsync(ns, Map.of("owner",
"alice")).get();
assertEquals(aliceOwned, List.of("topic://tenant/ns-a/t-alice"));
- // Filter by team=platform — both alice and bob.
+ // Single-property filter: team=platform — both alice and bob.
Set<String> platform = new HashSet<>(resources
- .findScalableTopicsByPropertyAsync(ns, "team",
"platform").get());
+ .findScalableTopicsByPropertiesAsync(ns, Map.of("team",
"platform")).get());
assertEquals(platform, Set.of(
"topic://tenant/ns-a/t-alice",
"topic://tenant/ns-a/t-bob"));
}
+ @Test
+ public void andsMultiplePropertyFilters() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-and");
+
+ // alice/platform and bob/platform share team; alice/data and
alice/platform
+ // share owner. The AND of (team=platform, owner=alice) must narrow to
the
+ // single record that satisfies both.
+ resources.createScalableTopicAsync(topicIn(ns, "t-1"),
+ metaWithProps(Map.of("owner", "alice", "team",
"platform"))).get();
+ resources.createScalableTopicAsync(topicIn(ns, "t-2"),
+ metaWithProps(Map.of("owner", "alice", "team", "data"))).get();
+ resources.createScalableTopicAsync(topicIn(ns, "t-3"),
+ metaWithProps(Map.of("owner", "bob", "team",
"platform"))).get();
+
+ List<String> match = resources.findScalableTopicsByPropertiesAsync(ns,
+ Map.of("owner", "alice", "team", "platform")).get();
+ assertEquals(match, List.of("topic://tenant/ns-and/t-1"));
+
+ // No record satisfies both — empty result.
+ assertTrue(resources.findScalableTopicsByPropertiesAsync(ns,
+ Map.of("owner", "alice", "team", "ops")).get().isEmpty());
+ }
+
+ @Test
+ public void emptyFilterReturnsAllTopicsInNamespace() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-empty-filter");
+ resources.createScalableTopicAsync(topicIn(ns, "t-1"),
+ metaWithProps(Map.of("owner", "alice"))).get();
+ resources.createScalableTopicAsync(topicIn(ns, "t-2"),
+ metaWithProps(Map.of("owner", "bob"))).get();
+
+ Set<String> all = new HashSet<>(resources
+ .findScalableTopicsByPropertiesAsync(ns, Map.of()).get());
+ assertEquals(all, Set.of(
+ "topic://tenant/ns-empty-filter/t-1",
+ "topic://tenant/ns-empty-filter/t-2"));
+ }
+
@Test
public void findIsScopedToNamespace() throws Exception {
NamespaceName nsA = NamespaceName.get("tenant/ns-a");
@@ -112,11 +151,11 @@ public class ScalableTopicPropertyIndexTest {
metaWithProps(Map.of("owner", "alice"))).get();
List<String> inNsA = resources
- .findScalableTopicsByPropertyAsync(nsA, "owner",
"alice").get();
+ .findScalableTopicsByPropertiesAsync(nsA, Map.of("owner",
"alice")).get();
assertEquals(inNsA, List.of("topic://tenant/ns-a/t1"));
List<String> inNsB = resources
- .findScalableTopicsByPropertyAsync(nsB, "owner",
"alice").get();
+ .findScalableTopicsByPropertiesAsync(nsB, Map.of("owner",
"alice")).get();
assertEquals(inNsB, List.of("topic://tenant/ns-b/t2"));
}
@@ -127,11 +166,11 @@ public class ScalableTopicPropertyIndexTest {
metaWithProps(Map.of("owner", "alice"))).get();
// Wrong value
- assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"bob")
+ assertTrue(resources.findScalableTopicsByPropertiesAsync(ns,
Map.of("owner", "bob"))
.get().isEmpty());
// Wrong key
- assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "team",
"alice")
+ assertTrue(resources.findScalableTopicsByPropertiesAsync(ns,
Map.of("team", "alice"))
.get().isEmpty());
}
@@ -142,7 +181,7 @@ public class ScalableTopicPropertyIndexTest {
resources.createScalableTopicAsync(tn,
metaWithProps(Map.of("owner", "alice"))).get();
- assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"alice").get(),
+ assertEquals(resources.findScalableTopicsByPropertiesAsync(ns,
Map.of("owner", "alice")).get(),
List.of(tn.toString()));
// Reassign owner via update — the new owner must be queryable, and the
@@ -152,9 +191,9 @@ public class ScalableTopicPropertyIndexTest {
return current;
}).get();
- assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"bob").get(),
+ assertEquals(resources.findScalableTopicsByPropertiesAsync(ns,
Map.of("owner", "bob")).get(),
List.of(tn.toString()));
- assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"alice")
+ assertTrue(resources.findScalableTopicsByPropertiesAsync(ns,
Map.of("owner", "alice"))
.get().isEmpty());
}
@@ -168,7 +207,7 @@ public class ScalableTopicPropertyIndexTest {
// Filtering by any property must skip the un-tagged record.
List<String> matches = resources
- .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+ .findScalableTopicsByPropertiesAsync(ns, Map.of("owner",
"alice")).get();
assertEquals(matches, List.of("topic://tenant/ns-noprops/t-tagged"));
}
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index 74f8a223399..d04da28e35f 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -51,26 +51,26 @@ public interface ScalableTopics {
/**
* Get the list of scalable topics under a namespace whose properties
contain
- * the given key/value pair.
+ * every key/value pair in {@code propertyFilters} (AND semantics).
*
- * <p>Backed by a secondary index registered on the topic property at
create/update
- * time, so the lookup is efficient and does not scan every topic in the
namespace.
+ * <p>Backed by the secondary index registered on the topic properties at
+ * create/update time. On stores with native index support the lookup uses
one
+ * filter to narrow the candidate set and verifies the rest on the loaded
record;
+ * stores without index support fall back to a per-record check.
*
- * @param namespace Namespace name in the format "tenant/namespace"
- * @param propertyKey Property name to filter on
- * @param propertyValue Exact property value to match
- * @return list of matching scalable topic names
+ * @param namespace Namespace name in the format "tenant/namespace"
+ * @param propertyFilters Property names and exact values that all must
match
+ * @return list of matching scalable topic names; an empty filter returns
the full
+ * namespace listing
*/
- List<String> listScalableTopicsByProperty(String namespace, String
propertyKey, String propertyValue)
+ List<String> listScalableTopicsByProperties(String namespace, Map<String,
String> propertyFilters)
throws PulsarAdminException;
/**
- * Get the list of scalable topics under a namespace whose properties
contain
- * the given key/value pair, asynchronously.
+ * Async variant of {@link #listScalableTopicsByProperties(String, Map)}.
*/
- CompletableFuture<List<String>> listScalableTopicsByPropertyAsync(String
namespace,
- String
propertyKey,
- String
propertyValue);
+ CompletableFuture<List<String>> listScalableTopicsByPropertiesAsync(String
namespace,
+
Map<String, String> propertyFilters);
/**
* Create a new scalable topic.
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index cda78328166..950ae667dca 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -65,19 +65,23 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
}
@Override
- public List<String> listScalableTopicsByProperty(String namespace, String
propertyKey, String propertyValue)
+ public List<String> listScalableTopicsByProperties(String namespace,
Map<String, String> propertyFilters)
throws PulsarAdminException {
- return sync(() -> listScalableTopicsByPropertyAsync(namespace,
propertyKey, propertyValue));
+ return sync(() -> listScalableTopicsByPropertiesAsync(namespace,
propertyFilters));
}
@Override
- public CompletableFuture<List<String>>
listScalableTopicsByPropertyAsync(String namespace,
-
String propertyKey,
-
String propertyValue) {
+ public CompletableFuture<List<String>>
listScalableTopicsByPropertiesAsync(String namespace,
+
Map<String, String> propertyFilters) {
NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns)
- .queryParam("propertyKey", propertyKey)
- .queryParam("propertyValue", propertyValue);
+ WebTarget path = namespacePath(ns);
+ if (propertyFilters != null) {
+ // Each filter becomes a repeated `?property=k=v` query parameter;
the broker
+ // collects them into a list and AND-merges before issuing the
index lookup.
+ for (Map.Entry<String, String> e : propertyFilters.entrySet()) {
+ path = path.queryParam("property", e.getKey() + "=" +
e.getValue());
+ }
+ }
return asyncGetRequest(path, new GenericType<List<String>>() {});
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index 8991632a694..c4a308cbdce 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -35,30 +35,26 @@ public class CmdScalableTopics extends CmdBase {
}
@Command(description = "Get the list of scalable topics under a namespace,
optionally"
- + " filtered to those whose properties contain a given key/value
pair")
+ + " filtered to those whose properties contain every given
key=value pair")
private class ListCmd extends CliCommand {
@Parameters(description = "tenant/namespace", arity = "1")
private String namespace;
@Option(names = {"-p", "--property"},
- description = "Filter to topics whose properties contain this
key=value pair")
- private String property;
+ description = "Filter to topics whose properties contain this
key=value pair."
+ + " Repeat to AND multiple filters together.",
+ arity = "0..*")
+ private List<String> properties;
@Override
void run() throws Exception {
String ns = validateNamespace(namespace);
- if (property == null || property.isEmpty()) {
+ Map<String, String> filters = parseListKeyValueMap(properties);
+ if (filters == null || filters.isEmpty()) {
print(scalableTopics().listScalableTopics(ns));
- return;
- }
- int eq = property.indexOf('=');
- if (eq <= 0 || eq == property.length() - 1) {
- throw new IllegalArgumentException(
- "--property must be in the form key=value, got: " +
property);
+ } else {
+ print(scalableTopics().listScalableTopicsByProperties(ns,
filters));
}
- String key = property.substring(0, eq);
- String value = property.substring(eq + 1);
- print(scalableTopics().listScalableTopicsByProperty(ns, key,
value));
}
}