This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2cf0aa763c21f3b38bc008552c6773f184f9fdb2 Author: Lari Hotari <[email protected]> AuthorDate: Sun Jun 7 23:10:15 2026 +0300 [fix][broker] Fix tableview divergence in ServiceUnitStateTableViewSyncer causing flaky tests (#25946) --- .../channel/ServiceUnitStateTableViewSyncer.java | 188 +++++++++--- .../ExtensibleLoadManagerImplBaseTest.java | 16 +- .../extensions/ExtensibleLoadManagerImplTest.java | 320 ++++++++++++++------- 3 files changed, 376 insertions(+), 148 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java index 45ff0dcb267..999c7bea93a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java @@ -23,20 +23,26 @@ import static org.apache.pulsar.broker.ServiceConfiguration.ServiceUnitTableView import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.COMPACTION_THRESHOLD; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.configureSystemTopics; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import 
org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.jspecify.annotations.NonNull; /** * Defines ServiceUnitTableViewSyncer. @@ -47,10 +53,15 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; public class ServiceUnitStateTableViewSyncer implements Closeable { private static final int MAX_CONCURRENT_SYNC_COUNT = 100; private static final int SYNC_WAIT_TIME_IN_SECS = 300; + private static final long RECONCILE_INTERVAL_IN_MILLIS = 5_000; + private static final BiConsumer<String, ServiceUnitStateData> NOOP_CONSUMER = (__, ___) -> { + }; + private volatile int syncWaitTimeInSecs = SYNC_WAIT_TIME_IN_SECS; private PulsarService pulsar; private volatile ServiceUnitStateTableView systemTopicTableView; private volatile ServiceUnitStateTableView metadataStoreTableView; private volatile boolean isActive = false; + private final ObjectWriter jsonWriter = ObjectMapperFactory.getMapper().writer(); public void start(PulsarService pulsar) @@ -82,53 +93,77 @@ public class ServiceUnitStateTableViewSyncer implements Closeable { } private CompletableFuture<Void> syncToSystemTopic(String key, ServiceUnitStateData data) { - return systemTopicTableView.put(key, data); + return logIfFailed(sync(systemTopicTableView, key, data), key, data, "systemTopic"); } private CompletableFuture<Void> syncToMetadataStore(String key, ServiceUnitStateData data) { - return metadataStoreTableView.put(key, data); + return logIfFailed(sync(metadataStoreTableView, key, data), key, data, "metadataStore"); } - private void dummy(String key, ServiceUnitStateData data) { + private CompletableFuture<Void> sync(ServiceUnitStateTableView dst, String key, ServiceUnitStateData data) { + // A null tail item is a tombstone: the source view removed the key. 
Route it to + // delete() rather than put(): the metadata-store view's put() rejects null + // (@NonNull) and the system-topic view's delete() is itself a null-valued put(), + // so a uniform delete keeps both sync directions symmetric and prevents a missed + // deletion from leaving the two views with different sizes (which would make + // waitUntilSynced spin until the timeout budget). + return data == null ? dst.delete(key) : dst.put(key, data); + } + + private CompletableFuture<Void> logIfFailed(CompletableFuture<Void> future, String key, + ServiceUnitStateData data, String dst) { + return future.whenComplete((__, e) -> { + if (e != null && !(e instanceof PulsarClientException.AlreadyClosedException)) { + log.warn("Failed to sync tableview item; sizes may diverge until the next update;" + + " dst={} serviceUnit={} data={}", dst, key, data, e); + } + }); } private void syncExistingItems() throws IOException, ExecutionException, InterruptedException, TimeoutException { long started = System.currentTimeMillis(); + @Cleanup ServiceUnitStateTableView metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl(); metadataStoreTableView.start( pulsar, - this::dummy, - this::dummy, - this::dummy + NOOP_CONSUMER, + NOOP_CONSUMER, + NOOP_CONSUMER ); @Cleanup ServiceUnitStateTableView systemTopicTableView = new ServiceUnitStateTableViewImpl(); systemTopicTableView.start( pulsar, - this::dummy, - this::dummy, - this::dummy + NOOP_CONSUMER, + NOOP_CONSUMER, + NOOP_CONSUMER ); var syncer = pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer(); + ServiceUnitStateTableView src; + ServiceUnitStateTableView dst; if (syncer == SystemTopicToMetadataStoreSyncer) { clean(metadataStoreTableView); syncExistingItemsToMetadataStore(systemTopicTableView); + src = systemTopicTableView; + dst = metadataStoreTableView; } else { clean(systemTopicTableView); syncExistingItemsToSystemTopic(metadataStoreTableView, systemTopicTableView); + src = 
metadataStoreTableView; + dst = systemTopicTableView; } - if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView, started)) { + if (!waitUntilSynced(src, dst, started)) { throw new TimeoutException( syncer + " failed to sync existing items in tableviews. MetadataStoreTableView.size: " + metadataStoreTableView.entrySet().size() + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in " - + SYNC_WAIT_TIME_IN_SECS + " secs"); + + syncWaitTimeInSecs + " secs"); } log.info("Synced existing items MetadataStoreTableView.size:{} , " @@ -154,8 +189,8 @@ public class ServiceUnitStateTableViewSyncer implements Closeable { this.metadataStoreTableView.start( pulsar, this::syncToSystemTopic, - this::dummy, - this::dummy + NOOP_CONSUMER, + NOOP_CONSUMER ); log.info("Started MetadataStoreTableView"); @@ -163,18 +198,20 @@ public class ServiceUnitStateTableViewSyncer implements Closeable { this.systemTopicTableView.start( pulsar, this::syncToMetadataStore, - this::dummy, - this::dummy + NOOP_CONSUMER, + NOOP_CONSUMER ); log.info("Started SystemTopicTableView"); var syncer = pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer(); - if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView, started)) { + var src = syncer == SystemTopicToMetadataStoreSyncer ? systemTopicTableView : metadataStoreTableView; + var dst = syncer == SystemTopicToMetadataStoreSyncer ? metadataStoreTableView : systemTopicTableView; + if (!waitUntilSynced(src, dst, started)) { throw new TimeoutException( syncer + " failed to sync tableviews. 
MetadataStoreTableView.size: " + metadataStoreTableView.entrySet().size() + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in " - + SYNC_WAIT_TIME_IN_SECS + " secs"); + + syncWaitTimeInSecs + " secs"); } @@ -187,62 +224,134 @@ public class ServiceUnitStateTableViewSyncer implements Closeable { private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView src) throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException { // Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out) - var store = pulsar.getLocalMetadataStore(); - var writer = ObjectMapperFactory.getMapper().writer(); - var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); List<CompletableFuture<Void>> futures = new ArrayList<>(); var srcIter = src.entrySet().iterator(); while (srcIter.hasNext()) { var e = srcIter.next(); - futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(), - writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null)); - if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !srcIter.hasNext()) { - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - } + futures.add(writeToMetadataStore(e.getKey(), e.getValue())); + maybeWaitCompletion(futures, !srcIter.hasNext()); + } + } + + private void maybeWaitCompletion(List<CompletableFuture<Void>> futures, boolean forceWait) + throws InterruptedException, ExecutionException, TimeoutException { + if (!futures.isEmpty() && (futures.size() == MAX_CONCURRENT_SYNC_COUNT || forceWait)) { + FutureUtil.waitForAll(futures) + .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + futures.clear(); } } + private @NonNull CompletableFuture<Void> writeToMetadataStore(String key, ServiceUnitStateData value) + throws JsonProcessingException { + return 
pulsar.getLocalMetadataStore().put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + key, + jsonWriter.writeValueAsBytes(value), Optional.empty()).thenApply(__ -> null); + } + private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src, ServiceUnitStateTableView dst) throws ExecutionException, InterruptedException, TimeoutException { - var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); List<CompletableFuture<Void>> futures = new ArrayList<>(); var srcIter = src.entrySet().iterator(); while (srcIter.hasNext()) { var e = srcIter.next(); futures.add(dst.put(e.getKey(), e.getValue())); - if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !srcIter.hasNext()) { - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - } + maybeWaitCompletion(futures, !srcIter.hasNext()); } } private void clean(ServiceUnitStateTableView dst) throws ExecutionException, InterruptedException, TimeoutException { - var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); var dstIter = dst.entrySet().iterator(); List<CompletableFuture<Void>> futures = new ArrayList<>(); while (dstIter.hasNext()) { var e = dstIter.next(); futures.add(dst.delete(e.getKey())); - if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !dstIter.hasNext()) { - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - } + maybeWaitCompletion(futures, !dstIter.hasNext()); } } - private boolean waitUntilSynced(ServiceUnitStateTableView srt, ServiceUnitStateTableView dst, long started) + private boolean waitUntilSynced(ServiceUnitStateTableView src, ServiceUnitStateTableView dst, long started) throws InterruptedException { - while (srt.entrySet().size() != dst.entrySet().size()) { - if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started) - > SYNC_WAIT_TIME_IN_SECS) { + long lastReconciled = started; + while (src.entrySet().size() != dst.entrySet().size()) { + long now = System.currentTimeMillis(); + 
if (TimeUnit.MILLISECONDS.toSeconds(now - started) > syncWaitTimeInSecs) { return false; } + // Give in-flight syncs a grace period to settle on their own, then reconcile + // periodically: updates that raced with the table views' (re)start were replayed + // to the fresh views as existing items — which are deliberately not wired to + // sync — so without reconciliation the views would never converge. + if (now - lastReconciled >= RECONCILE_INTERVAL_IN_MILLIS) { + if (log.isDebugEnabled()) { + log.debug("Tableviews not synced yet; reconciling; srcSize={} dstSize={} elapsedSecs={}", + src.entrySet().size(), dst.entrySet().size(), + TimeUnit.MILLISECONDS.toSeconds(now - started)); + } + reconcile(src, dst, started); + lastReconciled = now; + } Thread.sleep(100); } return true; } + /** + * Copies items the destination table view is missing and removes stale items that no longer + * exist in the source. Channel updates that land between the existing-items copy and the + * registration of the tail listeners are only visible as existing items of the freshly + * started views, so the tail listeners never see them. Writes flow to the migration source + * while the syncer starts, making the source view authoritative; destination-only items are + * removed only when they predate this sync phase and are still absent from the source, so a + * concurrent fresh write to the destination is never discarded. Failures are logged and left + * for the next reconcile pass. Runs on the caller's (load manager) thread with each batch + * bounded by the metadata store operation timeout. + */ + private void reconcile(ServiceUnitStateTableView src, ServiceUnitStateTableView dst, long started) + throws InterruptedException { + // Snapshot the destination entries before iterating the source so that a key arriving + // in the destination through a concurrent tail sync cannot be misclassified as stale. 
+ var staleDstItems = new HashMap<String, ServiceUnitStateData>(); + for (var e : dst.entrySet()) { + staleDstItems.put(e.getKey(), e.getValue()); + } + try { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (var e : src.entrySet()) { + if (staleDstItems.remove(e.getKey()) == null) { + log.info("Reconciling item missing from the destination tableview; serviceUnit={}", + e.getKey()); + if (dst.isMetadataStoreBased()) { + // Write directly to the store like syncExistingItemsToMetadataStore + // does; the view's put() would conflict the item out. + futures.add(writeToMetadataStore(e.getKey(), e.getValue())); + } else { + futures.add(dst.put(e.getKey(), e.getValue())); + } + maybeWaitCompletion(futures, false); + } + } + for (var e : staleDstItems.entrySet()) { + // Only remove items written before this sync phase began and re-confirmed absent + // from the source: a fresh destination write (e.g. from a broker already switched + // to the destination implementation) is propagated to the source by the tail + // listener instead of being deleted here. + if (e.getValue().timestamp() < started && src.get(e.getKey()) == null) { + log.info("Reconciling stale item in the destination tableview; serviceUnit={}", + e.getKey()); + futures.add(dst.delete(e.getKey())); + maybeWaitCompletion(futures, false); + } + } + maybeWaitCompletion(futures, true); + } catch (IOException | ExecutionException | TimeoutException e) { + // Transient write failures leave a size divergence behind; the next reconcile pass + // (or the sync-wait timeout) handles it. 
+ log.warn("Failed to reconcile tableview items", e); + } + } + @Override public void close() throws IOException { if (!isActive) { @@ -282,4 +391,9 @@ public class ServiceUnitStateTableViewSyncer implements Closeable { public boolean isActive() { return isActive; } + + @VisibleForTesting + public void setSyncWaitTimeInSecs(int syncWaitTimeInSecs) { + this.syncWaitTimeInSecs = syncWaitTimeInSecs; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index 55e9b8d6baf..2b30723f0a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -196,7 +197,20 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ @BeforeMethod(alwaysRun = true) protected void initializeState() throws PulsarAdminException, IllegalAccessException { - admin.namespaces().unload(defaultTestNamespace); + // After a prior test churned leader election, the channel-topic bundle can be left + // unserved ("not served by this instance"), making the unload's channel publish fail + // (HTTP 500) or hang server-side until the background monitor task (120s interval) + // reconciles the brokers' roles with the channel ownership. 
Drive monitor() eagerly to + // heal that state, bound each unload attempt (a synchronous unload() can block longer + // than the whole retry window), and fail loudly on exhaustion. + Awaitility.await().atMost(120, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); + admin.namespaces().unloadAsync(defaultTestNamespace).get(15, TimeUnit.SECONDS); + }); reset(primaryLoadManager, secondaryLoadManager); FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 65d017499fd..05e9bfba6ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1331,7 +1331,11 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase } } - @Test(priority = 200) + // Cap below the 300s suite default (AnnotationListener) so a hung ServiceUnitStateTableViewSyncer + // start() fails fast and is retried instead of consuming the full 300s slot and corrupting the + // next @BeforeMethod. Combined with the shortened sync-wait budget set below, a real divergence + // surfaces within the shortened budget instead of a 5-minute ThreadTimeoutException. + @Test(priority = 200, timeOut = 240 * 1000) public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { // Make pulsar1 the leader so primaryLoadManager is the syncer-running broker. 
makePrimaryAsLeader(); @@ -1361,120 +1365,142 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase String syncerType = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; + // Shrink the sync-wait budget on both brokers' live syncers BEFORE the first start() + // (driven by monitor() below) so any tableview-size divergence fails in ~30s with the + // syncer's own TimeoutException instead of spinning for the full 300s default — the + // exact hang observed in CI happened inside that first start(). + primaryLoadManager.getServiceUnitStateTableViewSyncer().setSyncWaitTimeInSecs(30); + secondaryLoadManager.getServiceUnitStateTableViewSyncer().setSyncWaitTimeInSecs(30); + pulsar.getAdminClient().brokers() .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerType); Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - primaryLoadManager.monitor(); - Awaitility.await().atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer() - .isActive())); - assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); - - // === Phase 2: add a 3rd broker using the OTHER table view impl === - // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 deliberately - // uses the other one so the test exercises cross-impl lookups regardless of - // which parametrization we're running. - String otherClassName = - serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) - ? 
ServiceUnitStateMetadataStoreTableViewImpl.class.getName() - : ServiceUnitStateTableViewImpl.class.getName(); - - ServiceConfiguration crossImplConf = getDefaultConf(); - crossImplConf.setAllowAutoTopicCreation(true); - crossImplConf.setForceDeleteNamespaceAllowed(true); - crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); - crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName); - - try (var crossImplCtx = createAdditionalPulsarTestContext(crossImplConf)) { - var pulsar3 = crossImplCtx.getPulsarService(); - - // All three brokers (across both impls) must agree on topic ownership. - assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); - assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); - Optional<URL> webUrlPulsar3 = pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webUrlPulsar3.isPresent()); - assertEquals(webUrlPulsar3.get().toString(), webUrlBefore.get().toString()); - - // SLA monitor and heartbeat lookups must agree across impls in every direction. - List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3); - for (PulsarService viewer : brokers) { - for (PulsarService owner : brokers) { - assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); - assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); + // Drive monitor() inside the await so a transient start() failure (swallowed by + // monitor()'s catch) is retried instead of waiting for the 120s background monitor task. 
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> { + primaryLoadManager.monitor(); + assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + }); + + try { + assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + + // === Phase 2: add a 3rd broker using the OTHER table view impl === + // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 deliberately + // uses the other one so the test exercises cross-impl lookups regardless of + // which parametrization we're running. + String otherClassName = + serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) + ? ServiceUnitStateMetadataStoreTableViewImpl.class.getName() + : ServiceUnitStateTableViewImpl.class.getName(); + + ServiceConfiguration crossImplConf = getDefaultConf(); + crossImplConf.setAllowAutoTopicCreation(true); + crossImplConf.setForceDeleteNamespaceAllowed(true); + crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); + crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName); + + try (var crossImplCtx = createAdditionalPulsarTestContext(crossImplConf)) { + var pulsar3 = crossImplCtx.getPulsarService(); + + // All three brokers (across both impls) must agree on topic ownership. + assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); + assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); + Optional<URL> webUrlPulsar3 = pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webUrlPulsar3.isPresent()); + assertEquals(webUrlPulsar3.get().toString(), webUrlBefore.get().toString()); + + // SLA monitor and heartbeat lookups must agree across impls in every direction. 
+ List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3); + for (PulsarService viewer : brokers) { + for (PulsarService owner : brokers) { + assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); + assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); + } } - } - // === Phase 3: simulate the cross-impl broker going offline === - // Its SLA namespace must reassign to a remaining broker, and the ownership - // change must propagate through the syncer to brokers using the other impl. - var wrapper3 = (ExtensibleLoadManagerWrapper) pulsar3.getLoadManager().get(); - var loadManager3 = (ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper3, "loadManager", true); - ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel) - FieldUtils.readField(loadManager3, "serviceUnitStateChannel", true); - channel3.cleanOwnerships(); - // Set state to Closed BEFORE deleting the ZK node to prevent the notification - // handler's session-expiry recovery from auto-re-registering broker3. In - // production the PulsarService shuts down after unregister(), so the handler - // never fires; in tests the service stays running and creates a race. 
- var registry3 = (BrokerRegistryImpl) loadManager3.getBrokerRegistry(); - registry3.state.set(BrokerRegistryImpl.State.Closed); - pulsar3.getLocalMetadataStore() - .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), Optional.empty()).get(); - - String slaMonitorTopic = getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration()) - .getPersistentTopicName("test"); - String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { - String reassigned = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(reassigned); - assertNotEquals(reassigned, pulsar3BrokerUrl); - }); + // === Phase 3: simulate the cross-impl broker going offline === + // Its SLA namespace must reassign to a remaining broker, and the ownership + // change must propagate through the syncer to brokers using the other impl. + var wrapper3 = (ExtensibleLoadManagerWrapper) pulsar3.getLoadManager().get(); + var loadManager3 = (ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper3, "loadManager", true); + ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel) + FieldUtils.readField(loadManager3, "serviceUnitStateChannel", true); + channel3.cleanOwnerships(); + // Set state to Closed BEFORE deleting the ZK node to prevent the notification + // handler's session-expiry recovery from auto-re-registering broker3. In + // production the PulsarService shuts down after unregister(), so the handler + // never fires; in tests the service stays running and creates a race. 
+ var registry3 = (BrokerRegistryImpl) loadManager3.getBrokerRegistry(); + registry3.state.set(BrokerRegistryImpl.State.Closed); + pulsar3.getLocalMetadataStore() + .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), Optional.empty()).get(); + + String slaMonitorTopic = getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration()) + .getPersistentTopicName("test"); + String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String reassigned = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(reassigned); + assertNotEquals(reassigned, pulsar3BrokerUrl); + }); - // Send a message while the topic is owned by the reassigned broker; this must - // remain durable when ownership migrates back below. - @Cleanup - Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); - producer.send("offline"); - - // === Phase 4: re-register the broker and verify ownership returns === - registry3.state.set(BrokerRegistryImpl.State.Started); - registry3.registerAsync().get(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> - assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic), - pulsar3.getBrokerServiceUrl())); - - // Same producer reconnects to the new owner; a fresh producer also works. - producer.send("after-reconnect"); - @Cleanup - Producer<String> producer2 = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); - producer2.send("from-new-producer"); + // Send a message while the topic is owned by the reassigned broker; this must + // remain durable when ownership migrates back below. 
+ @Cleanup + Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) + .topic(slaMonitorTopic).create(); + producer.send("offline"); - @Cleanup - Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) - .topic(slaMonitorTopic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionName("test") - .subscribe(); - assertEquals(consumer.receive().getValue(), "offline"); - assertEquals(consumer.receive().getValue(), "after-reconnect"); - assertEquals(consumer.receive().getValue(), "from-new-producer"); - } + // === Phase 4: re-register the broker and verify ownership returns === + registry3.state.set(BrokerRegistryImpl.State.Started); + registry3.registerAsync().get(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> + assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic), + pulsar3.getBrokerServiceUrl())); - // === Phase 5: disable the syncer and verify it deactivates === - pulsar.getAdminClient().brokers() - .deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer"); - Awaitility.await().untilAsserted(() -> - assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - primaryLoadManager.monitor(); - Awaitility.await().atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer() - .isActive())); - assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + // Same producer reconnects to the new owner; a fresh producer also works. 
+ producer.send("after-reconnect"); + @Cleanup + Producer<String> producer2 = pulsar.getClient().newProducer(Schema.STRING) + .topic(slaMonitorTopic).create(); + producer2.send("from-new-producer"); + + @Cleanup + Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + assertEquals(consumer.receive().getValue(), "offline"); + assertEquals(consumer.receive().getValue(), "after-reconnect"); + assertEquals(consumer.receive().getValue(), "from-new-producer"); + } + } finally { + // === Phase 5: disable the syncer and verify it deactivates === + // Guarantee the dynamic config is removed and the syncer is driven inactive even if + // the body threw, so the syncer cannot stay enabled and poison later tests. Note this + // cannot tear down a start() that failed before isActive=true (close() short-circuits + // on !isActive); leftover tail views from such a partial start are recovered by the + // next successful start(), and the next test's initializeState() retry absorbs any + // residual channel disruption. 
+ try { + pulsar.getAdminClient().brokers() + .deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer"); + } catch (Exception e) { + log.warn("Failed to delete syncer dynamic config in cleanup", e); + } + Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); + assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + }); + } } private void assertLookupHeartbeatOwner(PulsarService pulsar, @@ -1541,7 +1567,52 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase }); } - @Test(timeOut = 30 * 1000, priority = 2100) + // After a test churns leader election, the channel-topic bundle can be transiently + // unowned and the channel producer can be in reconnect backoff. The next @BeforeMethod + // (initializeState -> namespaces().unload(...)) publishes a state change on the channel + // topic; if it runs in that window the producer send times out (HTTP 500). Give the + // re-election a best-effort chance to settle before yielding to the next test. + // + // This is a best-effort smoothing wait, not an assertion: the budget is deliberately a + // fraction (20s) of the callers' 60s method timeout so it cannot consume the whole slot + // and trip TestNG's ThreadTimeoutException mid-poll, and any failure to settle + // is swallowed-and-logged rather than thrown. That matters because callers invoke this from + // a finally block — a settling delay here must never replace (mask) the body's exception. + // The next test's initializeState() carries a 120s ignoreExceptions retry as the real backstop.
+ private void awaitChannelOwnerStable() { + try { + Awaitility.await().atMost(20, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + // monitor() reconciles each broker's role with the channel ownership and + // re-serves the channel-topic bundle if the leadership churn left it unserved + // ("not served by this instance") — the same self-healing the 120s background + // monitor task provides, driven eagerly so the next test does not start inside + // the broken window. + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); + Optional<String> owner1 = channel1.getChannelOwnerAsync().get(5, TimeUnit.SECONDS); + Optional<String> owner2 = channel2.getChannelOwnerAsync().get(5, TimeUnit.SECONDS); + assertTrue(owner1.isPresent()); + assertEquals(owner1, owner2); + assertTrue(channel1.isChannelOwner() ^ channel2.isChannelOwner()); + // Probe that the channel topic is actually served: the lookup re-assigns the + // pulsar/system bundle if it is unowned, and getStats proves the owner loads + // the topic (the lookup layer alone can claim an owner that refuses to serve). + String channelTopic = ServiceUnitStateTableViewImpl.TOPIC; + assertNotNull(pulsar.getAdminClient().lookups().lookupTopic(channelTopic)); + if (serviceUnitStateTableViewClassName.equals( + ServiceUnitStateTableViewImpl.class.getName())) { + assertNotNull(pulsar.getAdminClient().topics().getStats(channelTopic)); + } + }); + } catch (Throwable t) { + log.warn("Channel owner did not stabilize within the best-effort window; " + + "relying on the next test's initializeState() retry", t); + } + } + + // 60s: the body's repeated role transitions plus the trailing awaitChannelOwnerStable() can + // exceed 30s under load. 
+ @Test(timeOut = 60 * 1000, priority = 2100) public void testRoleChangeIdempotency() throws Exception { makePrimaryAsLeader(); @@ -1621,7 +1692,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase assertEquals(ExtensibleLoadManagerImpl.Role.Follower, secondaryLoadManager.getRole()); - + // Confirm a stable channel owner before yielding to the next test's @BeforeMethod. + awaitChannelOwnerStable(); } @DataProvider(name = "noChannelOwnerMonitorHandler") @@ -1629,7 +1701,9 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase return new Object[][] { { true }, { false } }; } - @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101) + // 60s: the body's leader-election churn plus the trailing awaitChannelOwnerStable() can + // exceed 30s under load. + @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 60 * 1000, priority = 2101) public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler) throws Exception { makePrimaryAsLeader(); @@ -1696,10 +1770,25 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase // clean up for monitor test pulsar1.getLeaderElectionService().start(); pulsar2.getLeaderElectionService().start(); + // If the body failed mid-churn, both restarted elections can keep flapping the + // leadership between the brokers, leaving the channel-topic bundle unserved + // beyond what the next test's initializeState() retry can absorb. Force a + // deterministic single leader before stabilizing (best-effort: must not mask + // the body's exception). + try { + makePrimaryAsLeader(); + } catch (Throwable t) { + log.warn("Failed to re-establish primary as leader in cleanup", t); + } + // Re-establish a stable channel owner before yielding to the next test's + // @BeforeMethod, which publishes to the channel topic via namespace unload. 
+ awaitChannelOwnerStable(); } } - @Test(timeOut = 30 * 1000, priority = 2000) + // 60s: the body's role transitions plus the trailing awaitChannelOwnerStable() can exceed + // 30s under load (observed locally at 30.017s). + @Test(timeOut = 60 * 1000, priority = 2000) public void testRoleChange() throws Exception { makePrimaryAsLeader(); @@ -1719,6 +1808,11 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase new NamespaceBundleStats())); Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + // The internal topics live in the pulsar/system bundle; if the leadership churn + // left it unserved, only a monitor() role reconciliation re-serves it (the + // background monitor task would take up to 120s) — drive it while waiting. + leader.monitor(); + follower.monitor(); assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), "tableView", true)); @@ -1756,6 +1850,9 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 1; Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + // Same monitor()-driven healing as above for the post-transfer assertions. + leader2.monitor(); + follower2.monitor(); assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(), "tableView", true)); assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), "tableView", true)); @@ -1780,6 +1877,9 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase follower2.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected).get(3, TimeUnit.SECONDS); follower2.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected) .get(3, TimeUnit.SECONDS); + + // Confirm a stable channel owner before yielding to the next test's @BeforeMethod. + awaitChannelOwnerStable(); } @Test(priority = Integer.MIN_VALUE)
