This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4f73b437fcf [feat][broker] PIP-468: Filter scalable topics by property
via secondary index (#25632)
4f73b437fcf is described below
commit 4f73b437fcf45373c03356155bebf5dd0af53f4a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 13:51:45 2026 -0700
[feat][broker] PIP-468: Filter scalable topics by property via secondary
index (#25632)
---
.../broker/resources/ScalableTopicResources.java | 56 ++++++-
.../pulsar/broker/admin/v2/ScalableTopics.java | 13 +-
.../admin/ScalableTopicsListByPropertyTest.java | 83 ++++++++++
.../resources/ScalableTopicPropertyIndexTest.java | 174 +++++++++++++++++++++
.../apache/pulsar/client/admin/ScalableTopics.java | 23 +++
.../client/admin/internal/ScalableTopicsImpl.java | 17 ++
.../apache/pulsar/admin/cli/CmdScalableTopics.java | 21 ++-
.../apache/pulsar/metadata/api/MetadataCache.java | 40 +++++
.../metadata/cache/impl/MetadataCacheImpl.java | 40 ++++-
.../metadata/MetadataCacheSecondaryIndexTest.java | 166 ++++++++++++++++++++
10 files changed, 623 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 235f17e2d31..263abe50439 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.broker.resources;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -28,6 +31,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -54,6 +58,16 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
private static final String CONSUMERS_SEGMENT = "consumers";
+ /**
+ * Use the topic's {@code properties} map verbatim as the secondary-index
entries.
+ * Each property {@code k -> v} is registered as the index named {@code k}
with
+ * secondary key {@code v}; querying by that key/value pair via
+ * {@link MetadataStore#findByIndex} returns the record. Index names live
in a
+ * per-record-type namespace, so there's no need to disambiguate them with
a prefix.
+ */
+ private static final Function<ScalableTopicMetadata, Map<String, String>>
PROPERTY_INDEX_EXTRACTOR =
+ metadata -> metadata.getProperties() != null ?
metadata.getProperties() : Map.of();
+
private final MetadataCache<SubscriptionMetadata> subscriptionCache;
private final MetadataCache<ConsumerRegistration>
consumerRegistrationCache;
@@ -64,7 +78,7 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
}
public CompletableFuture<Void> createScalableTopicAsync(TopicName tn,
ScalableTopicMetadata metadata) {
- return createAsync(topicPath(tn), metadata);
+ return getCache().create(topicPath(tn), metadata,
PROPERTY_INDEX_EXTRACTOR);
}
public CompletableFuture<Optional<ScalableTopicMetadata>>
getScalableTopicMetadataAsync(TopicName tn) {
@@ -82,7 +96,10 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
public CompletableFuture<Void> updateScalableTopicAsync(TopicName tn,
Function<ScalableTopicMetadata,
ScalableTopicMetadata> updateFunction) {
- return setAsync(topicPath(tn), updateFunction);
+ // Refresh property indexes on every update — the modify function may
add or remove
+ // properties and the underlying store needs to see the
post-modification view.
+ return getCache().readModifyUpdate(topicPath(tn), updateFunction,
PROPERTY_INDEX_EXTRACTOR)
+ .thenApply(__ -> null);
}
public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
@@ -100,6 +117,41 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
.collect(Collectors.toList()));
}
+ /**
+ * List scalable topics in a namespace whose {@code properties} map
contains the given
+ * key/value pair. On stores with native secondary index support (Oxia)
this is served
+ * by the index registered at create/update time; otherwise it falls back
to a children
+ * scan + per-record property check.
+ *
+ * @param ns the namespace to scope the query to
+ * @param propertyKey property name to filter on
+ * @param propertyValue exact property value to match
+ * @return fully qualified scalable topic names matching the property
+ */
+ public CompletableFuture<List<String>> findScalableTopicsByPropertyAsync(
+ NamespaceName ns, String propertyKey, String propertyValue) {
+ String scanPathPrefix = joinPath(SCALABLE_TOPIC_PATH, ns.toString());
+ ObjectMapper mapper =
ObjectMapperFactory.getMapper().getObjectMapper();
+ return getStore().findByIndex(scanPathPrefix, propertyKey,
propertyValue, result -> {
+ // Fallback path (no native index): re-check the property
on the loaded record.
+ try {
+ ScalableTopicMetadata md =
+ mapper.readValue(result.getValue(),
ScalableTopicMetadata.class);
+ return md.getProperties() != null
+ &&
propertyValue.equals(md.getProperties().get(propertyKey));
+ } catch (IOException e) {
+ return false;
+ }
+ })
+ .thenApply(results -> results.stream()
+ .map(r -> {
+ String path = r.getStat().getPath();
+ String encoded =
path.substring(path.lastIndexOf('/') + 1);
+ return TopicName.get("topic", ns,
Codec.decode(encoded)).toString();
+ })
+ .collect(Collectors.toList()));
+ }
+
// --- Subscriptions ---
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 6e7aa7f6723..1994fd8de34 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -93,10 +93,19 @@ public class ScalableTopics extends AdminResource {
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
- @PathParam("namespace") String namespace) {
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Filter by topic property name (must be paired
with propertyValue)")
+ @QueryParam("propertyKey") String propertyKey,
+ @ApiParam(value = "Filter by topic property value (must be paired
with propertyKey)")
+ @QueryParam("propertyValue") String propertyValue) {
validateNamespaceName(tenant, namespace);
+ boolean filterByProperty = propertyKey != null &&
!propertyKey.isEmpty()
+ && propertyValue != null;
validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPICS)
- .thenCompose(__ ->
resources().listScalableTopicsAsync(namespaceName))
+ .thenCompose(__ -> filterByProperty
+ ? resources().findScalableTopicsByPropertyAsync(
+ namespaceName, propertyKey, propertyValue)
+ : resources().listScalableTopicsAsync(namespaceName))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error().attr("clientAppId",
clientAppId()).attr("namespace", namespaceName)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
new file mode 100644
index 00000000000..f775fe3f2df
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the property-filtered list endpoint
+ * ({@code GET
/admin/v2/scalable/{tenant}/{namespace}?propertyKey&propertyValue}).
+ * Drives the full HTTP path through the {@link
org.apache.pulsar.client.admin.PulsarAdmin}
+ * client against a real shared broker, verifying that topics created with
+ * properties through the admin API are queryable via the secondary index.
+ */
+public class ScalableTopicsListByPropertyTest extends SharedPulsarBaseTest {
+
+ private String namespace() {
+ return getNamespace();
+ }
+
+ private String topicName(String suffix) {
+ return "topic://" + namespace() + "/" + suffix + "-" +
UUID.randomUUID().toString().substring(0, 8);
+ }
+
+ @Test
+ public void listScalableTopicsFilteredByProperty() throws Exception {
+ String aliceTopic = topicName("alice");
+ String bobTopic = topicName("bob");
+ String carolTopic = topicName("carol");
+
+ // Each topic gets a different owner; alice and bob share a team. The
filter
+ // should be able to surface either subset on demand.
+ admin.scalableTopics().createScalableTopic(aliceTopic, 1,
+ Map.of("owner", "alice", "team", "platform"));
+ admin.scalableTopics().createScalableTopic(bobTopic, 1,
+ Map.of("owner", "bob", "team", "platform"));
+ admin.scalableTopics().createScalableTopic(carolTopic, 1,
+ Map.of("owner", "carol", "team", "data"));
+
+ // Filter by owner=alice — single match.
+ List<String> alice = admin.scalableTopics()
+ .listScalableTopicsByProperty(namespace(), "owner", "alice");
+ assertEquals(alice, List.of(aliceTopic));
+
+ // Filter by team=platform — alice + bob.
+ Set<String> platform = new HashSet<>(admin.scalableTopics()
+ .listScalableTopicsByProperty(namespace(), "team",
"platform"));
+ assertEquals(platform, Set.of(aliceTopic, bobTopic));
+
+ // Unmatched value — empty result.
+ assertTrue(admin.scalableTopics()
+ .listScalableTopicsByProperty(namespace(), "owner",
"nonexistent")
+ .isEmpty());
+
+ // Sanity-check: the un-filtered listing still returns every topic in
the namespace.
+ Set<String> all = new
HashSet<>(admin.scalableTopics().listScalableTopics(namespace()));
+ assertTrue(all.containsAll(Set.of(aliceTopic, bobTopic, carolTopic)),
+ "expected all three created topics to appear in the unfiltered
list, got " + all);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
new file mode 100644
index 00000000000..f4cd6d1f648
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@code findScalableTopicsByPropertyAsync} entry point on
+ * {@link ScalableTopicResources}: verifies that topic properties registered at
+ * create/update time are queryable via the secondary-index API, that updates
+ * refresh the index, and that the filter is correctly scoped to a namespace.
+ *
+ * <p>Uses {@link LocalMemoryMetadataStore} which does not implement native
+ * secondary indexes, so this exercises the fallback scan + per-record property
+ * predicate path. The Oxia-native path (where the index is consulted directly)
+ * is covered by the metadata-store-level secondary index tests.
+ */
+public class ScalableTopicPropertyIndexTest {
+
+ private MetadataStoreExtended store;
+ private ScalableTopicResources resources;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ store = new LocalMemoryMetadataStore("memory:local",
+ MetadataStoreConfig.builder().build());
+ resources = new ScalableTopicResources(store, 30);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ private TopicName topicIn(NamespaceName ns, String localName) {
+ return TopicName.get("topic://" + ns + "/" + localName);
+ }
+
+ private ScalableTopicMetadata metaWithProps(Map<String, String> props) {
+ return ScalableTopicMetadata.builder()
+ .epoch(0)
+ .nextSegmentId(1)
+ .properties(new HashMap<>(props))
+ .build();
+ }
+
+ @Test
+ public void findsTopicByExactPropertyValue() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-a");
+
+ resources.createScalableTopicAsync(topicIn(ns, "t-alice"),
+ metaWithProps(Map.of("owner", "alice", "team",
"platform"))).get();
+ resources.createScalableTopicAsync(topicIn(ns, "t-bob"),
+ metaWithProps(Map.of("owner", "bob", "team",
"platform"))).get();
+ resources.createScalableTopicAsync(topicIn(ns, "t-carol"),
+ metaWithProps(Map.of("owner", "carol", "team", "data"))).get();
+
+ // Filter by owner=alice — only t-alice matches.
+ List<String> aliceOwned = resources
+ .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+ assertEquals(aliceOwned, List.of("topic://tenant/ns-a/t-alice"));
+
+ // Filter by team=platform — both alice and bob.
+ Set<String> platform = new HashSet<>(resources
+ .findScalableTopicsByPropertyAsync(ns, "team",
"platform").get());
+ assertEquals(platform, Set.of(
+ "topic://tenant/ns-a/t-alice",
+ "topic://tenant/ns-a/t-bob"));
+ }
+
+ @Test
+ public void findIsScopedToNamespace() throws Exception {
+ NamespaceName nsA = NamespaceName.get("tenant/ns-a");
+ NamespaceName nsB = NamespaceName.get("tenant/ns-b");
+
+ // Same property in two namespaces — find must return only the one we
asked for.
+ resources.createScalableTopicAsync(topicIn(nsA, "t1"),
+ metaWithProps(Map.of("owner", "alice"))).get();
+ resources.createScalableTopicAsync(topicIn(nsB, "t2"),
+ metaWithProps(Map.of("owner", "alice"))).get();
+
+ List<String> inNsA = resources
+ .findScalableTopicsByPropertyAsync(nsA, "owner",
"alice").get();
+ assertEquals(inNsA, List.of("topic://tenant/ns-a/t1"));
+
+ List<String> inNsB = resources
+ .findScalableTopicsByPropertyAsync(nsB, "owner",
"alice").get();
+ assertEquals(inNsB, List.of("topic://tenant/ns-b/t2"));
+ }
+
+ @Test
+ public void noMatchReturnsEmptyList() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-empty");
+ resources.createScalableTopicAsync(topicIn(ns, "t1"),
+ metaWithProps(Map.of("owner", "alice"))).get();
+
+ // Wrong value
+ assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"bob")
+ .get().isEmpty());
+
+ // Wrong key
+ assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "team",
"alice")
+ .get().isEmpty());
+ }
+
+ @Test
+ public void updateRefreshesIndex() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-update");
+ TopicName tn = topicIn(ns, "t-mutating");
+
+ resources.createScalableTopicAsync(tn,
+ metaWithProps(Map.of("owner", "alice"))).get();
+ assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"alice").get(),
+ List.of(tn.toString()));
+
+ // Reassign owner via update — the new owner must be queryable, and the
+ // old owner's entry must no longer match this topic.
+ resources.updateScalableTopicAsync(tn, current -> {
+ current.getProperties().put("owner", "bob");
+ return current;
+ }).get();
+
+ assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"bob").get(),
+ List.of(tn.toString()));
+ assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner",
"alice")
+ .get().isEmpty());
+ }
+
+ @Test
+ public void topicWithoutPropertiesIsNotMatched() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-noprops");
+ resources.createScalableTopicAsync(topicIn(ns, "t-anon"),
+ metaWithProps(Map.of())).get();
+ resources.createScalableTopicAsync(topicIn(ns, "t-tagged"),
+ metaWithProps(Map.of("owner", "alice"))).get();
+
+ // Filtering by any property must skip the un-tagged record.
+ List<String> matches = resources
+ .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+ assertEquals(matches, List.of("topic://tenant/ns-noprops/t-tagged"));
+ }
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index 7db81e0cf7b..74f8a223399 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -49,6 +49,29 @@ public interface ScalableTopics {
*/
CompletableFuture<List<String>> listScalableTopicsAsync(String namespace);
+ /**
+ * Get the list of scalable topics under a namespace whose properties
contain
+ * the given key/value pair.
+ *
+ * <p>On metadata stores with native secondary-index support, the lookup is
served by an
+ * index registered at create/update time; other stores fall back to scanning
the namespace.
+ *
+ * @param namespace Namespace name in the format "tenant/namespace"
+ * @param propertyKey Property name to filter on
+ * @param propertyValue Exact property value to match
+ * @return list of matching scalable topic names
+ */
+ List<String> listScalableTopicsByProperty(String namespace, String
propertyKey, String propertyValue)
+ throws PulsarAdminException;
+
+ /**
+ * Get the list of scalable topics under a namespace whose properties
contain
+ * the given key/value pair, asynchronously.
+ */
+ CompletableFuture<List<String>> listScalableTopicsByPropertyAsync(String
namespace,
+ String
propertyKey,
+ String
propertyValue);
+
/**
* Create a new scalable topic.
*
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index a027b946b72..cda78328166 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -64,6 +64,23 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
return asyncGetRequest(namespacePath(ns), new
GenericType<List<String>>() {});
}
+ @Override
+ public List<String> listScalableTopicsByProperty(String namespace, String
propertyKey, String propertyValue)
+ throws PulsarAdminException {
+ return sync(() -> listScalableTopicsByPropertyAsync(namespace,
propertyKey, propertyValue));
+ }
+
+ @Override
+ public CompletableFuture<List<String>>
listScalableTopicsByPropertyAsync(String namespace,
+
String propertyKey,
+
String propertyValue) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns)
+ .queryParam("propertyKey", propertyKey)
+ .queryParam("propertyValue", propertyValue);
+ return asyncGetRequest(path, new GenericType<List<String>>() {});
+ }
+
// --- Create ---
@Override
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index 7badc943783..8991632a694 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -34,14 +34,31 @@ public class CmdScalableTopics extends CmdBase {
return getAdmin().scalableTopics();
}
- @Command(description = "Get the list of scalable topics under a namespace")
+ @Command(description = "Get the list of scalable topics under a namespace,
optionally"
+ + " filtered to those whose properties contain a given key/value
pair")
private class ListCmd extends CliCommand {
@Parameters(description = "tenant/namespace", arity = "1")
private String namespace;
+ @Option(names = {"-p", "--property"},
+ description = "Filter to topics whose properties contain this
key=value pair")
+ private String property;
+
@Override
void run() throws Exception {
-
print(scalableTopics().listScalableTopics(validateNamespace(namespace)));
+ String ns = validateNamespace(namespace);
+ if (property == null || property.isEmpty()) {
+ print(scalableTopics().listScalableTopics(ns));
+ return;
+ }
+ int eq = property.indexOf('=');
+ if (eq <= 0 || eq == property.length() - 1) {
+ throw new IllegalArgumentException(
+ "--property must be in the form key=value, got: " +
property);
+ }
+ String key = property.substring(0, eq);
+ String value = property.substring(eq + 1);
+ print(scalableTopics().listScalableTopicsByProperty(ns, key,
value));
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 4af712d3357..641889b3c18 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata.api;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -115,6 +116,24 @@ public interface MetadataCache<T> {
*/
CompletableFuture<T> readModifyUpdate(String path, Function<T, T>
modifyFunction);
+ /**
+ * Atomic read-modify-update with secondary index entries refreshed from
the new value.
+ *
+ * <p>Equivalent to {@link #readModifyUpdate(String, Function)} but on
each successful
+ * write the {@code indexExtractor} is invoked on the new value and the
resulting
+ * {@code indexName -> secondaryKey} map is associated with the record. See
+ * {@link #create(String, Object, Function)} for the index-extractor
contract.
+ *
+ * @param path the path of the object in the metadata store
+ * @param modifyFunction read-modify-update function
+ * @param indexExtractor function that derives secondary index entries
from the new value
+ * @return a future that completes with the new stored value
+ */
+ default CompletableFuture<T> readModifyUpdate(String path, Function<T, T>
modifyFunction,
+ Function<T, Map<String,
String>> indexExtractor) {
+ return readModifyUpdate(path, modifyFunction);
+ }
+
/**
* Create a new object in the metadata store.
* <p>
@@ -130,6 +149,27 @@ public interface MetadataCache<T> {
*/
CompletableFuture<Void> create(String path, T value);
+ /**
+ * Create a new object in the metadata store with secondary index entries
derived from
+ * the value.
+ *
+ * <p>The {@code indexExtractor} is invoked with the new value and returns
a map of
+ * {@code indexName -> secondaryKey}. Stores that support native secondary
indexes
+ * (e.g. Oxia) persist these alongside the record so that
+ * {@link MetadataStore#findByIndex} can serve queries efficiently. Stores
that don't
+ * support indexes ignore the hints; the value is still written normally.
+ *
+ * @param path the path of the object in the metadata store
+ * @param value the object to insert
+ * @param indexExtractor function that derives secondary index entries
from the value
+ * @return a future to track the completion of the operation
+ * @throws AlreadyExistsException if the object is already present
+ */
+ default CompletableFuture<Void> create(String path, T value,
+ Function<T, Map<String, String>>
indexExtractor) {
+ return create(path, value);
+ }
+
/**
* Create or update the value of the given path in the metadata store
without version comparison.
* <p>
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index b9bfdd8e6d2..cab2c3a2e0a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -27,8 +27,10 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -232,6 +234,12 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
@Override
public CompletableFuture<T> readModifyUpdate(String path, Function<T, T>
modifyFunction) {
+ return readModifyUpdate(path, modifyFunction, v ->
Collections.emptyMap());
+ }
+
+ @Override
+ public CompletableFuture<T> readModifyUpdate(String path, Function<T, T>
modifyFunction,
+ Function<T, Map<String,
String>> indexExtractor) {
final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
.thenComposeAsync(optEntry -> {
@@ -254,9 +262,12 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return FutureUtils.exception(t);
}
- return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
- refresh(path);
- }).thenApply(__ -> newValueObj);
+ final T finalNewValue = newValueObj;
+ return putWithIndexes(path, newValue,
Optional.of(expectedVersion),
+ EnumSet.noneOf(CreateOption.class),
+ indexExtractor.apply(finalNewValue))
+ .thenAccept(__ -> refresh(path))
+ .thenApply(__ -> finalNewValue);
}, executor), path);
}
@@ -274,8 +285,16 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
@Override
public CompletableFuture<Void> create(String path, T value) {
+ return create(path, value, v -> Collections.emptyMap());
+ }
+
+ @Override
+ public CompletableFuture<Void> create(String path, T value,
+ Function<T, Map<String, String>>
indexExtractor) {
final var future = new CompletableFuture<Void>();
- serialize(path, value).thenCompose(content -> store.put(path, content,
Optional.of(-1L)))
+ serialize(path, value).thenCompose(content -> putWithIndexes(
+ path, content, Optional.of(-1L),
EnumSet.noneOf(CreateOption.class),
+ indexExtractor.apply(value)))
// Make sure we have the value cached before the operation is
completed
// In addition to caching the value, we need to add a watch on the
path,
// so when/if it changes on any other node, we are notified and we
can
@@ -294,6 +313,19 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return future;
}
+ /**
+ * Route writes through the extended store API when secondary indexes are
present and
+ * the underlying store supports them. Falls back to the base {@code put}
otherwise.
+ */
+ private CompletableFuture<?> putWithIndexes(String path, byte[] content,
Optional<Long> version,
+ EnumSet<CreateOption> options,
+ Map<String, String> indexes) {
+ if (storeExtended != null && indexes != null && !indexes.isEmpty()) {
+ return storeExtended.put(path, content, version, options, indexes);
+ }
+ return store.put(path, content, version);
+ }
+
@Override
public CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
return serialize(path, value).thenCompose(bytes -> {
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
new file mode 100644
index 00000000000..1469dddee8c
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@link MetadataCache} overloads that accept an
+ * {@code indexExtractor}: every cache write must register the indexes derived
+ * from the value with the underlying store, so a subsequent
+ * {@link org.apache.pulsar.metadata.api.MetadataStore#findByIndex} returns the
+ * record. Updates must refresh the indexes so that the post-update state is
+ * what's queryable.
+ *
+ * <p>Runs against every metadata-store implementation. Stores without native
+ * secondary index support exercise the fallback scan + predicate path inside
+ * {@code findByIndex}; native-index stores (Oxia) consult the index directly.
+ */
+public class MetadataCacheSecondaryIndexTest extends BaseMetadataStoreTest {
+
+    /**
+     * Shared JSON mapper used by the fallback predicates. Built once rather than on every
+     * predicate invocation; a configured Jackson mapper is safe to share.
+     */
+    private static final JsonMapper MAPPER = JsonMapper.builder().build();
+
+    /** Extractor exposing both fields of {@link IndexedValue} as separate indexes. */
+    private static final Function<IndexedValue, Map<String, String>> OWNER_AND_TEAM =
+            v -> Map.of("by-owner", v.getOwner(), "by-team", v.getTeam());
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    static class IndexedValue {
+        String owner;
+        String team;
+    }
+
+    /**
+     * Builds a predicate for the fallback path: deserialises the stored bytes to
+     * {@link IndexedValue} and compares the selected field against {@code expected}.
+     *
+     * @param field    accessor for the field to compare
+     * @param expected the value the field must equal for the record to match
+     * @return a predicate that is {@code false} for any record that cannot be decoded
+     */
+    private static Predicate<GetResult> matchField(Function<IndexedValue, String> field, String expected) {
+        return result -> {
+            try {
+                IndexedValue v = MAPPER.readValue(result.getValue(), IndexedValue.class);
+                return expected.equals(field.apply(v));
+            } catch (Exception e) {
+                // A record that cannot be decoded or compared simply does not match.
+                return false;
+            }
+        };
+    }
+
+    /** Fallback predicate matching records whose {@code owner} equals the given value. */
+    private static Predicate<GetResult> matchOwner(String owner) {
+        return matchField(IndexedValue::getOwner, owner);
+    }
+
+    /** Fallback predicate matching records whose {@code team} equals the given value. */
+    private static Predicate<GetResult> matchTeam(String team) {
+        return matchField(IndexedValue::getTeam, team);
+    }
+
+    /** Collects the paths of the given results into a set, for order-independent asserts. */
+    private static Set<String> pathsOf(List<GetResult> results) {
+        Set<String> paths = new HashSet<>();
+        for (GetResult r : results) {
+            paths.add(r.getStat().getPath());
+        }
+        return paths;
+    }
+
+    @Test(dataProvider = "impl")
+    public void createWithIndexExtractorRegistersIndexes(String provider,
+                                                         Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        MetadataCache<IndexedValue> cache = store.getMetadataCache(IndexedValue.class);
+
+        String basePath = newKey();
+
+        // Two records share an owner; one is on a different owner. The extractor
+        // exposes both fields as separate indexes so we can query either one.
+        cache.create(basePath + "/r1", new IndexedValue("alice", "platform"), OWNER_AND_TEAM).join();
+        cache.create(basePath + "/r2", new IndexedValue("alice", "data"), OWNER_AND_TEAM).join();
+        cache.create(basePath + "/r3", new IndexedValue("bob", "platform"), OWNER_AND_TEAM).join();
+
+        // Owner=alice should return r1 + r2: check both the count and the actual paths,
+        // so a result with two wrong records cannot pass.
+        List<GetResult> aliceOwned = store.findByIndex(basePath, "by-owner", "alice",
+                matchOwner("alice")).join();
+        assertEquals(aliceOwned.size(), 2);
+        assertEquals(pathsOf(aliceOwned), Set.of(basePath + "/r1", basePath + "/r2"));
+
+        // Team=platform should return r1 + r3.
+        Set<String> platformPaths = pathsOf(store.findByIndex(basePath, "by-team", "platform",
+                matchTeam("platform")).join());
+        assertEquals(platformPaths, Set.of(basePath + "/r1", basePath + "/r3"));
+    }
+
+    @Test(dataProvider = "impl")
+    public void readModifyUpdateRefreshesIndexes(String provider,
+                                                 Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        MetadataCache<IndexedValue> cache = store.getMetadataCache(IndexedValue.class);
+
+        String basePath = newKey();
+        Function<IndexedValue, Map<String, String>> extractor = v -> Map.of("by-owner", v.getOwner());
+
+        cache.create(basePath + "/r1", new IndexedValue("alice", "platform"), extractor).join();
+        assertEquals(store.findByIndex(basePath, "by-owner", "alice", matchOwner("alice"))
+                .join().size(), 1);
+
+        // Reassign owner via update — the new owner becomes the queryable one and the
+        // old owner's lookup must no longer surface this record.
+        cache.readModifyUpdate(basePath + "/r1",
+                current -> new IndexedValue("bob", current.getTeam()),
+                extractor).join();
+
+        assertEquals(store.findByIndex(basePath, "by-owner", "bob", matchOwner("bob"))
+                .join().size(), 1);
+        assertEquals(store.findByIndex(basePath, "by-owner", "alice", matchOwner("alice"))
+                .join().size(), 0);
+    }
+
+    @Test(dataProvider = "impl")
+    public void emptyExtractorBehavesLikePlainCreate(String provider,
+                                                     Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        MetadataCache<IndexedValue> cache = store.getMetadataCache(IndexedValue.class);
+
+        String path = newKey();
+
+        // An extractor returning an empty map must not register any indexes — but the
+        // record must still be written and readable.
+        cache.create(path, new IndexedValue("alice", "platform"), v -> Map.of()).join();
+
+        assertTrue(cache.get(path).join().isPresent());
+        // No index registered, so a lookup with the would-be index name returns nothing.
+        // The fallback predicate rejects every record, so only an index hit could match.
+        assertEquals(store.findByIndex(path, "by-owner", "alice", r -> false).join().size(), 0);
+    }
+}