This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f662459769e [feat] PIP-468: Multi-broker shared cluster + V5
cross-broker tests (#25633)
f662459769e is described below
commit f662459769e364e4b54e11bca539419e2bc1e8b2
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 14:47:19 2026 -0700
[feat] PIP-468: Multi-broker shared cluster + V5 cross-broker tests (#25633)
---
pulsar-broker/build.gradle.kts | 1 +
.../broker/service/scalable/DagWatchSession.java | 10 +
.../service/scalable/ScalableTopicController.java | 44 +--
.../service/SharedMultiBrokerPulsarBaseTest.java | 146 +++++++
.../service/SharedMultiBrokerPulsarCluster.java | 295 ++++++++++++++
.../client/api/v5/V5MultiBrokerClientBaseTest.java | 116 ++++++
.../api/v5/V5MultiBrokerScalableTopicTest.java | 439 +++++++++++++++++++++
7 files changed, 1016 insertions(+), 35 deletions(-)
diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index 38c5b917932..5fdd16c30c5 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -132,6 +132,7 @@ dependencies {
testImplementation(libs.jetty.ee8.proxy)
testImplementation(libs.jetty.websocket.jetty.client)
testImplementation(libs.opentelemetry.sdk.testing)
+ testImplementation(libs.oxia.testcontainers)
testRuntimeOnly(libs.avro.protobuf) {
exclude(group = "com.google.protobuf")
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 51a8ed20c47..ef0f8947b50 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -169,6 +169,16 @@ public class DagWatchSession {
}
}
+ // Propagate the controller-broker URL so V5 clients can connect to
the right broker
+ // for scalable-topic subscribe. Without this the client falls back to
its configured
+ // service URL, which on a multi-broker cluster is rarely the
controller leader.
+ if (response.controllerBrokerUrl() != null) {
+ dag.setControllerBrokerUrl(response.controllerBrokerUrl());
+ }
+ if (response.controllerBrokerUrlTls() != null) {
+ dag.setControllerBrokerUrlTls(response.controllerBrokerUrlTls());
+ }
+
return dag;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index b63ee847e34..8b1805cd315 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -21,10 +21,8 @@ package org.apache.pulsar.broker.service.scalable;
import io.github.merlimat.slog.Logger;
import java.time.Duration;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
@@ -233,9 +231,11 @@ public class ScalableTopicController {
SegmentInfo parent = currentLayout.getAllSegments().get(segmentId);
String parentTopicName = toSegmentPersistentName(parent);
- // Step 1: Discover subscriptions on the parent segment, then create
child
- // segment topics with those subscriptions (routed to owning brokers
via admin API)
- return discoverSubscriptions(parentTopicName)
+ // Step 1: Read the scalable topic's subscriptions from metadata (the
single source
+ // of truth — segment topics may live on different brokers, but the
subscription set
+ // is tracked here), then create child segment topics with those
subscriptions
+ // already provisioned (the create call routes to each segment's
owning broker).
+ return resources.listSubscriptionsAsync(topicName)
.thenCompose(parentSubs -> {
var subList = new java.util.ArrayList<>(parentSubs);
return createSegmentTopic(child1, subList)
@@ -277,13 +277,10 @@ public class ScalableTopicController {
String parent1Topic = toSegmentPersistentName(parent1);
String parent2Topic = toSegmentPersistentName(parent2);
- // Step 1: Discover subscriptions from both parents (union), then
create merged segment
- return discoverSubscriptions(parent1Topic)
- .thenCombine(discoverSubscriptions(parent2Topic), (subs1, subs2) -> {
- Set<String> allSubs = new LinkedHashSet<>(subs1);
- allSubs.addAll(subs2);
- return allSubs;
- })
+ // Step 1: Read the scalable topic's subscriptions from metadata
(single source of
+ // truth, see splitSegment), then create the merged segment topic with
those
+ // subscriptions provisioned.
+ return resources.listSubscriptionsAsync(topicName)
.thenCompose(parentSubs -> createSegmentTopic(merged, new
java.util.ArrayList<>(parentSubs)))
// Step 2: Terminate both parent segment topics
@@ -582,29 +579,6 @@ public class ScalableTopicController {
}
}
- /**
- * Discover all subscription names on a segment topic. Works whether the
topic is
- * on this broker or a remote one by using the admin client.
- */
- private CompletableFuture<Set<String>> discoverSubscriptions(String
segmentTopicName) {
- // Try local first (avoids RPC if the segment is on this broker)
- return brokerService.getTopicIfExists(segmentTopicName)
- .thenCompose(optTopic -> {
- if (optTopic.isPresent()) {
- return CompletableFuture.completedFuture(
- new
LinkedHashSet<>(optTopic.get().getSubscriptions().keySet()));
- }
- // Topic is on a remote broker — use admin client
- try {
- return brokerService.getPulsar().getAdminClient()
-
.topics().getSubscriptionsAsync(segmentTopicName)
- .thenApply(LinkedHashSet::new);
- } catch (PulsarServerException e) {
- return CompletableFuture.failedFuture(e);
- }
- });
- }
-
private CompletableFuture<Void> notifySubscriptions(SegmentLayout layout) {
CompletableFuture<?>[] futures = subscriptions.values().stream()
.map(coordinator -> coordinator.onLayoutChange(layout))
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java
new file mode 100644
index 00000000000..85ae5890993
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Base class for tests that need a shared multi-broker cluster across test
classes.
+ *
+ * <p>Companion to {@link SharedPulsarBaseTest}. Use this when a test
specifically depends on
+ * behavior that only manifests across brokers — namespace ownership transfer,
controller-leader
+ * failover, segment placement on different brokers, V5 client reconnect to a
different broker.
+ * For everything else, prefer the single-broker {@link SharedPulsarBaseTest}:
it's faster, has
+ * fewer moving parts, and is sufficient for most coverage.
+ *
+ * <p>Each test method gets a fresh namespace under {@link
SharedMultiBrokerPulsarCluster#TENANT_NAME}
+ * (created in {@link #setupSharedMultiBrokerTest()} and force-deleted in
+ * {@link #cleanupSharedMultiBrokerTest()}). The cluster itself is JVM-wide
and reused across
+ * every test class that extends this — see {@link
SharedMultiBrokerPulsarCluster}.
+ *
+ * <p>Subclasses get:
+ * <ul>
+ * <li>{@link #admin} / {@link #pulsarClient} — handles aimed at broker 0;
lookups against any
+ * broker correctly redirect to topic owners, so most tests should just
use these.</li>
+ * <li>{@link #brokers} / {@link #admins} / {@link #clients} — full
per-broker lists, in start
+ * order, for tests that need to address a specific broker (e.g.
asserting topic
+ * ownership, killing a specific broker).</li>
+ * <li>{@link #newTopicName()} — generates a unique topic in the test
namespace.</li>
+ * </ul>
+ */
+@CustomLog
+public abstract class SharedMultiBrokerPulsarBaseTest {
+
+ /** All brokers in the shared cluster, in start order. */
+ protected List<PulsarService> brokers;
+ /** Per-broker admin handles, aligned with {@link #brokers}. */
+ protected List<PulsarAdmin> admins;
+ /** Per-broker client handles, aligned with {@link #brokers}. */
+ protected List<PulsarClient> clients;
+
+ /** Convenience: broker 0's admin. */
+ protected PulsarAdmin admin;
+ /** Convenience: broker 0's client. */
+ protected PulsarClient pulsarClient;
+
+ private final List<String> namespaces = new ArrayList<>();
+
+ /** Returns the unique namespace assigned to the current test method. */
+ protected String getNamespace() {
+ return namespaces.get(0);
+ }
+
+ /** Returns the broker service URL for broker {@code index}. */
+ protected String getBrokerServiceUrl(int index) {
+ return brokers.get(index).getBrokerServiceUrl();
+ }
+
+ /** Returns the web service URL for broker {@code index}. */
+ protected String getWebServiceUrl(int index) {
+ return brokers.get(index).getWebServiceAddress();
+ }
+
+ /**
+ * Creates a new {@link PulsarClient} connected to broker {@code index}.
Callers are
+ * responsible for closing the returned client.
+ */
+ protected PulsarClient newPulsarClient(int index) throws
PulsarClientException {
+ return
PulsarClient.builder().serviceUrl(brokers.get(index).getBrokerServiceUrl()).build();
+ }
+
+ /**
+ * Initializes (lazily) the shared cluster singleton and wires the
per-class fields. Called
+ * once per test class.
+ */
+ @BeforeClass(alwaysRun = true)
+ public void setupSharedMultiBrokerCluster() throws Exception {
+ SharedMultiBrokerPulsarCluster cluster =
SharedMultiBrokerPulsarCluster.get();
+ brokers = cluster.getBrokers();
+ admins = cluster.getAdmins();
+ clients = cluster.getClients();
+ admin = cluster.getAdmin();
+ pulsarClient = cluster.getClient();
+ }
+
+ /** Creates a unique namespace for the current test method. */
+ @BeforeMethod(alwaysRun = true)
+ public void setupSharedMultiBrokerTest() throws Exception {
+ createNewNamespace();
+ }
+
+ /** Force-deletes all namespaces created during the test method. */
+ @AfterMethod(alwaysRun = true)
+ public void cleanupSharedMultiBrokerTest() throws Exception {
+ for (String ns : namespaces) {
+ try {
+ admin.namespaces().deleteNamespace(ns, true);
+ log.info().attr("testNamespace", ns).log("Deleted test
namespace");
+ } catch (Exception e) {
+ log.warn().attr("deleteNamespace",
ns).exceptionMessage(e).log("Failed to delete namespace");
+ }
+ }
+ namespaces.clear();
+ }
+
+ /** Creates a new namespace under the shared tenant and registers it for
cleanup. */
+ protected String createNewNamespace() throws Exception {
+ String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
+ String ns = SharedMultiBrokerPulsarCluster.TENANT_NAME + "/" + nsName;
+ admin.namespaces().createNamespace(ns,
Set.of(SharedMultiBrokerPulsarCluster.CLUSTER_NAME));
+ namespaces.add(ns);
+ log.info().attr("testNamespace", ns).log("Created test namespace");
+ return ns;
+ }
+
+ /** Generates a unique persistent topic name within the current test
namespace. */
+ protected String newTopicName() {
+ return "persistent://" + getNamespace() + "/topic-" +
UUID.randomUUID().toString().substring(0, 8);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java
new file mode 100644
index 00000000000..fde85c06ddb
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.oxia.testcontainers.OxiaContainer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import lombok.CustomLog;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+import org.apache.pulsar.tests.ThreadLeakDetectorListener;
+
+/**
+ * JVM-wide singleton that manages a lightweight multi-broker Pulsar cluster
for integration tests.
+ *
+ * <p>Companion to {@link SharedPulsarCluster}, but spins up {@value
#NUM_BROKERS} Pulsar brokers
+ * sharing a single bookie and an in-memory metadata store. Use this when a
test specifically
+ * needs to exercise behavior that only manifests across brokers — namespace
ownership transfer,
+ * controller-leader failover, segment placement on different brokers, V5
client reconnect to a
+ * different broker, etc.
+ *
+ * <p>The first broker's admin and client are exposed as the "primary" handles
via
+ * {@link #getAdmin()} and {@link #getClient()}; per-broker handles are
available via
+ * {@link #getBrokers()}, {@link #getAdmins()}, and {@link #getClients()}.
Lookups against any
+ * broker correctly redirect to the broker that owns the requested topic, so
most tests should
+ * just use the primary handles.
+ *
+ * <p>Lazy on first call to {@link #get()}; closed via JVM shutdown hook.
+ *
+ * @see SharedMultiBrokerPulsarBaseTest
+ */
+@CustomLog
+public class SharedMultiBrokerPulsarCluster {
+
+ public static final String CLUSTER_NAME = "multi-broker-test-cluster";
+ public static final String TENANT_NAME = "multi-broker-test-tenant";
+
+ /**
+ * Number of brokers in the shared cluster. Three is the minimum that lets
us exercise
+ * controller-leader failover (one leader + at least two followers) and
segment placement
+ * across more than one broker.
+ */
+ public static final int NUM_BROKERS = 3;
+
+ private static volatile SharedMultiBrokerPulsarCluster instance;
+
+ private OxiaContainer oxiaServer;
+ private String metadataStoreUrl;
+ private BKCluster bkCluster;
+ private final List<PulsarService> brokers = new ArrayList<>(NUM_BROKERS);
+ private final List<PulsarAdmin> admins = new ArrayList<>(NUM_BROKERS);
+ private final List<PulsarClient> clients = new ArrayList<>(NUM_BROKERS);
+
+ /** Returns the singleton instance, starting the cluster on first
invocation. */
+ public static SharedMultiBrokerPulsarCluster get() throws Exception {
+ if (instance == null) {
+ synchronized (SharedMultiBrokerPulsarCluster.class) {
+ if (instance == null) {
+ SharedMultiBrokerPulsarCluster cluster = new
SharedMultiBrokerPulsarCluster();
+ cluster.start();
+ instance = cluster;
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ instance.close();
+ } catch (Exception e) {
+ log.error().exception(e).log("Failed to close
SharedMultiBrokerPulsarCluster");
+ }
+ }));
+ }
+ }
+ }
+ return instance;
+ }
+
+ /** All brokers in the cluster, in start order. */
+ public List<PulsarService> getBrokers() {
+ return Collections.unmodifiableList(brokers);
+ }
+
+ /** Per-broker {@link PulsarAdmin} handles, in the same order as {@link
#getBrokers()}. */
+ public List<PulsarAdmin> getAdmins() {
+ return Collections.unmodifiableList(admins);
+ }
+
+ /** Per-broker {@link PulsarClient} handles, in the same order as {@link
#getBrokers()}. */
+ public List<PulsarClient> getClients() {
+ return Collections.unmodifiableList(clients);
+ }
+
+ /** Convenience: the first broker's admin. Lookups redirect to
topic-owning brokers. */
+ public PulsarAdmin getAdmin() {
+ return admins.get(0);
+ }
+
+ /** Convenience: the first broker's client. Lookups redirect to
topic-owning brokers. */
+ public PulsarClient getClient() {
+ return clients.get(0);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void start() throws Exception {
+ log.info().attr("brokers", NUM_BROKERS).log("Starting
SharedMultiBrokerPulsarCluster");
+
+ // Real Oxia server (not the in-memory metadata store). Per-topic
leader election
+ // (used by the ScalableTopicController) relies on per-session
ephemeral nodes, and
+ // the in-memory store treats every connection on the same JVM as the
same session
+ // — so multiple brokers all "win" the same election simultaneously.
Oxia gives each
+ // broker its own session and the proper ephemeral / CAS semantics.
Container-based
+ // because oxia ships no in-process server; tests skip cleanly when
Docker isn't
+ // available.
+ oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME);
+ oxiaServer.start();
+ metadataStoreUrl = "oxia://" + oxiaServer.getServiceAddress();
+
+ // Single shared bookie. Same minimal config as SharedPulsarCluster —
write quorum stays
+ // at 1 across brokers because the bookie count is the limiting
factor, not the brokers.
+ ServerConfiguration bkConf = new ServerConfiguration();
+ bkConf.setProperty("dbStorage_writeCacheMaxSizeMb", 32);
+ bkConf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 4);
+ bkConf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 4);
+ bkConf.setProperty("dbStorage_rocksDB_blockCacheSize", 4 * 1024 *
1024);
+ bkConf.setJournalSyncData(false);
+ bkConf.setJournalWriteData(false);
+ bkConf.setProperty("journalMaxGroupWaitMSec", 0L);
+ bkConf.setProperty("journalPreAllocSizeMB", 1);
+ bkConf.setFlushInterval(60000);
+ bkConf.setGcWaitTime(60000);
+ bkConf.setAllowLoopback(true);
+ bkConf.setAdvertisedAddress("127.0.0.1");
+ bkConf.setAllowEphemeralPorts(true);
+ bkConf.setNumAddWorkerThreads(0);
+ bkConf.setNumReadWorkerThreads(0);
+ bkConf.setNumHighPriorityWorkerThreads(0);
+ bkConf.setNumJournalCallbackThreads(0);
+ bkConf.setServerNumIOThreads(1);
+ bkConf.setNumLongPollWorkerThreads(1);
+ bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+
bkConf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+
+ bkCluster = BKCluster.builder()
+ .baseServerConfiguration(bkConf)
+ .metadataServiceUri(metadataStoreUrl)
+ .numBookies(1)
+ .clearOldData(true)
+ .build();
+
+ // Start NUM_BROKERS brokers. The first one provisions the cluster +
tenant; the rest
+ // discover them through the shared metadata store.
+ for (int i = 0; i < NUM_BROKERS; i++) {
+ PulsarService broker = startBroker(i);
+ brokers.add(broker);
+
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(broker.getWebServiceAddress())
+ .build();
+ admins.add(admin);
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(broker.getBrokerServiceUrl())
+ .build();
+ clients.add(client);
+
+ if (i == 0) {
+ admin.clusters().createCluster(CLUSTER_NAME,
+ ClusterData.builder()
+ .serviceUrl(broker.getWebServiceAddress())
+ .brokerServiceUrl(broker.getBrokerServiceUrl())
+ .build());
+ admin.tenants().createTenant(TENANT_NAME,
+ TenantInfo.builder()
+ .allowedClusters(Set.of(CLUSTER_NAME))
+ .build());
+ }
+ }
+
+ log.info()
+ .attr("brokers",
brokers.stream().map(PulsarService::getBrokerServiceUrl).toList())
+ .log("SharedMultiBrokerPulsarCluster started");
+
+ // Reset thread-leak baseline so cluster threads aren't reported
against the first test.
+ ThreadLeakDetectorListener.resetCapturedThreads();
+ }
+
+ private PulsarService startBroker(int index) throws Exception {
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setMetadataStoreUrl(metadataStoreUrl);
+ config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
+ config.setClusterName(CLUSTER_NAME);
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(0));
+ config.setManagedLedgerDefaultEnsembleSize(1);
+ config.setManagedLedgerDefaultWriteQuorum(1);
+ config.setManagedLedgerDefaultAckQuorum(1);
+ // More bundles than brokers so the load balancer has room to spread
ownership.
+ config.setDefaultNumberOfNamespaceBundles(NUM_BROKERS * 2);
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setNumExecutorThreadPoolSize(5);
+ config.setManagedLedgerCacheSizeMB(8);
+ config.setActiveConsumerFailoverDelayTimeMillis(0);
+ config.setAllowAutoTopicCreationType(
+
org.apache.pulsar.common.policies.data.TopicType.NON_PARTITIONED);
+ config.setBookkeeperNumberOfChannelsPerBookie(1);
+ config.setBookkeeperClientExposeStatsToPrometheus(false);
+ config.setDispatcherRetryBackoffInitialTimeInMs(0);
+ config.setDispatcherRetryBackoffMaxTimeInMs(0);
+ config.setForceDeleteNamespaceAllowed(true);
+ config.setForceDeleteTenantAllowed(true);
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.setBrokerDeduplicationEnabled(true);
+
+ // Reduce thread pool sizes — three brokers each spinning up the
default counts is heavy.
+ config.setNumIOThreads(2);
+ config.setNumOrderedExecutorThreads(2);
+ config.setNumHttpServerThreads(4);
+ config.setBookkeeperClientNumWorkerThreads(2);
+ config.setBookkeeperClientNumIoThreads(2);
+ config.setNumCacheExecutorThreadPoolSize(1);
+ config.setManagedLedgerNumSchedulerThreads(2);
+ config.setTopicOrderedExecutorThreadNum(4);
+
+ // Load balancer is what makes this multi-broker. Disable shedding to
keep tests
+ // deterministic: bundles assigned at first lookup don't move around
mid-test.
+ config.setLoadBalancerEnabled(true);
+ config.setLoadBalancerSheddingEnabled(false);
+
+ log.info().attr("index", index).log("Starting broker");
+ PulsarService broker = new PulsarService(config);
+ broker.start();
+ log.info().attr("index", index)
+ .attr("broker", broker.getBrokerServiceUrl())
+ .attr("web", broker.getWebServiceAddress())
+ .log("Broker started");
+ return broker;
+ }
+
+ private void close() throws Exception {
+ log.info("Closing SharedMultiBrokerPulsarCluster");
+ // Tear down in reverse order: clients first so they don't interfere
with broker shutdown.
+ for (int i = clients.size() - 1; i >= 0; i--) {
+ try {
+ clients.get(i).close();
+ } catch (Exception e) {
+ log.warn().attr("index", i).exceptionMessage(e).log("Failed to
close client");
+ }
+ }
+ for (int i = admins.size() - 1; i >= 0; i--) {
+ try {
+ admins.get(i).close();
+ } catch (Exception e) {
+ log.warn().attr("index", i).exceptionMessage(e).log("Failed to
close admin");
+ }
+ }
+ for (int i = brokers.size() - 1; i >= 0; i--) {
+ try {
+ brokers.get(i).close();
+ } catch (Exception e) {
+ log.warn().attr("index", i).exceptionMessage(e).log("Failed to
close broker");
+ }
+ }
+ if (bkCluster != null) {
+ bkCluster.close();
+ }
+ if (oxiaServer != null) {
+ oxiaServer.close();
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java
new file mode 100644
index 00000000000..609da36a58e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedMultiBrokerPulsarBaseTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Base class for V5 client end-to-end tests that need a multi-broker cluster.
+ *
+ * <p>Companion to {@link V5ClientBaseTest} (single-broker). Use this when a
test specifically
+ * exercises broker-side coordination across brokers — controller-leader
placement,
+ * admin-operation redirects to the controller-owning broker, V5 consumer
reconnection to a
+ * different broker, etc. For non-multi-broker concerns prefer the
single-broker variant: it's
+ * faster and has fewer moving parts.
+ *
+ * <p>Builds one V5 {@link PulsarClient} per broker in the shared cluster
(initialized in
+ * {@code @BeforeClass}, closed in {@code @AfterClass}) so tests can drive
operations through a
+ * specific broker and observe how the system routes them. {@link #v5Client}
aliases the first
+ * client; {@link #v5Clients} exposes the full list aligned with {@link
#brokers}.
+ */
+public abstract class V5MultiBrokerClientBaseTest extends
SharedMultiBrokerPulsarBaseTest {
+
+ /** V5 client connected to broker 0. Shorthand for {@code
v5Clients.get(0)}. */
+ protected PulsarClient v5Client;
+
+ /** Per-broker V5 clients, in the same order as {@link #brokers}. */
+ protected List<PulsarClient> v5Clients;
+
+ private final List<PulsarClient> trackedClients = new ArrayList<>();
+
+ @BeforeClass(alwaysRun = true)
+ public void setupSharedV5MultiBrokerClients() throws Exception {
+ List<PulsarClient> built = new ArrayList<>(brokers.size());
+ for (var broker : brokers) {
+ built.add(PulsarClient.builder()
+ .serviceUrl(broker.getBrokerServiceUrl())
+ .build());
+ }
+ v5Clients = Collections.unmodifiableList(built);
+ v5Client = v5Clients.get(0);
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void closeSharedV5MultiBrokerClients() throws Exception {
+ if (v5Clients != null) {
+ for (PulsarClient c : v5Clients) {
+ try {
+ c.close();
+ } catch (Exception ignored) {
+ // best-effort cleanup
+ }
+ }
+ v5Clients = null;
+ v5Client = null;
+ }
+ }
+
+ /**
+ * Build a fresh V5 client connected to the given broker. The returned
client is
+ * registered for automatic close at the end of the current test method —
useful for
+ * tests that want a dedicated client (e.g. to sever just its connection).
+ */
+ protected PulsarClient newV5Client(int brokerIndex) throws Exception {
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(brokers.get(brokerIndex).getBrokerServiceUrl())
+ .build();
+ trackedClients.add(client);
+ return client;
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void closeTrackedV5Clients() {
+ for (int i = trackedClients.size() - 1; i >= 0; i--) {
+ try {
+ trackedClients.get(i).close();
+ } catch (Exception ignored) {
+ // best-effort cleanup
+ }
+ }
+ trackedClients.clear();
+ }
+
+ /**
+ * Create a scalable topic in the current test namespace and return its
+ * {@code topic://...} name.
+ */
+ protected String newScalableTopic(int numInitialSegments) throws Exception
{
+ String name = "topic://" + getNamespace() + "/scalable-"
+ + UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic(name, numInitialSegments);
+ return name;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java
new file mode 100644
index 00000000000..b6e747ea634
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for V5 scalable topics across a multi-broker cluster.
+ *
+ * <p>Each scalable topic has a single {@link
org.apache.pulsar.broker.service.scalable.ScalableTopicController}
+ * leader; the leader's brokerId is recorded in metadata. With three brokers
and many topics
+ * the leader role spreads naturally across the cluster, so tests need to cope
with admin
+ * operations and consumer sessions that target a non-owning broker. These
tests assert that
+ * the routing layers do their job:
+ *
+ * <ul>
+ * <li>controller leadership distributes across brokers given enough
topics;</li>
+ * <li>{@code admin.scalableTopics()} mutating calls (split / merge) follow
the HTTP 307
+ * redirect emitted by non-leader brokers and complete on the
leader;</li>
+ * <li>V5 StreamConsumer attached through a non-owning broker still reaches
the right
+ * controller via the lookup → controller-URL path and receives
messages;</li>
+ * <li>V5 CheckpointConsumer with {@code consumerGroup(...)} (the only
Checkpoint mode that
+ * runs through the controller) does the same.</li>
+ * </ul>
+ */
+public class V5MultiBrokerScalableTopicTest extends
V5MultiBrokerClientBaseTest {
+
+ /**
+ * Smoke: create a scalable topic, produce on broker 0's V5 client,
consume on the last
+ * broker's V5 client. Lookups across brokers must converge on the segment
owners.
+ */
+ @Test
+ public void testProduceConsumeAcrossBrokers() throws Exception {
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer =
v5Clients.get(0).newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ QueueConsumer<String> consumer = v5Clients.get(v5Clients.size() - 1)
+ .newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("smoke-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int n = 30;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ Set<String> received = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "missing message #" + i);
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received, sent);
+ }
+
+ /**
+ * Drive controller materialization through a different broker each
iteration so the
+ * leader-election race naturally lands on different brokers. The first
broker to call
+ * {@code getOrCreateController} for a topic wins (subsequent peers just
observe the
+ * existing leader); cycling the "first" broker spreads leadership across
the cluster.
+ */
+ @Test
+ public void testControllerLeadershipDistributesAcrossBrokers() throws
Exception {
+ int numTopics = brokers.size() * 4;
+ Set<String> leaders = new HashSet<>();
+ for (int i = 0; i < numTopics; i++) {
+ String topic = newScalableTopic(1);
+ int firstBroker = i % brokers.size();
+ // Force this broker to materialize the controller first → it
becomes leader.
+
brokers.get(firstBroker).getBrokerService().getScalableTopicService()
+ .getOrCreateController(TopicName.get(topic))
+ .get(5, java.util.concurrent.TimeUnit.SECONDS);
+ String leader = findControllerLeader(topic);
+ assertNotNull(leader, "controller leader must be elected for " +
topic);
+ leaders.add(leader);
+ }
+ assertEquals(leaders.size(), brokers.size(),
+ "expected controller leadership to spread across every broker,
got " + leaders);
+ }
+
+ /**
+ * Splitting a scalable topic must succeed when the request hits a
non-leader broker:
+ * the admin layer redirects the call to the controller-leader broker via
307, the admin
+ * client follows the redirect, and the metadata reflects the split.
+ */
+ @Test
+ public void testSplitFromNonLeaderBrokerRedirectsToOwner() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ // Force the parent segment topic to load by producing a message —
split's
+ // terminate step requires the segment to exist on its owning broker.
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ producer.newMessage().value("warm-up").send();
+
+ int leaderIndex = findControllerLeaderIndex(topic);
+ int nonLeaderIndex = (leaderIndex + 1) % brokers.size();
+
+ long activeId = singleActiveSegmentId(topic);
+ // Issue the split through a non-leader broker's admin. Without
redirect this would
+ // fail with a "not the leader" error; with redirect the leader
applies the split.
+ admins.get(nonLeaderIndex).scalableTopics().splitSegment(topic,
activeId);
+
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 2, "split must produce 2 active children");
+ });
+ }
+
+ /**
+ * Same as {@link #testSplitFromNonLeaderBrokerRedirectsToOwner()}, but
for merge:
+ * prepare a topic with two adjacent active children (via a split), then
merge them
+ * through a non-leader broker's admin and assert the merge completed.
+ */
+ @Test
+ public void testMergeFromNonLeaderBrokerRedirectsToOwner() throws
Exception {
+ String topic = newScalableTopic(1);
+ // Warm-up — same reason as split: the parent segment topic must be
loaded
+ // on its owning broker before split/merge can terminate it.
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ producer.newMessage().value("warm-up").send();
+
+ long parentId = singleActiveSegmentId(topic);
+ admin.scalableTopics().splitSegment(topic, parentId);
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ for (var seg :
admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 2);
+ });
+
+ var meta = admin.scalableTopics().getMetadata(topic);
+ long[] activeIds = new long[2];
+ int idx = 0;
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeIds[idx++] = seg.getSegmentId();
+ }
+ }
+
+ int leaderIndex = findControllerLeaderIndex(topic);
+ int nonLeaderIndex = (leaderIndex + 1) % brokers.size();
+ admins.get(nonLeaderIndex).scalableTopics()
+ .mergeSegments(topic, activeIds[0], activeIds[1]);
+
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ for (var seg :
admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 1, "merge must collapse to a single active
segment");
+ });
+ }
+
+ /**
+ * A V5 StreamConsumer subscribed via a non-owning broker must still reach
the controller
+ * leader through the DAG-watch lookup → controller-URL path. Two
consumers sharing the
+ * subscription on different brokers should split segments via the
controller and together
+ * deliver every message exactly once.
+ */
+ @Test
+ public void testStreamConsumerControllerCoordinationAcrossBrokers() throws
Exception {
+ String topic = newScalableTopic(4);
+ String subscription = "cross-broker-stream";
+
+ @Cleanup
+ Producer<String> producer =
v5Clients.get(0).newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ int leaderIndex = findControllerLeaderIndex(topic);
+ int nonLeaderA = (leaderIndex + 1) % brokers.size();
+ int nonLeaderB = (leaderIndex + 2) % brokers.size();
+
+ @Cleanup
+ StreamConsumer<String> a =
v5Clients.get(nonLeaderA).newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("alice")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ @Cleanup
+ StreamConsumer<String> b =
v5Clients.get(nonLeaderB).newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("bob")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int n = 80;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ Set<String> received = ConcurrentHashMap.newKeySet();
+ Set<String> aGot = ConcurrentHashMap.newKeySet();
+ Set<String> bGot = ConcurrentHashMap.newKeySet();
+ Thread ta = drainStream(a, received, aGot);
+ Thread tb = drainStream(b, received, bGot);
+ ta.join();
+ tb.join();
+
+ assertEquals(received, sent, "every message must be delivered exactly
once across the group");
+ Set<String> overlap = new HashSet<>(aGot);
+ overlap.retainAll(bGot);
+ assertTrue(overlap.isEmpty(), "no message should be delivered to both
consumers, overlap=" + overlap);
+ assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+ "controller must split segments across both consumers (a=" +
aGot.size()
+ + " b=" + bGot.size() + ")");
+ }
+
+ /**
+ * V5 CheckpointConsumer with {@code consumerGroup(...)} routes its
session through the
+ * topic controller (same path as StreamConsumer). The same cross-broker
test as above:
+ * two checkpoint consumers in a group, attached via non-owning brokers,
must receive
+ * disjoint subsets that together cover every produced message.
+ */
+ @Test
+ public void testCheckpointConsumerControllerCoordinationAcrossBrokers()
throws Exception {
+ String topic = newScalableTopic(4);
+ String group = "cross-broker-checkpoint-group";
+
+ @Cleanup
+ Producer<String> producer =
v5Clients.get(0).newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ int leaderIndex = findControllerLeaderIndex(topic);
+ int nonLeaderA = (leaderIndex + 1) % brokers.size();
+ int nonLeaderB = (leaderIndex + 2) % brokers.size();
+
+ @Cleanup
+ CheckpointConsumer<String> a = v5Clients.get(nonLeaderA)
+ .newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+ @Cleanup
+ CheckpointConsumer<String> b = v5Clients.get(nonLeaderB)
+ .newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int n = 80;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ Set<String> received = ConcurrentHashMap.newKeySet();
+ Set<String> aGot = ConcurrentHashMap.newKeySet();
+ Set<String> bGot = ConcurrentHashMap.newKeySet();
+ Thread ta = drainCheckpoint(a, received, aGot);
+ Thread tb = drainCheckpoint(b, received, bGot);
+ ta.join();
+ tb.join();
+
+ assertEquals(received, sent, "every message must be delivered exactly
once across the group");
+ Set<String> overlap = new HashSet<>(aGot);
+ overlap.retainAll(bGot);
+ assertTrue(overlap.isEmpty(),
+ "no message should be delivered to both checkpoint consumers,
overlap=" + overlap);
+ assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+ "controller must split segments across both consumers (a=" +
aGot.size()
+ + " b=" + bGot.size() + ")");
+ }
+
+ // --- Helpers ---
+
+ /**
+ * Returns the brokerId of the controller leader for {@code topic}. Forces
the controller
+ * to materialize on every broker (so leader election runs), then waits
until every
+ * broker's metadata store reflects the elected leader. The latter wait is
what makes the
+ * subsequent V5 subscribe deterministic: the DAG-watch lookup from any
broker reads the
+ * controller znode via its own metadata store, and we need that read to
return the
+ * leader URL — not the empty fallback that pushes the client onto a
non-leader broker.
+ */
+ private String findControllerLeader(String topic) throws Exception {
+ TopicName tn = TopicName.get(topic);
+ // Step 1: force controller materialization + leader election on every
broker.
+ for (PulsarService broker : brokers) {
+
broker.getBrokerService().getScalableTopicService().getOrCreateController(tn)
+ .get(5, java.util.concurrent.TimeUnit.SECONDS);
+ }
+ // Step 2: wait until each broker's metadata store sees the
controller-lock znode.
+ // Without this, a lookup against a follower can return an empty
controller URL —
+ // the watch hasn't propagated yet — and the client subscribes to the
wrong broker.
+ Awaitility.await().untilAsserted(() -> {
+ for (PulsarService broker : brokers) {
+ var resources =
broker.getPulsarResources().getScalableTopicResources();
+ var optValue =
resources.getStore().get(resources.controllerLockPath(tn))
+ .get(5, java.util.concurrent.TimeUnit.SECONDS);
+ assertTrue(optValue.isPresent(),
+ "broker " + broker.getBrokerId()
+ + " must see controller lock for " + topic);
+ }
+ });
+ var controller =
brokers.get(0).getBrokerService().getScalableTopicService()
+ .getOrCreateController(tn).get();
+ return controller.getLeaderBrokerId().get().orElseThrow();
+ }
+
+ private int findControllerLeaderIndex(String topic) throws Exception {
+ String leaderBrokerId = findControllerLeader(topic);
+ for (int i = 0; i < brokers.size(); i++) {
+ if (brokers.get(i).getBrokerId().equals(leaderBrokerId)) {
+ return i;
+ }
+ }
+ throw new AssertionError("controller leader '" + leaderBrokerId
+ + "' does not match any broker in cluster");
+ }
+
+ /**
+ * Returns the segment id of the (single) active segment of {@code topic}.
Convenience
+ * for tests that work on a freshly-created scalable topic with one
initial segment.
+ */
+ private long singleActiveSegmentId(String topic) throws Exception {
+ var meta = admin.scalableTopics().getMetadata(topic);
+ long active = -1;
+ int count = 0;
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ active = seg.getSegmentId();
+ count++;
+ }
+ }
+ assertEquals(count, 1, "expected exactly one active segment");
+ assertNotEquals(active, -1L);
+ return active;
+ }
+
+ private Thread drainStream(StreamConsumer<String> consumer, Set<String>
all, Set<String> mine) {
+ Thread t = new Thread(() -> {
+ try {
+ MessageId last = null;
+ while (true) {
+ Message<String> msg =
consumer.receive(Duration.ofSeconds(1));
+ if (msg == null) {
+ if (last != null) {
+ consumer.acknowledgeCumulative(last);
+ }
+ return;
+ }
+ all.add(msg.value());
+ mine.add(msg.value());
+ last = msg.id();
+ }
+ } catch (Exception ignored) {
+ }
+ }, "stream-consumer-drainer");
+ t.start();
+ return t;
+ }
+
+ private Thread drainCheckpoint(CheckpointConsumer<String> consumer,
+ Set<String> all, Set<String> mine) {
+ Thread t = new Thread(() -> {
+ try {
+ while (true) {
+ Message<String> msg =
consumer.receive(Duration.ofSeconds(1));
+ if (msg == null) {
+ return;
+ }
+ all.add(msg.value());
+ mine.add(msg.value());
+ }
+ } catch (Exception ignored) {
+ }
+ }, "checkpoint-consumer-drainer");
+ t.start();
+ return t;
+ }
+}