This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f73b437fcf [feat][broker] PIP-468: Filter scalable topics by property 
via secondary index (#25632)
4f73b437fcf is described below

commit 4f73b437fcf45373c03356155bebf5dd0af53f4a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 13:51:45 2026 -0700

    [feat][broker] PIP-468: Filter scalable topics by property via secondary 
index (#25632)
---
 .../broker/resources/ScalableTopicResources.java   |  56 ++++++-
 .../pulsar/broker/admin/v2/ScalableTopics.java     |  13 +-
 .../admin/ScalableTopicsListByPropertyTest.java    |  83 ++++++++++
 .../resources/ScalableTopicPropertyIndexTest.java  | 174 +++++++++++++++++++++
 .../apache/pulsar/client/admin/ScalableTopics.java |  23 +++
 .../client/admin/internal/ScalableTopicsImpl.java  |  17 ++
 .../apache/pulsar/admin/cli/CmdScalableTopics.java |  21 ++-
 .../apache/pulsar/metadata/api/MetadataCache.java  |  40 +++++
 .../metadata/cache/impl/MetadataCacheImpl.java     |  40 ++++-
 .../metadata/MetadataCacheSecondaryIndexTest.java  | 166 ++++++++++++++++++++
 10 files changed, 623 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 235f17e2d31..263abe50439 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.broker.resources;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -28,6 +31,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -54,6 +58,16 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
     private static final String CONSUMERS_SEGMENT = "consumers";
 
+    /**
+     * Use the topic's {@code properties} map verbatim as the secondary-index 
entries.
+     * Each property {@code k -> v} is registered as the index named {@code k} 
with
+     * secondary key {@code v}; querying by that key/value pair via
+     * {@link MetadataStore#findByIndex} returns the record. Index names live 
in a
+     * per-record-type namespace, so there's no need to disambiguate them with 
a prefix.
+     */
+    private static final Function<ScalableTopicMetadata, Map<String, String>> 
PROPERTY_INDEX_EXTRACTOR =
+            metadata -> metadata.getProperties() != null ? 
metadata.getProperties() : Map.of();
+
     private final MetadataCache<SubscriptionMetadata> subscriptionCache;
     private final MetadataCache<ConsumerRegistration> 
consumerRegistrationCache;
 
@@ -64,7 +78,7 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     }
 
     public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, 
ScalableTopicMetadata metadata) {
-        return createAsync(topicPath(tn), metadata);
+        return getCache().create(topicPath(tn), metadata, 
PROPERTY_INDEX_EXTRACTOR);
     }
 
     public CompletableFuture<Optional<ScalableTopicMetadata>> 
getScalableTopicMetadataAsync(TopicName tn) {
@@ -82,7 +96,10 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     public CompletableFuture<Void> updateScalableTopicAsync(TopicName tn,
                                                              
Function<ScalableTopicMetadata,
                                                                      
ScalableTopicMetadata> updateFunction) {
-        return setAsync(topicPath(tn), updateFunction);
+        // Refresh property indexes on every update — the modify function may 
add or remove
+        // properties and the underlying store needs to see the 
post-modification view.
+        return getCache().readModifyUpdate(topicPath(tn), updateFunction, 
PROPERTY_INDEX_EXTRACTOR)
+                .thenApply(__ -> null);
     }
 
     public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
@@ -100,6 +117,41 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
                         .collect(Collectors.toList()));
     }
 
+    /**
+     * List scalable topics in a namespace whose {@code properties} map 
contains the given
+     * key/value pair. On stores with native secondary index support (Oxia) 
this is served
+     * by the index registered at create/update time; otherwise it falls back 
to a children
+     * scan + per-record property check.
+     *
+     * @param ns            the namespace to scope the query to
+     * @param propertyKey   property name to filter on
+     * @param propertyValue exact property value to match
+     * @return fully qualified scalable topic names matching the property
+     */
+    public CompletableFuture<List<String>> findScalableTopicsByPropertyAsync(
+            NamespaceName ns, String propertyKey, String propertyValue) {
+        String scanPathPrefix = joinPath(SCALABLE_TOPIC_PATH, ns.toString());
+        ObjectMapper mapper = 
ObjectMapperFactory.getMapper().getObjectMapper();
+        return getStore().findByIndex(scanPathPrefix, propertyKey, 
propertyValue, result -> {
+                    // Fallback path (no native index): re-check the property 
on the loaded record.
+                    try {
+                        ScalableTopicMetadata md =
+                                mapper.readValue(result.getValue(), 
ScalableTopicMetadata.class);
+                        return md.getProperties() != null
+                                && 
propertyValue.equals(md.getProperties().get(propertyKey));
+                    } catch (IOException e) {
+                        return false;
+                    }
+                })
+                .thenApply(results -> results.stream()
+                        .map(r -> {
+                            String path = r.getStat().getPath();
+                            String encoded = 
path.substring(path.lastIndexOf('/') + 1);
+                            return TopicName.get("topic", ns, 
Codec.decode(encoded)).toString();
+                        })
+                        .collect(Collectors.toList()));
+    }
+
     // --- Subscriptions ---
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 6e7aa7f6723..1994fd8de34 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -93,10 +93,19 @@ public class ScalableTopics extends AdminResource {
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
-            @PathParam("namespace") String namespace) {
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Filter by topic property name (must be paired 
with propertyValue)")
+            @QueryParam("propertyKey") String propertyKey,
+            @ApiParam(value = "Filter by topic property value (must be paired 
with propertyKey)")
+            @QueryParam("propertyValue") String propertyValue) {
         validateNamespaceName(tenant, namespace);
+        boolean filterByProperty = propertyKey != null && 
!propertyKey.isEmpty()
+                && propertyValue != null;
         validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.GET_TOPICS)
-                .thenCompose(__ -> 
resources().listScalableTopicsAsync(namespaceName))
+                .thenCompose(__ -> filterByProperty
+                        ? resources().findScalableTopicsByPropertyAsync(
+                                namespaceName, propertyKey, propertyValue)
+                        : resources().listScalableTopicsAsync(namespaceName))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error().attr("clientAppId", 
clientAppId()).attr("namespace", namespaceName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
new file mode 100644
index 00000000000..f775fe3f2df
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the property-filtered list endpoint
+ * ({@code GET 
/admin/v2/scalable/{tenant}/{namespace}?propertyKey&propertyValue}).
+ * Drives the full HTTP path through the {@link 
org.apache.pulsar.client.admin.PulsarAdmin}
+ * client against a real shared broker, verifying that topics created with
+ * properties through the admin API are queryable via the secondary index.
+ */
+public class ScalableTopicsListByPropertyTest extends SharedPulsarBaseTest {
+
+    private String namespace() {
+        return getNamespace();
+    }
+
+    private String topicName(String suffix) {
+        return "topic://" + namespace() + "/" + suffix + "-" + 
UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    @Test
+    public void listScalableTopicsFilteredByProperty() throws Exception {
+        String aliceTopic = topicName("alice");
+        String bobTopic = topicName("bob");
+        String carolTopic = topicName("carol");
+
+        // Each topic gets a different owner; alice and bob share a team. The 
filter
+        // should be able to surface either subset on demand.
+        admin.scalableTopics().createScalableTopic(aliceTopic, 1,
+                Map.of("owner", "alice", "team", "platform"));
+        admin.scalableTopics().createScalableTopic(bobTopic, 1,
+                Map.of("owner", "bob", "team", "platform"));
+        admin.scalableTopics().createScalableTopic(carolTopic, 1,
+                Map.of("owner", "carol", "team", "data"));
+
+        // Filter by owner=alice — single match.
+        List<String> alice = admin.scalableTopics()
+                .listScalableTopicsByProperty(namespace(), "owner", "alice");
+        assertEquals(alice, List.of(aliceTopic));
+
+        // Filter by team=platform — alice + bob.
+        Set<String> platform = new HashSet<>(admin.scalableTopics()
+                .listScalableTopicsByProperty(namespace(), "team", 
"platform"));
+        assertEquals(platform, Set.of(aliceTopic, bobTopic));
+
+        // Unmatched value — empty result.
+        assertTrue(admin.scalableTopics()
+                .listScalableTopicsByProperty(namespace(), "owner", 
"nonexistent")
+                .isEmpty());
+
+        // Sanity-check: the un-filtered listing still returns every topic in 
the namespace.
+        Set<String> all = new 
HashSet<>(admin.scalableTopics().listScalableTopics(namespace()));
+        assertTrue(all.containsAll(Set.of(aliceTopic, bobTopic, carolTopic)),
+                "expected all three created topics to appear in the unfiltered 
list, got " + all);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
new file mode 100644
index 00000000000..f4cd6d1f648
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@code findScalableTopicsByPropertyAsync} entry point on
+ * {@link ScalableTopicResources}: verifies that topic properties registered at
+ * create/update time are queryable via the secondary-index API, that updates
+ * refresh the index, and that the filter is correctly scoped to a namespace.
+ *
+ * <p>Uses {@link LocalMemoryMetadataStore} which does not implement native
+ * secondary indexes, so this exercises the fallback scan + per-record property
+ * predicate path. The Oxia-native path (where the index is consulted directly)
+ * is covered by the metadata-store-level secondary index tests.
+ */
+public class ScalableTopicPropertyIndexTest {
+
+    private MetadataStoreExtended store;
+    private ScalableTopicResources resources;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = new LocalMemoryMetadataStore("memory:local",
+                MetadataStoreConfig.builder().build());
+        resources = new ScalableTopicResources(store, 30);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    private TopicName topicIn(NamespaceName ns, String localName) {
+        return TopicName.get("topic://" + ns + "/" + localName);
+    }
+
+    private ScalableTopicMetadata metaWithProps(Map<String, String> props) {
+        return ScalableTopicMetadata.builder()
+                .epoch(0)
+                .nextSegmentId(1)
+                .properties(new HashMap<>(props))
+                .build();
+    }
+
+    @Test
+    public void findsTopicByExactPropertyValue() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-a");
+
+        resources.createScalableTopicAsync(topicIn(ns, "t-alice"),
+                metaWithProps(Map.of("owner", "alice", "team", 
"platform"))).get();
+        resources.createScalableTopicAsync(topicIn(ns, "t-bob"),
+                metaWithProps(Map.of("owner", "bob", "team", 
"platform"))).get();
+        resources.createScalableTopicAsync(topicIn(ns, "t-carol"),
+                metaWithProps(Map.of("owner", "carol", "team", "data"))).get();
+
+        // Filter by owner=alice — only t-alice matches.
+        List<String> aliceOwned = resources
+                .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+        assertEquals(aliceOwned, List.of("topic://tenant/ns-a/t-alice"));
+
+        // Filter by team=platform — both alice and bob.
+        Set<String> platform = new HashSet<>(resources
+                .findScalableTopicsByPropertyAsync(ns, "team", 
"platform").get());
+        assertEquals(platform, Set.of(
+                "topic://tenant/ns-a/t-alice",
+                "topic://tenant/ns-a/t-bob"));
+    }
+
+    @Test
+    public void findIsScopedToNamespace() throws Exception {
+        NamespaceName nsA = NamespaceName.get("tenant/ns-a");
+        NamespaceName nsB = NamespaceName.get("tenant/ns-b");
+
+        // Same property in two namespaces — find must return only the one we 
asked for.
+        resources.createScalableTopicAsync(topicIn(nsA, "t1"),
+                metaWithProps(Map.of("owner", "alice"))).get();
+        resources.createScalableTopicAsync(topicIn(nsB, "t2"),
+                metaWithProps(Map.of("owner", "alice"))).get();
+
+        List<String> inNsA = resources
+                .findScalableTopicsByPropertyAsync(nsA, "owner", 
"alice").get();
+        assertEquals(inNsA, List.of("topic://tenant/ns-a/t1"));
+
+        List<String> inNsB = resources
+                .findScalableTopicsByPropertyAsync(nsB, "owner", 
"alice").get();
+        assertEquals(inNsB, List.of("topic://tenant/ns-b/t2"));
+    }
+
+    @Test
+    public void noMatchReturnsEmptyList() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-empty");
+        resources.createScalableTopicAsync(topicIn(ns, "t1"),
+                metaWithProps(Map.of("owner", "alice"))).get();
+
+        // Wrong value
+        assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"bob")
+                .get().isEmpty());
+
+        // Wrong key
+        assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "team", 
"alice")
+                .get().isEmpty());
+    }
+
+    @Test
+    public void updateRefreshesIndex() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-update");
+        TopicName tn = topicIn(ns, "t-mutating");
+
+        resources.createScalableTopicAsync(tn,
+                metaWithProps(Map.of("owner", "alice"))).get();
+        assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"alice").get(),
+                List.of(tn.toString()));
+
+        // Reassign owner via update — the new owner must be queryable, and the
+        // old owner's entry must no longer match this topic.
+        resources.updateScalableTopicAsync(tn, current -> {
+            current.getProperties().put("owner", "bob");
+            return current;
+        }).get();
+
+        assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"bob").get(),
+                List.of(tn.toString()));
+        assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", 
"alice")
+                .get().isEmpty());
+    }
+
+    @Test
+    public void topicWithoutPropertiesIsNotMatched() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-noprops");
+        resources.createScalableTopicAsync(topicIn(ns, "t-anon"),
+                metaWithProps(Map.of())).get();
+        resources.createScalableTopicAsync(topicIn(ns, "t-tagged"),
+                metaWithProps(Map.of("owner", "alice"))).get();
+
+        // Filtering by any property must skip the un-tagged record.
+        List<String> matches = resources
+                .findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
+        assertEquals(matches, List.of("topic://tenant/ns-noprops/t-tagged"));
+    }
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index 7db81e0cf7b..74f8a223399 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -49,6 +49,29 @@ public interface ScalableTopics {
      */
     CompletableFuture<List<String>> listScalableTopicsAsync(String namespace);
 
+    /**
+     * Get the list of scalable topics under a namespace whose properties 
contain
+     * the given key/value pair.
+     *
+     * <p>Backed by a secondary index registered on the topic property at 
create/update
+     * time, so the lookup is efficient and does not scan every topic in the 
namespace.
+     *
+     * @param namespace     Namespace name in the format "tenant/namespace"
+     * @param propertyKey   Property name to filter on
+     * @param propertyValue Exact property value to match
+     * @return list of matching scalable topic names
+     */
+    List<String> listScalableTopicsByProperty(String namespace, String 
propertyKey, String propertyValue)
+            throws PulsarAdminException;
+
+    /**
+     * Get the list of scalable topics under a namespace whose properties 
contain
+     * the given key/value pair, asynchronously.
+     */
+    CompletableFuture<List<String>> listScalableTopicsByPropertyAsync(String 
namespace,
+                                                                      String 
propertyKey,
+                                                                      String 
propertyValue);
+
     /**
      * Create a new scalable topic.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index a027b946b72..cda78328166 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -64,6 +64,23 @@ public class ScalableTopicsImpl extends BaseResource 
implements ScalableTopics {
         return asyncGetRequest(namespacePath(ns), new 
GenericType<List<String>>() {});
     }
 
+    @Override
+    public List<String> listScalableTopicsByProperty(String namespace, String 
propertyKey, String propertyValue)
+            throws PulsarAdminException {
+        return sync(() -> listScalableTopicsByPropertyAsync(namespace, 
propertyKey, propertyValue));
+    }
+
+    @Override
+    public CompletableFuture<List<String>> 
listScalableTopicsByPropertyAsync(String namespace,
+                                                                              
String propertyKey,
+                                                                              
String propertyValue) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns)
+                .queryParam("propertyKey", propertyKey)
+                .queryParam("propertyValue", propertyValue);
+        return asyncGetRequest(path, new GenericType<List<String>>() {});
+    }
+
     // --- Create ---
 
     @Override
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index 7badc943783..8991632a694 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -34,14 +34,31 @@ public class CmdScalableTopics extends CmdBase {
         return getAdmin().scalableTopics();
     }
 
-    @Command(description = "Get the list of scalable topics under a namespace")
+    @Command(description = "Get the list of scalable topics under a namespace, 
optionally"
+            + " filtered to those whose properties contain a given key/value 
pair")
     private class ListCmd extends CliCommand {
         @Parameters(description = "tenant/namespace", arity = "1")
         private String namespace;
 
+        @Option(names = {"-p", "--property"},
+                description = "Filter to topics whose properties contain this 
key=value pair")
+        private String property;
+
         @Override
         void run() throws Exception {
-            
print(scalableTopics().listScalableTopics(validateNamespace(namespace)));
+            String ns = validateNamespace(namespace);
+            if (property == null || property.isEmpty()) {
+                print(scalableTopics().listScalableTopics(ns));
+                return;
+            }
+            int eq = property.indexOf('=');
+            if (eq <= 0 || eq == property.length() - 1) {
+                throw new IllegalArgumentException(
+                        "--property must be in the form key=value, got: " + 
property);
+            }
+            String key = property.substring(0, eq);
+            String value = property.substring(eq + 1);
+            print(scalableTopics().listScalableTopicsByProperty(ns, key, 
value));
         }
     }
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 4af712d3357..641889b3c18 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata.api;
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -115,6 +116,24 @@ public interface MetadataCache<T> {
      */
     CompletableFuture<T> readModifyUpdate(String path, Function<T, T> 
modifyFunction);
 
+    /**
+     * Atomic read-modify-update with secondary index entries refreshed from 
the new value.
+     *
+     * <p>Equivalent to {@link #readModifyUpdate(String, Function)} but on 
each successful
+     * write the {@code indexExtractor} is invoked on the new value and the 
resulting
+     * {@code indexName -> secondaryKey} map is associated with the record. See
+     * {@link #create(String, Object, Function)} for the index-extractor 
contract.
+     *
+     * @param path           the path of the object in the metadata store
+     * @param modifyFunction read-modify-update function
+     * @param indexExtractor function that derives secondary index entries 
from the new value
+     * @return a future that completes with the new stored value
+     */
+    default CompletableFuture<T> readModifyUpdate(String path, Function<T, T> 
modifyFunction,
+                                                  Function<T, Map<String, 
String>> indexExtractor) {
+        return readModifyUpdate(path, modifyFunction);
+    }
+
     /**
      * Create a new object in the metadata store.
      * <p>
@@ -130,6 +149,27 @@ public interface MetadataCache<T> {
      */
     CompletableFuture<Void> create(String path, T value);
 
+    /**
+     * Create a new object in the metadata store with secondary index entries 
derived from
+     * the value.
+     *
+     * <p>The {@code indexExtractor} is invoked with the new value and returns 
a map of
+     * {@code indexName -> secondaryKey}. Stores that support native secondary 
indexes
+     * (e.g. Oxia) persist these alongside the record so that
+     * {@link MetadataStore#findByIndex} can serve queries efficiently. Stores 
that don't
+     * support indexes ignore the hints; the value is still written normally.
+     *
+     * @param path the path of the object in the metadata store
+     * @param value the object to insert
+     * @param indexExtractor function that derives secondary index entries 
from the value
+     * @return a future to track the completion of the operation
+     * @throws AlreadyExistsException if the object is already present
+     */
+    default CompletableFuture<Void> create(String path, T value,
+                                           Function<T, Map<String, String>> 
indexExtractor) {
+        return create(path, value);
+    }
+
     /**
      * Create or update the value of the given path in the metadata store 
without version comparison.
      * <p>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index b9bfdd8e6d2..cab2c3a2e0a 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -27,8 +27,10 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -232,6 +234,12 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     @Override
     public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> 
modifyFunction) {
+        return readModifyUpdate(path, modifyFunction, v -> 
Collections.emptyMap());
+    }
+
+    @Override
+    public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> 
modifyFunction,
+                                                 Function<T, Map<String, 
String>> indexExtractor) {
         final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
                 .thenComposeAsync(optEntry -> {
@@ -254,9 +262,12 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                         return FutureUtils.exception(t);
                     }
 
-                    return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
-                        refresh(path);
-                    }).thenApply(__ -> newValueObj);
+                    final T finalNewValue = newValueObj;
+                    return putWithIndexes(path, newValue, 
Optional.of(expectedVersion),
+                                    EnumSet.noneOf(CreateOption.class),
+                                    indexExtractor.apply(finalNewValue))
+                            .thenAccept(__ -> refresh(path))
+                            .thenApply(__ -> finalNewValue);
                 }, executor), path);
     }
 
@@ -274,8 +285,16 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     @Override
     public CompletableFuture<Void> create(String path, T value) {
+        return create(path, value, v -> Collections.emptyMap());
+    }
+
+    @Override
+    public CompletableFuture<Void> create(String path, T value,
+                                          Function<T, Map<String, String>> 
indexExtractor) {
         final var future = new CompletableFuture<Void>();
-        serialize(path, value).thenCompose(content -> store.put(path, content, 
Optional.of(-1L)))
+        serialize(path, value).thenCompose(content -> putWithIndexes(
+                        path, content, Optional.of(-1L), 
EnumSet.noneOf(CreateOption.class),
+                        indexExtractor.apply(value)))
             // Make sure we have the value cached before the operation is 
completed
             // In addition to caching the value, we need to add a watch on the 
path,
             // so when/if it changes on any other node, we are notified and we 
can
@@ -294,6 +313,19 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         return future;
     }
 
+    /**
+     * Route writes through the extended store API when secondary indexes are 
present and
+     * the underlying store supports them. Falls back to the base {@code put} 
otherwise.
+     */
+    private CompletableFuture<?> putWithIndexes(String path, byte[] content, 
Optional<Long> version,
+                                                EnumSet<CreateOption> options,
+                                                Map<String, String> indexes) {
+        if (storeExtended != null && indexes != null && !indexes.isEmpty()) {
+            return storeExtended.put(path, content, version, options, indexes);
+        }
+        return store.put(path, content, version);
+    }
+
     @Override
     public CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
         return serialize(path, value).thenCompose(bytes -> {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
new file mode 100644
index 00000000000..1469dddee8c
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@link MetadataCache} overloads that accept an
+ * {@code indexExtractor}: every cache write must register the indexes derived
+ * from the value with the underlying store, so a subsequent
+ * {@link org.apache.pulsar.metadata.api.MetadataStore#findByIndex} returns the
+ * record. Updates must refresh the indexes so that the post-update state is
+ * what's queryable.
+ *
+ * <p>Runs against every metadata-store implementation. Stores without native
+ * secondary index support exercise the fallback scan + predicate path inside
+ * {@code findByIndex}; native-index stores (Oxia) consult the index directly.
+ */
+public class MetadataCacheSecondaryIndexTest extends BaseMetadataStoreTest {
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    static class IndexedValue {
+        String owner;
+        String team;
+    }
+
+    /**
+     * Predicate used in the fallback path to identify records matching a 
property.
+     * Deserialises the stored bytes to {@link IndexedValue} and checks the 
field.
+     */
+    private static java.util.function.Predicate<GetResult> matchOwner(String 
owner) {
+        return result -> {
+            try {
+                IndexedValue v = 
com.fasterxml.jackson.databind.json.JsonMapper.builder()
+                        .build().readValue(result.getValue(), 
IndexedValue.class);
+                return owner.equals(v.getOwner());
+            } catch (Exception e) {
+                return false;
+            }
+        };
+    }
+
+    private static java.util.function.Predicate<GetResult> matchTeam(String 
team) {
+        return result -> {
+            try {
+                IndexedValue v = 
com.fasterxml.jackson.databind.json.JsonMapper.builder()
+                        .build().readValue(result.getValue(), 
IndexedValue.class);
+                return team.equals(v.getTeam());
+            } catch (Exception e) {
+                return false;
+            }
+        };
+    }
+
+    @Test(dataProvider = "impl")
+    public void createWithIndexExtractorRegistersIndexes(String provider,
+                                                          Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        MetadataCache<IndexedValue> cache = 
store.getMetadataCache(IndexedValue.class);
+
+        String basePath = newKey();
+
+        // Two records share an owner; one is on a different owner. The 
extractor
+        // exposes both fields as separate indexes so we can query either one.
+        cache.create(basePath + "/r1", new IndexedValue("alice", "platform"),
+                v -> Map.of("by-owner", v.getOwner(), "by-team", 
v.getTeam())).join();
+        cache.create(basePath + "/r2", new IndexedValue("alice", "data"),
+                v -> Map.of("by-owner", v.getOwner(), "by-team", 
v.getTeam())).join();
+        cache.create(basePath + "/r3", new IndexedValue("bob", "platform"),
+                v -> Map.of("by-owner", v.getOwner(), "by-team", 
v.getTeam())).join();
+
+        // Owner=alice should return r1 + r2.
+        List<GetResult> aliceOwned = store.findByIndex(basePath, "by-owner", 
"alice",
+                matchOwner("alice")).join();
+        assertEquals(aliceOwned.size(), 2);
+
+        // Team=platform should return r1 + r3.
+        Set<String> platformPaths = new HashSet<>();
+        for (GetResult r : store.findByIndex(basePath, "by-team", "platform",
+                matchTeam("platform")).join()) {
+            platformPaths.add(r.getStat().getPath());
+        }
+        assertEquals(platformPaths, Set.of(basePath + "/r1", basePath + 
"/r3"));
+    }
+
+    @Test(dataProvider = "impl")
+    public void readModifyUpdateRefreshesIndexes(String provider,
+                                                  Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        MetadataCache<IndexedValue> cache = 
store.getMetadataCache(IndexedValue.class);
+
+        String basePath = newKey();
+        java.util.function.Function<IndexedValue, Map<String, String>> 
extractor =
+                v -> Map.of("by-owner", v.getOwner());
+
+        cache.create(basePath + "/r1", new IndexedValue("alice", "platform"), 
extractor).join();
+        assertEquals(store.findByIndex(basePath, "by-owner", "alice", 
matchOwner("alice"))
+                .join().size(), 1);
+
+        // Reassign owner via update — the new owner becomes the queryable one 
and the
+        // old owner's lookup must no longer surface this record.
+        cache.readModifyUpdate(basePath + "/r1", current -> new 
IndexedValue("bob", current.getTeam()),
+                extractor).join();
+
+        assertEquals(store.findByIndex(basePath, "by-owner", "bob", 
matchOwner("bob"))
+                .join().size(), 1);
+        assertEquals(store.findByIndex(basePath, "by-owner", "alice", 
matchOwner("alice"))
+                .join().size(), 0);
+    }
+
+    @Test(dataProvider = "impl")
+    public void emptyExtractorBehavesLikePlainCreate(String provider,
+                                                       Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        MetadataCache<IndexedValue> cache = 
store.getMetadataCache(IndexedValue.class);
+
+        String path = newKey();
+
+        // An extractor returning an empty map must not register any indexes — 
but the
+        // record must still be written and readable.
+        cache.create(path, new IndexedValue("alice", "platform"), v -> 
Map.of()).join();
+
+        assertTrue(cache.get(path).join().isPresent());
+        // No index registered, so a lookup with the would-be index name 
returns nothing.
+        assertEquals(store.findByIndex(path, "by-owner", "alice", r -> 
false).join().size(), 0);
+    }
+}


Reply via email to