This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f662459769e [feat] PIP-468: Multi-broker shared cluster + V5 
cross-broker tests (#25633)
f662459769e is described below

commit f662459769e364e4b54e11bca539419e2bc1e8b2
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 14:47:19 2026 -0700

    [feat] PIP-468: Multi-broker shared cluster + V5 cross-broker tests (#25633)
---
 pulsar-broker/build.gradle.kts                     |   1 +
 .../broker/service/scalable/DagWatchSession.java   |  10 +
 .../service/scalable/ScalableTopicController.java  |  44 +--
 .../service/SharedMultiBrokerPulsarBaseTest.java   | 146 +++++++
 .../service/SharedMultiBrokerPulsarCluster.java    | 295 ++++++++++++++
 .../client/api/v5/V5MultiBrokerClientBaseTest.java | 116 ++++++
 .../api/v5/V5MultiBrokerScalableTopicTest.java     | 439 +++++++++++++++++++++
 7 files changed, 1016 insertions(+), 35 deletions(-)

diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index 38c5b917932..5fdd16c30c5 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -132,6 +132,7 @@ dependencies {
     testImplementation(libs.jetty.ee8.proxy)
     testImplementation(libs.jetty.websocket.jetty.client)
     testImplementation(libs.opentelemetry.sdk.testing)
+    testImplementation(libs.oxia.testcontainers)
     testRuntimeOnly(libs.avro.protobuf) {
         exclude(group = "com.google.protobuf")
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 51a8ed20c47..ef0f8947b50 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -169,6 +169,16 @@ public class DagWatchSession {
             }
         }
 
+        // Propagate the controller-broker URL so V5 clients can connect to 
the right broker
+        // for scalable-topic subscribe. Without this the client falls back to 
its configured
+        // service URL, which on a multi-broker cluster is rarely the 
controller leader.
+        if (response.controllerBrokerUrl() != null) {
+            dag.setControllerBrokerUrl(response.controllerBrokerUrl());
+        }
+        if (response.controllerBrokerUrlTls() != null) {
+            dag.setControllerBrokerUrlTls(response.controllerBrokerUrlTls());
+        }
+
         return dag;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index b63ee847e34..8b1805cd315 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -21,10 +21,8 @@ package org.apache.pulsar.broker.service.scalable;
 import io.github.merlimat.slog.Logger;
 import java.time.Duration;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.Getter;
@@ -233,9 +231,11 @@ public class ScalableTopicController {
         SegmentInfo parent = currentLayout.getAllSegments().get(segmentId);
         String parentTopicName = toSegmentPersistentName(parent);
 
-        // Step 1: Discover subscriptions on the parent segment, then create 
child
-        // segment topics with those subscriptions (routed to owning brokers 
via admin API)
-        return discoverSubscriptions(parentTopicName)
+        // Step 1: Read the scalable topic's subscriptions from metadata (the 
single source
+        // of truth — segment topics may live on different brokers, but the 
subscription set
+        // is tracked here), then create child segment topics with those 
subscriptions
+        // already provisioned (the create call routes to each segment's 
owning broker).
+        return resources.listSubscriptionsAsync(topicName)
           .thenCompose(parentSubs -> {
               var subList = new java.util.ArrayList<>(parentSubs);
               return createSegmentTopic(child1, subList)
@@ -277,13 +277,10 @@ public class ScalableTopicController {
         String parent1Topic = toSegmentPersistentName(parent1);
         String parent2Topic = toSegmentPersistentName(parent2);
 
-        // Step 1: Discover subscriptions from both parents (union), then 
create merged segment
-        return discoverSubscriptions(parent1Topic)
-          .thenCombine(discoverSubscriptions(parent2Topic), (subs1, subs2) -> {
-              Set<String> allSubs = new LinkedHashSet<>(subs1);
-              allSubs.addAll(subs2);
-              return allSubs;
-          })
+        // Step 1: Read the scalable topic's subscriptions from metadata 
(single source of
+        // truth, see splitSegment), then create the merged segment topic with 
those
+        // subscriptions provisioned.
+        return resources.listSubscriptionsAsync(topicName)
           .thenCompose(parentSubs -> createSegmentTopic(merged, new 
java.util.ArrayList<>(parentSubs)))
 
           // Step 2: Terminate both parent segment topics
@@ -582,29 +579,6 @@ public class ScalableTopicController {
         }
     }
 
-    /**
-     * Discover all subscription names on a segment topic. Works whether the 
topic is
-     * on this broker or a remote one by using the admin client.
-     */
-    private CompletableFuture<Set<String>> discoverSubscriptions(String 
segmentTopicName) {
-        // Try local first (avoids RPC if the segment is on this broker)
-        return brokerService.getTopicIfExists(segmentTopicName)
-                .thenCompose(optTopic -> {
-                    if (optTopic.isPresent()) {
-                        return CompletableFuture.completedFuture(
-                                new 
LinkedHashSet<>(optTopic.get().getSubscriptions().keySet()));
-                    }
-                    // Topic is on a remote broker — use admin client
-                    try {
-                        return brokerService.getPulsar().getAdminClient()
-                                
.topics().getSubscriptionsAsync(segmentTopicName)
-                                .thenApply(LinkedHashSet::new);
-                    } catch (PulsarServerException e) {
-                        return CompletableFuture.failedFuture(e);
-                    }
-                });
-    }
-
     private CompletableFuture<Void> notifySubscriptions(SegmentLayout layout) {
         CompletableFuture<?>[] futures = subscriptions.values().stream()
                 .map(coordinator -> coordinator.onLayoutChange(layout))
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java
new file mode 100644
index 00000000000..85ae5890993
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Base class for tests that need a shared multi-broker cluster across test 
classes.
+ *
+ * <p>Companion to {@link SharedPulsarBaseTest}. Use this when a test 
specifically depends on
+ * behavior that only manifests across brokers — namespace ownership transfer, 
controller-leader
+ * failover, segment placement on different brokers, V5 client reconnect to a 
different broker.
+ * For everything else, prefer the single-broker {@link SharedPulsarBaseTest}: 
it's faster, has
+ * fewer moving parts, and is sufficient for most coverage.
+ *
+ * <p>Each test method gets a fresh namespace under {@link 
SharedMultiBrokerPulsarCluster#TENANT_NAME}
+ * (created in {@link #setupSharedMultiBrokerTest()} and force-deleted in
+ * {@link #cleanupSharedMultiBrokerTest()}). The cluster itself is JVM-wide 
and reused across
+ * every test class that extends this — see {@link 
SharedMultiBrokerPulsarCluster}.
+ *
+ * <p>Subclasses get:
+ * <ul>
+ *   <li>{@link #admin} / {@link #pulsarClient} — handles aimed at broker 0; 
lookups against any
+ *       broker correctly redirect to topic owners, so most tests should just 
use these.</li>
+ *   <li>{@link #brokers} / {@link #admins} / {@link #clients} — full 
per-broker lists, in start
+ *       order, for tests that need to address a specific broker (e.g. 
asserting topic
+ *       ownership, killing a specific broker).</li>
+ *   <li>{@link #newTopicName()} — generates a unique topic in the test 
namespace.</li>
+ * </ul>
+ */
+@CustomLog
+public abstract class SharedMultiBrokerPulsarBaseTest {
+
+    /** All brokers in the shared cluster, in start order. */
+    protected List<PulsarService> brokers;
+    /** Per-broker admin handles, aligned with {@link #brokers}. */
+    protected List<PulsarAdmin> admins;
+    /** Per-broker client handles, aligned with {@link #brokers}. */
+    protected List<PulsarClient> clients;
+
+    /** Convenience: broker 0's admin. */
+    protected PulsarAdmin admin;
+    /** Convenience: broker 0's client. */
+    protected PulsarClient pulsarClient;
+
+    private final List<String> namespaces = new ArrayList<>();
+
+    /** Returns the unique namespace assigned to the current test method. */
+    protected String getNamespace() {
+        return namespaces.get(0);
+    }
+
+    /** Returns the broker service URL for broker {@code index}. */
+    protected String getBrokerServiceUrl(int index) {
+        return brokers.get(index).getBrokerServiceUrl();
+    }
+
+    /** Returns the web service URL for broker {@code index}. */
+    protected String getWebServiceUrl(int index) {
+        return brokers.get(index).getWebServiceAddress();
+    }
+
+    /**
+     * Creates a new {@link PulsarClient} connected to broker {@code index}. 
Callers are
+     * responsible for closing the returned client.
+     */
+    protected PulsarClient newPulsarClient(int index) throws 
PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(brokers.get(index).getBrokerServiceUrl()).build();
+    }
+
+    /**
+     * Initializes (lazily) the shared cluster singleton and wires the 
per-class fields. Called
+     * once per test class.
+     */
+    @BeforeClass(alwaysRun = true)
+    public void setupSharedMultiBrokerCluster() throws Exception {
+        SharedMultiBrokerPulsarCluster cluster = 
SharedMultiBrokerPulsarCluster.get();
+        brokers = cluster.getBrokers();
+        admins = cluster.getAdmins();
+        clients = cluster.getClients();
+        admin = cluster.getAdmin();
+        pulsarClient = cluster.getClient();
+    }
+
+    /** Creates a unique namespace for the current test method. */
+    @BeforeMethod(alwaysRun = true)
+    public void setupSharedMultiBrokerTest() throws Exception {
+        createNewNamespace();
+    }
+
+    /** Force-deletes all namespaces created during the test method. */
+    @AfterMethod(alwaysRun = true)
+    public void cleanupSharedMultiBrokerTest() throws Exception {
+        for (String ns : namespaces) {
+            try {
+                admin.namespaces().deleteNamespace(ns, true);
+                log.info().attr("testNamespace", ns).log("Deleted test 
namespace");
+            } catch (Exception e) {
+                log.warn().attr("deleteNamespace", 
ns).exceptionMessage(e).log("Failed to delete namespace");
+            }
+        }
+        namespaces.clear();
+    }
+
+    /** Creates a new namespace under the shared tenant and registers it for 
cleanup. */
+    protected String createNewNamespace() throws Exception {
+        String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
+        String ns = SharedMultiBrokerPulsarCluster.TENANT_NAME + "/" + nsName;
+        admin.namespaces().createNamespace(ns, 
Set.of(SharedMultiBrokerPulsarCluster.CLUSTER_NAME));
+        namespaces.add(ns);
+        log.info().attr("testNamespace", ns).log("Created test namespace");
+        return ns;
+    }
+
+    /** Generates a unique persistent topic name within the current test 
namespace. */
+    protected String newTopicName() {
+        return "persistent://" + getNamespace() + "/topic-" + 
UUID.randomUUID().toString().substring(0, 8);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java
new file mode 100644
index 00000000000..fde85c06ddb
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.oxia.testcontainers.OxiaContainer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import lombok.CustomLog;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+import org.apache.pulsar.tests.ThreadLeakDetectorListener;
+
+/**
+ * JVM-wide singleton that manages a lightweight multi-broker Pulsar cluster 
for integration tests.
+ *
+ * <p>Companion to {@link SharedPulsarCluster}, but spins up {@value 
#NUM_BROKERS} Pulsar brokers
+ * sharing a single bookie and an in-memory metadata store. Use this when a 
test specifically
+ * needs to exercise behavior that only manifests across brokers — namespace 
ownership transfer,
+ * controller-leader failover, segment placement on different brokers, V5 
client reconnect to a
+ * different broker, etc.
+ *
+ * <p>The first broker's admin and client are exposed as the "primary" handles 
via
+ * {@link #getAdmin()} and {@link #getClient()}; per-broker handles are 
available via
+ * {@link #getBrokers()}, {@link #getAdmins()}, and {@link #getClients()}. 
Lookups against any
+ * broker correctly redirect to the broker that owns the requested topic, so 
most tests should
+ * just use the primary handles.
+ *
+ * <p>Lazy on first call to {@link #get()}; closed via JVM shutdown hook.
+ *
+ * @see SharedMultiBrokerPulsarBaseTest
+ */
+@CustomLog
+public class SharedMultiBrokerPulsarCluster {
+
+    public static final String CLUSTER_NAME = "multi-broker-test-cluster";
+    public static final String TENANT_NAME = "multi-broker-test-tenant";
+
+    /**
+     * Number of brokers in the shared cluster. Three is the minimum that lets 
us exercise
+     * controller-leader failover (one leader + at least two followers) and 
segment placement
+     * across more than one broker.
+     */
+    public static final int NUM_BROKERS = 3;
+
+    private static volatile SharedMultiBrokerPulsarCluster instance;
+
+    private OxiaContainer oxiaServer;
+    private String metadataStoreUrl;
+    private BKCluster bkCluster;
+    private final List<PulsarService> brokers = new ArrayList<>(NUM_BROKERS);
+    private final List<PulsarAdmin> admins = new ArrayList<>(NUM_BROKERS);
+    private final List<PulsarClient> clients = new ArrayList<>(NUM_BROKERS);
+
+    /** Returns the singleton instance, starting the cluster on first 
invocation. */
+    public static SharedMultiBrokerPulsarCluster get() throws Exception {
+        if (instance == null) {
+            synchronized (SharedMultiBrokerPulsarCluster.class) {
+                if (instance == null) {
+                    SharedMultiBrokerPulsarCluster cluster = new 
SharedMultiBrokerPulsarCluster();
+                    cluster.start();
+                    instance = cluster;
+                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                        try {
+                            instance.close();
+                        } catch (Exception e) {
+                            log.error().exception(e).log("Failed to close 
SharedMultiBrokerPulsarCluster");
+                        }
+                    }));
+                }
+            }
+        }
+        return instance;
+    }
+
+    /** All brokers in the cluster, in start order. */
+    public List<PulsarService> getBrokers() {
+        return Collections.unmodifiableList(brokers);
+    }
+
+    /** Per-broker {@link PulsarAdmin} handles, in the same order as {@link 
#getBrokers()}. */
+    public List<PulsarAdmin> getAdmins() {
+        return Collections.unmodifiableList(admins);
+    }
+
+    /** Per-broker {@link PulsarClient} handles, in the same order as {@link 
#getBrokers()}. */
+    public List<PulsarClient> getClients() {
+        return Collections.unmodifiableList(clients);
+    }
+
+    /** Convenience: the first broker's admin. Lookups redirect to 
topic-owning brokers. */
+    public PulsarAdmin getAdmin() {
+        return admins.get(0);
+    }
+
+    /** Convenience: the first broker's client. Lookups redirect to 
topic-owning brokers. */
+    public PulsarClient getClient() {
+        return clients.get(0);
+    }
+
+    @SuppressWarnings("deprecation")
+    private void start() throws Exception {
+        log.info().attr("brokers", NUM_BROKERS).log("Starting 
SharedMultiBrokerPulsarCluster");
+
+        // Real Oxia server (not the in-memory metadata store). Per-topic 
leader election
+        // (used by the ScalableTopicController) relies on per-session 
ephemeral nodes, and
+        // the in-memory store treats every connection on the same JVM as the 
same session
+        // — so multiple brokers all "win" the same election simultaneously. 
Oxia gives each
+        // broker its own session and the proper ephemeral / CAS semantics. 
Container-based
+        // because oxia ships no in-process server; tests skip cleanly when 
Docker isn't
+        // available.
+        oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME);
+        oxiaServer.start();
+        metadataStoreUrl = "oxia://" + oxiaServer.getServiceAddress();
+
+        // Single shared bookie. Same minimal config as SharedPulsarCluster — 
write quorum stays
+        // at 1 across brokers because the bookie count is the limiting 
factor, not the brokers.
+        ServerConfiguration bkConf = new ServerConfiguration();
+        bkConf.setProperty("dbStorage_writeCacheMaxSizeMb", 32);
+        bkConf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 4);
+        bkConf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 4);
+        bkConf.setProperty("dbStorage_rocksDB_blockCacheSize", 4 * 1024 * 
1024);
+        bkConf.setJournalSyncData(false);
+        bkConf.setJournalWriteData(false);
+        bkConf.setProperty("journalMaxGroupWaitMSec", 0L);
+        bkConf.setProperty("journalPreAllocSizeMB", 1);
+        bkConf.setFlushInterval(60000);
+        bkConf.setGcWaitTime(60000);
+        bkConf.setAllowLoopback(true);
+        bkConf.setAdvertisedAddress("127.0.0.1");
+        bkConf.setAllowEphemeralPorts(true);
+        bkConf.setNumAddWorkerThreads(0);
+        bkConf.setNumReadWorkerThreads(0);
+        bkConf.setNumHighPriorityWorkerThreads(0);
+        bkConf.setNumJournalCallbackThreads(0);
+        bkConf.setServerNumIOThreads(1);
+        bkConf.setNumLongPollWorkerThreads(1);
+        bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+        
bkConf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+
+        bkCluster = BKCluster.builder()
+                .baseServerConfiguration(bkConf)
+                .metadataServiceUri(metadataStoreUrl)
+                .numBookies(1)
+                .clearOldData(true)
+                .build();
+
+        // Start NUM_BROKERS brokers. The first one provisions the cluster + 
tenant; the rest
+        // discover them through the shared metadata store.
+        for (int i = 0; i < NUM_BROKERS; i++) {
+            PulsarService broker = startBroker(i);
+            brokers.add(broker);
+
+            PulsarAdmin admin = PulsarAdmin.builder()
+                    .serviceHttpUrl(broker.getWebServiceAddress())
+                    .build();
+            admins.add(admin);
+
+            PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(broker.getBrokerServiceUrl())
+                    .build();
+            clients.add(client);
+
+            if (i == 0) {
+                admin.clusters().createCluster(CLUSTER_NAME,
+                        ClusterData.builder()
+                                .serviceUrl(broker.getWebServiceAddress())
+                                .brokerServiceUrl(broker.getBrokerServiceUrl())
+                                .build());
+                admin.tenants().createTenant(TENANT_NAME,
+                        TenantInfo.builder()
+                                .allowedClusters(Set.of(CLUSTER_NAME))
+                                .build());
+            }
+        }
+
+        log.info()
+                .attr("brokers", 
brokers.stream().map(PulsarService::getBrokerServiceUrl).toList())
+                .log("SharedMultiBrokerPulsarCluster started");
+
+        // Reset thread-leak baseline so cluster threads aren't reported 
against the first test.
+        ThreadLeakDetectorListener.resetCapturedThreads();
+    }
+
+    private PulsarService startBroker(int index) throws Exception {
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setMetadataStoreUrl(metadataStoreUrl);
+        config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
+        config.setClusterName(CLUSTER_NAME);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setManagedLedgerDefaultEnsembleSize(1);
+        config.setManagedLedgerDefaultWriteQuorum(1);
+        config.setManagedLedgerDefaultAckQuorum(1);
+        // More bundles than brokers so the load balancer has room to spread 
ownership.
+        config.setDefaultNumberOfNamespaceBundles(NUM_BROKERS * 2);
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        config.setNumExecutorThreadPoolSize(5);
+        config.setManagedLedgerCacheSizeMB(8);
+        config.setActiveConsumerFailoverDelayTimeMillis(0);
+        config.setAllowAutoTopicCreationType(
+                
org.apache.pulsar.common.policies.data.TopicType.NON_PARTITIONED);
+        config.setBookkeeperNumberOfChannelsPerBookie(1);
+        config.setBookkeeperClientExposeStatsToPrometheus(false);
+        config.setDispatcherRetryBackoffInitialTimeInMs(0);
+        config.setDispatcherRetryBackoffMaxTimeInMs(0);
+        config.setForceDeleteNamespaceAllowed(true);
+        config.setForceDeleteTenantAllowed(true);
+        config.setBrokerDeleteInactiveTopicsEnabled(false);
+        config.setBrokerDeduplicationEnabled(true);
+
+        // Reduce thread pool sizes — three brokers each spinning up the 
default counts is heavy.
+        config.setNumIOThreads(2);
+        config.setNumOrderedExecutorThreads(2);
+        config.setNumHttpServerThreads(4);
+        config.setBookkeeperClientNumWorkerThreads(2);
+        config.setBookkeeperClientNumIoThreads(2);
+        config.setNumCacheExecutorThreadPoolSize(1);
+        config.setManagedLedgerNumSchedulerThreads(2);
+        config.setTopicOrderedExecutorThreadNum(4);
+
+        // Load balancer is what makes this multi-broker. Disable shedding to 
keep tests
+        // deterministic: bundles assigned at first lookup don't move around 
mid-test.
+        config.setLoadBalancerEnabled(true);
+        config.setLoadBalancerSheddingEnabled(false);
+
+        log.info().attr("index", index).log("Starting broker");
+        PulsarService broker = new PulsarService(config);
+        broker.start();
+        log.info().attr("index", index)
+                .attr("broker", broker.getBrokerServiceUrl())
+                .attr("web", broker.getWebServiceAddress())
+                .log("Broker started");
+        return broker;
+    }
+
+    private void close() throws Exception {
+        log.info("Closing SharedMultiBrokerPulsarCluster");
+        // Tear down in reverse order: clients first so they don't interfere 
with broker shutdown.
+        for (int i = clients.size() - 1; i >= 0; i--) {
+            try {
+                clients.get(i).close();
+            } catch (Exception e) {
+                log.warn().attr("index", i).exceptionMessage(e).log("Failed to 
close client");
+            }
+        }
+        for (int i = admins.size() - 1; i >= 0; i--) {
+            try {
+                admins.get(i).close();
+            } catch (Exception e) {
+                log.warn().attr("index", i).exceptionMessage(e).log("Failed to 
close admin");
+            }
+        }
+        for (int i = brokers.size() - 1; i >= 0; i--) {
+            try {
+                brokers.get(i).close();
+            } catch (Exception e) {
+                log.warn().attr("index", i).exceptionMessage(e).log("Failed to 
close broker");
+            }
+        }
+        if (bkCluster != null) {
+            bkCluster.close();
+        }
+        if (oxiaServer != null) {
+            oxiaServer.close();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java
new file mode 100644
index 00000000000..609da36a58e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedMultiBrokerPulsarBaseTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Base class for V5 client end-to-end tests that need a multi-broker cluster.
+ *
+ * <p>Companion to {@link V5ClientBaseTest} (single-broker). Use this when a 
test specifically
+ * exercises broker-side coordination across brokers — controller-leader 
placement,
+ * admin-operation redirects to the controller-owning broker, V5 consumer 
reconnection to a
+ * different broker, etc. For non-multi-broker concerns prefer the 
single-broker variant: it's
+ * faster and has fewer moving parts.
+ *
+ * <p>Builds one V5 {@link PulsarClient} per broker in the shared cluster 
(initialized in
+ * {@code @BeforeClass}, closed in {@code @AfterClass}) so tests can drive 
operations through a
+ * specific broker and observe how the system routes them. {@link #v5Client} 
aliases the first
+ * client; {@link #v5Clients} exposes the full list aligned with {@link 
#brokers}.
+ */
+public abstract class V5MultiBrokerClientBaseTest extends 
SharedMultiBrokerPulsarBaseTest {
+
+    /** V5 client connected to broker 0. Shorthand for {@code 
v5Clients.get(0)}. */
+    protected PulsarClient v5Client;
+
+    /** Per-broker V5 clients, in the same order as {@link #brokers}. */
+    protected List<PulsarClient> v5Clients;
+
+    private final List<PulsarClient> trackedClients = new ArrayList<>();
+
+    @BeforeClass(alwaysRun = true)
+    public void setupSharedV5MultiBrokerClients() throws Exception {
+        List<PulsarClient> built = new ArrayList<>(brokers.size());
+        for (var broker : brokers) {
+            built.add(PulsarClient.builder()
+                    .serviceUrl(broker.getBrokerServiceUrl())
+                    .build());
+        }
+        v5Clients = Collections.unmodifiableList(built);
+        v5Client = v5Clients.get(0);
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void closeSharedV5MultiBrokerClients() throws Exception {
+        if (v5Clients != null) {
+            for (PulsarClient c : v5Clients) {
+                try {
+                    c.close();
+                } catch (Exception ignored) {
+                    // best-effort cleanup
+                }
+            }
+            v5Clients = null;
+            v5Client = null;
+        }
+    }
+
+    /**
+     * Build a fresh V5 client connected to the given broker. The returned 
client is
+     * registered for automatic close at the end of the current test method — 
useful for
+     * tests that want a dedicated client (e.g. to sever just its connection).
+     */
+    protected PulsarClient newV5Client(int brokerIndex) throws Exception {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(brokers.get(brokerIndex).getBrokerServiceUrl())
+                .build();
+        trackedClients.add(client);
+        return client;
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void closeTrackedV5Clients() {
+        for (int i = trackedClients.size() - 1; i >= 0; i--) {
+            try {
+                trackedClients.get(i).close();
+            } catch (Exception ignored) {
+                // best-effort cleanup
+            }
+        }
+        trackedClients.clear();
+    }
+
+    /**
+     * Create a scalable topic in the current test namespace and return its
+     * {@code topic://...} name.
+     */
+    protected String newScalableTopic(int numInitialSegments) throws Exception 
{
+        String name = "topic://" + getNamespace() + "/scalable-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(name, numInitialSegments);
+        return name;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java
new file mode 100644
index 00000000000..b6e747ea634
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for V5 scalable topics across a multi-broker cluster.
+ *
+ * <p>Each scalable topic has a single {@link 
org.apache.pulsar.broker.service.scalable.ScalableTopicController}
+ * leader; the leader's brokerId is recorded in metadata. With three brokers 
and many topics
+ * the leader role spreads naturally across the cluster, so tests need to cope 
with admin
+ * operations and consumer sessions that target a non-owning broker. These 
tests assert that
+ * the routing layers do their job:
+ *
+ * <ul>
+ *   <li>controller leadership distributes across brokers given enough 
topics;</li>
+ *   <li>{@code admin.scalableTopics()} mutating calls (split / merge) follow 
the HTTP 307
+ *       redirect emitted by non-leader brokers and complete on the 
leader;</li>
+ *   <li>V5 StreamConsumer attached through a non-owning broker still reaches 
the right
+ *       controller via the lookup → controller-URL path and receives 
messages;</li>
+ *   <li>V5 CheckpointConsumer with {@code consumerGroup(...)} (the only 
Checkpoint mode that
+ *       runs through the controller) does the same.</li>
+ * </ul>
+ */
+public class V5MultiBrokerScalableTopicTest extends 
V5MultiBrokerClientBaseTest {
+
+    /**
+     * Smoke: create a scalable topic, produce on broker 0's V5 client, 
consume on the last
+     * broker's V5 client. Lookups across brokers must converge on the segment 
owners.
+     */
+    @Test
+    public void testProduceConsumeAcrossBrokers() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = 
v5Clients.get(0).newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Clients.get(v5Clients.size() - 1)
+                .newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("smoke-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 30;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missing message #" + i);
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent);
+    }
+
+    /**
+     * Drive controller materialization through a different broker each 
iteration so the
+     * leader-election race naturally lands on different brokers. The first 
broker to call
+     * {@code getOrCreateController} for a topic wins (subsequent peers just 
observe the
+     * existing leader); cycling the "first" broker spreads leadership across 
the cluster.
+     */
+    @Test
+    public void testControllerLeadershipDistributesAcrossBrokers() throws 
Exception {
+        int numTopics = brokers.size() * 4;
+        Set<String> leaders = new HashSet<>();
+        for (int i = 0; i < numTopics; i++) {
+            String topic = newScalableTopic(1);
+            int firstBroker = i % brokers.size();
+            // Force this broker to materialize the controller first → it 
becomes leader.
+            
brokers.get(firstBroker).getBrokerService().getScalableTopicService()
+                    .getOrCreateController(TopicName.get(topic))
+                    .get(5, java.util.concurrent.TimeUnit.SECONDS);
+            String leader = findControllerLeader(topic);
+            assertNotNull(leader, "controller leader must be elected for " + 
topic);
+            leaders.add(leader);
+        }
+        assertEquals(leaders.size(), brokers.size(),
+                "expected controller leadership to spread across every broker, 
got " + leaders);
+    }
+
+    /**
+     * Splitting a scalable topic must succeed when the request hits a 
non-leader broker:
+     * the admin layer redirects the call to the controller-leader broker via 
307, the admin
+     * client follows the redirect, and the metadata reflects the split.
+     */
+    @Test
+    public void testSplitFromNonLeaderBrokerRedirectsToOwner() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        // Force the parent segment topic to load by producing a message — 
split's
+        // terminate step requires the segment to exist on its owning broker.
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        producer.newMessage().value("warm-up").send();
+
+        int leaderIndex = findControllerLeaderIndex(topic);
+        int nonLeaderIndex = (leaderIndex + 1) % brokers.size();
+
+        long activeId = singleActiveSegmentId(topic);
+        // Issue the split through a non-leader broker's admin. Without 
redirect this would
+        // fail with a "not the leader" error; with redirect the leader 
applies the split.
+        admins.get(nonLeaderIndex).scalableTopics().splitSegment(topic, 
activeId);
+
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var meta = admin.scalableTopics().getMetadata(topic);
+            for (var seg : meta.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2, "split must produce 2 active children");
+        });
+    }
+
+    /**
+     * Same as {@link #testSplitFromNonLeaderBrokerRedirectsToOwner()}, but 
for merge:
+     * prepare a topic with two adjacent active children (via a split), then 
merge them
+     * through a non-leader broker's admin and assert the merge completed.
+     */
+    @Test
+    public void testMergeFromNonLeaderBrokerRedirectsToOwner() throws 
Exception {
+        String topic = newScalableTopic(1);
+        // Warm-up — same reason as split: the parent segment topic must be 
loaded
+        // on its owning broker before split/merge can terminate it.
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        producer.newMessage().value("warm-up").send();
+
+        long parentId = singleActiveSegmentId(topic);
+        admin.scalableTopics().splitSegment(topic, parentId);
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            for (var seg : 
admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2);
+        });
+
+        var meta = admin.scalableTopics().getMetadata(topic);
+        long[] activeIds = new long[2];
+        int idx = 0;
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeIds[idx++] = seg.getSegmentId();
+            }
+        }
+
+        int leaderIndex = findControllerLeaderIndex(topic);
+        int nonLeaderIndex = (leaderIndex + 1) % brokers.size();
+        admins.get(nonLeaderIndex).scalableTopics()
+                .mergeSegments(topic, activeIds[0], activeIds[1]);
+
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            for (var seg : 
admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 1, "merge must collapse to a single active 
segment");
+        });
+    }
+
+    /**
+     * A V5 StreamConsumer subscribed via a non-owning broker must still reach 
the controller
+     * leader through the DAG-watch lookup → controller-URL path. Two 
consumers sharing the
+     * subscription on different brokers should split segments via the 
controller and together
+     * deliver every message exactly once.
+     */
+    @Test
+    public void testStreamConsumerControllerCoordinationAcrossBrokers() throws 
Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "cross-broker-stream";
+
+        @Cleanup
+        Producer<String> producer = 
v5Clients.get(0).newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        int leaderIndex = findControllerLeaderIndex(topic);
+        int nonLeaderA = (leaderIndex + 1) % brokers.size();
+        int nonLeaderB = (leaderIndex + 2) % brokers.size();
+
+        @Cleanup
+        StreamConsumer<String> a = 
v5Clients.get(nonLeaderA).newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("alice")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        StreamConsumer<String> b = 
v5Clients.get(nonLeaderB).newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("bob")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 80;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> aGot = ConcurrentHashMap.newKeySet();
+        Set<String> bGot = ConcurrentHashMap.newKeySet();
+        Thread ta = drainStream(a, received, aGot);
+        Thread tb = drainStream(b, received, bGot);
+        ta.join();
+        tb.join();
+
+        assertEquals(received, sent, "every message must be delivered exactly 
once across the group");
+        Set<String> overlap = new HashSet<>(aGot);
+        overlap.retainAll(bGot);
+        assertTrue(overlap.isEmpty(), "no message should be delivered to both 
consumers, overlap=" + overlap);
+        assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+                "controller must split segments across both consumers (a=" + 
aGot.size()
+                        + " b=" + bGot.size() + ")");
+    }
+
+    /**
+     * V5 CheckpointConsumer with {@code consumerGroup(...)} routes its 
session through the
+     * topic controller (same path as StreamConsumer). The same cross-broker 
test as above:
+     * two checkpoint consumers in a group, attached via non-owning brokers, 
must receive
+     * disjoint subsets that together cover every produced message.
+     */
+    @Test
+    public void testCheckpointConsumerControllerCoordinationAcrossBrokers() 
throws Exception {
+        String topic = newScalableTopic(4);
+        String group = "cross-broker-checkpoint-group";
+
+        @Cleanup
+        Producer<String> producer = 
v5Clients.get(0).newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        int leaderIndex = findControllerLeaderIndex(topic);
+        int nonLeaderA = (leaderIndex + 1) % brokers.size();
+        int nonLeaderB = (leaderIndex + 2) % brokers.size();
+
+        @Cleanup
+        CheckpointConsumer<String> a = v5Clients.get(nonLeaderA)
+                .newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+        @Cleanup
+        CheckpointConsumer<String> b = v5Clients.get(nonLeaderB)
+                .newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int n = 80;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> aGot = ConcurrentHashMap.newKeySet();
+        Set<String> bGot = ConcurrentHashMap.newKeySet();
+        Thread ta = drainCheckpoint(a, received, aGot);
+        Thread tb = drainCheckpoint(b, received, bGot);
+        ta.join();
+        tb.join();
+
+        assertEquals(received, sent, "every message must be delivered exactly 
once across the group");
+        Set<String> overlap = new HashSet<>(aGot);
+        overlap.retainAll(bGot);
+        assertTrue(overlap.isEmpty(),
+                "no message should be delivered to both checkpoint consumers, 
overlap=" + overlap);
+        assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+                "controller must split segments across both consumers (a=" + 
aGot.size()
+                        + " b=" + bGot.size() + ")");
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Returns the brokerId of the controller leader for {@code topic}. Forces 
the controller
+     * to materialize on every broker (so leader election runs), then waits 
until every
+     * broker's metadata store reflects the elected leader. The latter wait is 
what makes the
+     * subsequent V5 subscribe deterministic: the DAG-watch lookup from any 
broker reads the
+     * controller znode via its own metadata store, and we need that read to 
return the
+     * leader URL — not the empty fallback that pushes the client onto a 
non-leader broker.
+     */
+    private String findControllerLeader(String topic) throws Exception {
+        TopicName tn = TopicName.get(topic);
+        // Step 1: force controller materialization + leader election on every 
broker.
+        for (PulsarService broker : brokers) {
+            
broker.getBrokerService().getScalableTopicService().getOrCreateController(tn)
+                    .get(5, java.util.concurrent.TimeUnit.SECONDS);
+        }
+        // Step 2: wait until each broker's metadata store sees the 
controller-lock znode.
+        // Without this, a lookup against a follower can return an empty 
controller URL —
+        // the watch hasn't propagated yet — and the client subscribes to the 
wrong broker.
+        Awaitility.await().untilAsserted(() -> {
+            for (PulsarService broker : brokers) {
+                var resources = 
broker.getPulsarResources().getScalableTopicResources();
+                var optValue = 
resources.getStore().get(resources.controllerLockPath(tn))
+                        .get(5, java.util.concurrent.TimeUnit.SECONDS);
+                assertTrue(optValue.isPresent(),
+                        "broker " + broker.getBrokerId()
+                                + " must see controller lock for " + topic);
+            }
+        });
+        var controller = 
brokers.get(0).getBrokerService().getScalableTopicService()
+                .getOrCreateController(tn).get();
+        return controller.getLeaderBrokerId().get().orElseThrow();
+    }
+
+    private int findControllerLeaderIndex(String topic) throws Exception {
+        String leaderBrokerId = findControllerLeader(topic);
+        for (int i = 0; i < brokers.size(); i++) {
+            if (brokers.get(i).getBrokerId().equals(leaderBrokerId)) {
+                return i;
+            }
+        }
+        throw new AssertionError("controller leader '" + leaderBrokerId
+                + "' does not match any broker in cluster");
+    }
+
+    /**
+     * Returns the segment id of the (single) active segment of {@code topic}. 
Convenience
+     * for tests that work on a freshly-created scalable topic with one 
initial segment.
+     */
+    private long singleActiveSegmentId(String topic) throws Exception {
+        var meta = admin.scalableTopics().getMetadata(topic);
+        long active = -1;
+        int count = 0;
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                active = seg.getSegmentId();
+                count++;
+            }
+        }
+        assertEquals(count, 1, "expected exactly one active segment");
+        assertNotEquals(active, -1L);
+        return active;
+    }
+
+    private Thread drainStream(StreamConsumer<String> consumer, Set<String> 
all, Set<String> mine) {
+        Thread t = new Thread(() -> {
+            try {
+                MessageId last = null;
+                while (true) {
+                    Message<String> msg = 
consumer.receive(Duration.ofSeconds(1));
+                    if (msg == null) {
+                        if (last != null) {
+                            consumer.acknowledgeCumulative(last);
+                        }
+                        return;
+                    }
+                    all.add(msg.value());
+                    mine.add(msg.value());
+                    last = msg.id();
+                }
+            } catch (Exception ignored) {
+            }
+        }, "stream-consumer-drainer");
+        t.start();
+        return t;
+    }
+
+    private Thread drainCheckpoint(CheckpointConsumer<String> consumer,
+                                   Set<String> all, Set<String> mine) {
+        Thread t = new Thread(() -> {
+            try {
+                while (true) {
+                    Message<String> msg = 
consumer.receive(Duration.ofSeconds(1));
+                    if (msg == null) {
+                        return;
+                    }
+                    all.add(msg.value());
+                    mine.add(msg.value());
+                }
+            } catch (Exception ignored) {
+            }
+        }, "checkpoint-consumer-drainer");
+        t.start();
+        return t;
+    }
+}

Reply via email to