This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cf0aa763c21f3b38bc008552c6773f184f9fdb2
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jun 7 23:10:15 2026 +0300

    [fix][broker] Fix tableview divergence in ServiceUnitStateTableViewSyncer causing flaky tests (#25946)
---
 .../channel/ServiceUnitStateTableViewSyncer.java   | 188 +++++++++---
 .../ExtensibleLoadManagerImplBaseTest.java         |  16 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 320 ++++++++++++++-------
 3 files changed, 376 insertions(+), 148 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
index 45ff0dcb267..999c7bea93a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
@@ -23,20 +23,26 @@ import static org.apache.pulsar.broker.ServiceConfiguration.ServiceUnitTableView
 import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.COMPACTION_THRESHOLD;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.configureSystemTopics;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.jspecify.annotations.NonNull;
 
 /**
  * Defines ServiceUnitTableViewSyncer.
@@ -47,10 +53,15 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 public class ServiceUnitStateTableViewSyncer implements Closeable {
     private static final int MAX_CONCURRENT_SYNC_COUNT = 100;
     private static final int SYNC_WAIT_TIME_IN_SECS = 300;
+    private static final long RECONCILE_INTERVAL_IN_MILLIS = 5_000;
+    private static final BiConsumer<String, ServiceUnitStateData> 
NOOP_CONSUMER = (__, ___) -> {
+    };
+    private volatile int syncWaitTimeInSecs = SYNC_WAIT_TIME_IN_SECS;
     private PulsarService pulsar;
     private volatile ServiceUnitStateTableView systemTopicTableView;
     private volatile ServiceUnitStateTableView metadataStoreTableView;
     private volatile boolean isActive = false;
+    private final ObjectWriter jsonWriter = 
ObjectMapperFactory.getMapper().writer();
 
 
     public void start(PulsarService pulsar)
@@ -82,53 +93,77 @@ public class ServiceUnitStateTableViewSyncer implements Closeable {
     }
 
     private CompletableFuture<Void> syncToSystemTopic(String key, 
ServiceUnitStateData data) {
-        return systemTopicTableView.put(key, data);
+        return logIfFailed(sync(systemTopicTableView, key, data), key, data, 
"systemTopic");
     }
 
     private CompletableFuture<Void> syncToMetadataStore(String key, 
ServiceUnitStateData data) {
-        return metadataStoreTableView.put(key, data);
+        return logIfFailed(sync(metadataStoreTableView, key, data), key, data, 
"metadataStore");
     }
 
-    private void dummy(String key, ServiceUnitStateData data) {
+    private CompletableFuture<Void> sync(ServiceUnitStateTableView dst, String 
key, ServiceUnitStateData data) {
+        // A null tail item is a tombstone: the source view removed the key. 
Route it to
+        // delete() rather than put(): the metadata-store view's put() rejects 
null
+        // (@NonNull) and the system-topic view's delete() is itself a 
null-valued put(),
+        // so a uniform delete keeps both sync directions symmetric and 
prevents a missed
+        // deletion from leaving the two views with different sizes (which 
would make
+        // waitUntilSynced spin until the timeout budget).
+        return data == null ? dst.delete(key) : dst.put(key, data);
+    }
+
+    private CompletableFuture<Void> logIfFailed(CompletableFuture<Void> 
future, String key,
+                                                ServiceUnitStateData data, 
String dst) {
+        return future.whenComplete((__, e) -> {
+            if (e != null && !(e instanceof 
PulsarClientException.AlreadyClosedException)) {
+                log.warn("Failed to sync tableview item; sizes may diverge 
until the next update;"
+                        + " dst={} serviceUnit={} data={}", dst, key, data, e);
+            }
+        });
     }
 
     private void syncExistingItems()
             throws IOException, ExecutionException, InterruptedException, 
TimeoutException {
         long started = System.currentTimeMillis();
+
         @Cleanup
         ServiceUnitStateTableView metadataStoreTableView = new 
ServiceUnitStateMetadataStoreTableViewImpl();
         metadataStoreTableView.start(
                 pulsar,
-                this::dummy,
-                this::dummy,
-                this::dummy
+                NOOP_CONSUMER,
+                NOOP_CONSUMER,
+                NOOP_CONSUMER
         );
 
         @Cleanup
         ServiceUnitStateTableView systemTopicTableView = new 
ServiceUnitStateTableViewImpl();
         systemTopicTableView.start(
                 pulsar,
-                this::dummy,
-                this::dummy,
-                this::dummy
+                NOOP_CONSUMER,
+                NOOP_CONSUMER,
+                NOOP_CONSUMER
         );
 
 
         var syncer = 
pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
+        ServiceUnitStateTableView src;
+        ServiceUnitStateTableView dst;
         if (syncer == SystemTopicToMetadataStoreSyncer) {
             clean(metadataStoreTableView);
             syncExistingItemsToMetadataStore(systemTopicTableView);
+            src = systemTopicTableView;
+            dst = metadataStoreTableView;
         } else {
             clean(systemTopicTableView);
             syncExistingItemsToSystemTopic(metadataStoreTableView, 
systemTopicTableView);
+            src = metadataStoreTableView;
+            dst = systemTopicTableView;
         }
 
-        if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView, 
started)) {
+        if (!waitUntilSynced(src, dst, started)) {
             throw new TimeoutException(
                     syncer + " failed to sync existing items in tableviews. 
MetadataStoreTableView.size: "
                             + metadataStoreTableView.entrySet().size()
                             + ", SystemTopicTableView.size: " + 
systemTopicTableView.entrySet().size() + " in "
-                            + SYNC_WAIT_TIME_IN_SECS + " secs");
+                            + syncWaitTimeInSecs + " secs");
         }
 
         log.info("Synced existing items MetadataStoreTableView.size:{} , "
@@ -154,8 +189,8 @@ public class ServiceUnitStateTableViewSyncer implements Closeable {
         this.metadataStoreTableView.start(
                 pulsar,
                 this::syncToSystemTopic,
-                this::dummy,
-                this::dummy
+                NOOP_CONSUMER,
+                NOOP_CONSUMER
         );
         log.info("Started MetadataStoreTableView");
 
@@ -163,18 +198,20 @@ public class ServiceUnitStateTableViewSyncer implements Closeable {
         this.systemTopicTableView.start(
                 pulsar,
                 this::syncToMetadataStore,
-                this::dummy,
-                this::dummy
+                NOOP_CONSUMER,
+                NOOP_CONSUMER
         );
         log.info("Started SystemTopicTableView");
 
         var syncer = 
pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
-        if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView, 
started)) {
+        var src = syncer == SystemTopicToMetadataStoreSyncer ? 
systemTopicTableView : metadataStoreTableView;
+        var dst = syncer == SystemTopicToMetadataStoreSyncer ? 
metadataStoreTableView : systemTopicTableView;
+        if (!waitUntilSynced(src, dst, started)) {
             throw new TimeoutException(
                     syncer + " failed to sync tableviews. 
MetadataStoreTableView.size: "
                             + metadataStoreTableView.entrySet().size()
                             + ", SystemTopicTableView.size: " + 
systemTopicTableView.entrySet().size() + " in "
-                            + SYNC_WAIT_TIME_IN_SECS + " secs");
+                            + syncWaitTimeInSecs + " secs");
         }
 
 
@@ -187,62 +224,134 @@ public class ServiceUnitStateTableViewSyncer implements Closeable {
     private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView 
src)
             throws JsonProcessingException, ExecutionException, 
InterruptedException, TimeoutException {
         // Directly use store to sync existing items to 
metadataStoreTableView(otherwise, they are conflicted out)
-        var store = pulsar.getLocalMetadataStore();
-        var writer = ObjectMapperFactory.getMapper().writer();
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         var srcIter = src.entrySet().iterator();
         while (srcIter.hasNext()) {
             var e = srcIter.next();
-            
futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + 
"/" + e.getKey(),
-                    writer.writeValueAsBytes(e.getValue()), 
Optional.empty()).thenApply(__ -> null));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!srcIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            futures.add(writeToMetadataStore(e.getKey(), e.getValue()));
+            maybeWaitCompletion(futures, !srcIter.hasNext());
+        }
+    }
+
+    private void maybeWaitCompletion(List<CompletableFuture<Void>> futures, 
boolean forceWait)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        if (!futures.isEmpty() && (futures.size() == MAX_CONCURRENT_SYNC_COUNT 
|| forceWait)) {
+            FutureUtil.waitForAll(futures)
+                    
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+            futures.clear();
         }
     }
 
+    private @NonNull CompletableFuture<Void> writeToMetadataStore(String key, 
ServiceUnitStateData value)
+            throws JsonProcessingException {
+        return 
pulsar.getLocalMetadataStore().put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX
 + "/" + key,
+                jsonWriter.writeValueAsBytes(value), 
Optional.empty()).thenApply(__ -> null);
+    }
+
     private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src,
                                                 ServiceUnitStateTableView dst)
             throws ExecutionException, InterruptedException, TimeoutException {
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         var srcIter = src.entrySet().iterator();
         while (srcIter.hasNext()) {
             var e = srcIter.next();
             futures.add(dst.put(e.getKey(), e.getValue()));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!srcIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            maybeWaitCompletion(futures, !srcIter.hasNext());
         }
     }
 
     private void clean(ServiceUnitStateTableView dst)
             throws ExecutionException, InterruptedException, TimeoutException {
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         var dstIter = dst.entrySet().iterator();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         while (dstIter.hasNext()) {
             var e = dstIter.next();
             futures.add(dst.delete(e.getKey()));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!dstIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            maybeWaitCompletion(futures, !dstIter.hasNext());
         }
     }
 
-    private boolean waitUntilSynced(ServiceUnitStateTableView srt, 
ServiceUnitStateTableView dst, long started)
+    private boolean waitUntilSynced(ServiceUnitStateTableView src, 
ServiceUnitStateTableView dst, long started)
             throws InterruptedException {
-        while (srt.entrySet().size() != dst.entrySet().size()) {
-            if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
started)
-                    > SYNC_WAIT_TIME_IN_SECS) {
+        long lastReconciled = started;
+        while (src.entrySet().size() != dst.entrySet().size()) {
+            long now = System.currentTimeMillis();
+            if (TimeUnit.MILLISECONDS.toSeconds(now - started) > 
syncWaitTimeInSecs) {
                 return false;
             }
+            // Give in-flight syncs a grace period to settle on their own, 
then reconcile
+            // periodically: updates that raced with the table views' 
(re)start were replayed
+            // to the fresh views as existing items — which are deliberately 
not wired to
+            // sync — so without reconciliation the views would never converge.
+            if (now - lastReconciled >= RECONCILE_INTERVAL_IN_MILLIS) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Tableviews not synced yet; reconciling; 
srcSize={} dstSize={} elapsedSecs={}",
+                            src.entrySet().size(), dst.entrySet().size(),
+                            TimeUnit.MILLISECONDS.toSeconds(now - started));
+                }
+                reconcile(src, dst, started);
+                lastReconciled = now;
+            }
             Thread.sleep(100);
         }
         return true;
     }
 
+    /**
+     * Copies items the destination table view is missing and removes stale 
items that no longer
+     * exist in the source. Channel updates that land between the 
existing-items copy and the
+     * registration of the tail listeners are only visible as existing items 
of the freshly
+     * started views, so the tail listeners never see them. Writes flow to the 
migration source
+     * while the syncer starts, making the source view authoritative; 
destination-only items are
+     * removed only when they predate this sync phase and are still absent 
from the source, so a
+     * concurrent fresh write to the destination is never discarded. Failures 
are logged and left
+     * for the next reconcile pass. Runs on the caller's (load manager) thread 
with each batch
+     * bounded by the metadata store operation timeout.
+     */
+    private void reconcile(ServiceUnitStateTableView src, 
ServiceUnitStateTableView dst, long started)
+            throws InterruptedException {
+        // Snapshot the destination entries before iterating the source so 
that a key arriving
+        // in the destination through a concurrent tail sync cannot be 
misclassified as stale.
+        var staleDstItems = new HashMap<String, ServiceUnitStateData>();
+        for (var e : dst.entrySet()) {
+            staleDstItems.put(e.getKey(), e.getValue());
+        }
+        try {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (var e : src.entrySet()) {
+                if (staleDstItems.remove(e.getKey()) == null) {
+                    log.info("Reconciling item missing from the destination 
tableview; serviceUnit={}",
+                            e.getKey());
+                    if (dst.isMetadataStoreBased()) {
+                        // Write directly to the store like 
syncExistingItemsToMetadataStore
+                        // does; the view's put() would conflict the item out.
+                        futures.add(writeToMetadataStore(e.getKey(), 
e.getValue()));
+                    } else {
+                        futures.add(dst.put(e.getKey(), e.getValue()));
+                    }
+                    maybeWaitCompletion(futures, false);
+                }
+            }
+            for (var e : staleDstItems.entrySet()) {
+                // Only remove items written before this sync phase began and 
re-confirmed absent
+                // from the source: a fresh destination write (e.g. from a 
broker already switched
+                // to the destination implementation) is propagated to the 
source by the tail
+                // listener instead of being deleted here.
+                if (e.getValue().timestamp() < started && src.get(e.getKey()) 
== null) {
+                    log.info("Reconciling stale item in the destination 
tableview; serviceUnit={}",
+                            e.getKey());
+                    futures.add(dst.delete(e.getKey()));
+                    maybeWaitCompletion(futures, false);
+                }
+            }
+            maybeWaitCompletion(futures, true);
+        } catch (IOException | ExecutionException | TimeoutException e) {
+            // Transient write failures leave a size divergence behind; the 
next reconcile pass
+            // (or the sync-wait timeout) handles it.
+            log.warn("Failed to reconcile tableview items", e);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (!isActive) {
@@ -282,4 +391,9 @@ public class ServiceUnitStateTableViewSyncer implements Closeable {
     public boolean isActive() {
         return isActive;
     }
+
+    @VisibleForTesting
+    public void setSyncWaitTimeInSecs(int syncWaitTimeInSecs) {
+        this.syncWaitTimeInSecs = syncWaitTimeInSecs;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 55e9b8d6baf..2b30723f0a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -196,7 +197,20 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
 
     @BeforeMethod(alwaysRun = true)
     protected void initializeState() throws PulsarAdminException, 
IllegalAccessException {
-        admin.namespaces().unload(defaultTestNamespace);
+        // After a prior test churned leader election, the channel-topic 
bundle can be left
+        // unserved ("not served by this instance"), making the unload's 
channel publish fail
+        // (HTTP 500) or hang server-side until the background monitor task 
(120s interval)
+        // reconciles the brokers' roles with the channel ownership. Drive 
monitor() eagerly to
+        // heal that state, bound each unload attempt (a synchronous unload() 
can block longer
+        // than the whole retry window), and fail loudly on exhaustion.
+        Awaitility.await().atMost(120, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    primaryLoadManager.monitor();
+                    secondaryLoadManager.monitor();
+                    
admin.namespaces().unloadAsync(defaultTestNamespace).get(15, TimeUnit.SECONDS);
+                });
         reset(primaryLoadManager, secondaryLoadManager);
         FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, 
true);
         pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 65d017499fd..05e9bfba6ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1331,7 +1331,11 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
         }
     }
 
-    @Test(priority = 200)
+    // Cap below the 300s suite default (AnnotationListener) so a hung 
ServiceUnitStateTableViewSyncer
+    // start() fails fast and is retried instead of consuming the full 300s 
slot and corrupting the
+    // next @BeforeMethod. Combined with the shortened sync-wait budget set 
below, a real divergence
+    // surfaces within the shortened budget instead of a 5-minute 
ThreadTimeoutException.
+    @Test(priority = 200, timeOut = 240 * 1000)
     public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
         // Make pulsar1 the leader so primaryLoadManager is the syncer-running 
broker.
         makePrimaryAsLeader();
@@ -1361,120 +1365,142 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
         String syncerType =
                 
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
                         ? "SystemTopicToMetadataStoreSyncer" : 
"MetadataStoreToSystemTopicSyncer";
+        // Shrink the sync-wait budget on both brokers' live syncers BEFORE 
the first start()
+        // (driven by monitor() below) so any tableview-size divergence fails 
in ~30s with the
+        // syncer's own TimeoutException instead of spinning for the full 300s 
default — the
+        // exact hang observed in CI happened inside that first start().
+        
primaryLoadManager.getServiceUnitStateTableViewSyncer().setSyncWaitTimeInSecs(30);
+        
secondaryLoadManager.getServiceUnitStateTableViewSyncer().setSyncWaitTimeInSecs(30);
+
         pulsar.getAdminClient().brokers()
                 
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", 
syncerType);
         Awaitility.await().untilAsserted(() ->
                 
assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        primaryLoadManager.monitor();
-        Awaitility.await().atMost(30, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
-                        .isActive()));
-        
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
-
-        // === Phase 2: add a 3rd broker using the OTHER table view impl ===
-        // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 
deliberately
-        // uses the other one so the test exercises cross-impl lookups 
regardless of
-        // which parametrization we're running.
-        String otherClassName =
-                
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
-                        ? 
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
-                        : ServiceUnitStateTableViewImpl.class.getName();
-
-        ServiceConfiguration crossImplConf = getDefaultConf();
-        crossImplConf.setAllowAutoTopicCreation(true);
-        crossImplConf.setForceDeleteNamespaceAllowed(true);
-        
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
-        
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
-
-        try (var crossImplCtx = 
createAdditionalPulsarTestContext(crossImplConf)) {
-            var pulsar3 = crossImplCtx.getPulsarService();
-
-            // All three brokers (across both impls) must agree on topic 
ownership.
-            
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
-            
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
-            Optional<URL> webUrlPulsar3 = 
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
-            assertTrue(webUrlPulsar3.isPresent());
-            assertEquals(webUrlPulsar3.get().toString(), 
webUrlBefore.get().toString());
-
-            // SLA monitor and heartbeat lookups must agree across impls in 
every direction.
-            List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3);
-            for (PulsarService viewer : brokers) {
-                for (PulsarService owner : brokers) {
-                    assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), 
owner.getBrokerServiceUrl());
-                    assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), 
owner.getBrokerServiceUrl());
+        // Drive monitor() inside the await so a transient start() failure 
(swallowed by
+        // monitor()'s catch) is retried instead of waiting for the 120s 
background monitor task.
+        Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
+            primaryLoadManager.monitor();
+            
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+        });
+
+        try {
+            
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+
+            // === Phase 2: add a 3rd broker using the OTHER table view impl 
===
+            // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 
deliberately
+            // uses the other one so the test exercises cross-impl lookups 
regardless of
+            // which parametrization we're running.
+            String otherClassName =
+                    
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+                            ? 
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
+                            : ServiceUnitStateTableViewImpl.class.getName();
+
+            ServiceConfiguration crossImplConf = getDefaultConf();
+            crossImplConf.setAllowAutoTopicCreation(true);
+            crossImplConf.setForceDeleteNamespaceAllowed(true);
+            
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
+            
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+            
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
+
+            try (var crossImplCtx = 
createAdditionalPulsarTestContext(crossImplConf)) {
+                var pulsar3 = crossImplCtx.getPulsarService();
+
+                // All three brokers (across both impls) must agree on topic 
ownership.
+                
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
+                
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
+                Optional<URL> webUrlPulsar3 = 
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+                assertTrue(webUrlPulsar3.isPresent());
+                assertEquals(webUrlPulsar3.get().toString(), 
webUrlBefore.get().toString());
+
+                // SLA monitor and heartbeat lookups must agree across impls 
in every direction.
+                List<PulsarService> brokers = List.of(pulsar1, pulsar2, 
pulsar3);
+                for (PulsarService viewer : brokers) {
+                    for (PulsarService owner : brokers) {
+                        assertLookupHeartbeatOwner(viewer, 
owner.getBrokerId(), owner.getBrokerServiceUrl());
+                        assertLookupSLANamespaceOwner(viewer, 
owner.getBrokerId(), owner.getBrokerServiceUrl());
+                    }
                 }
-            }
 
-            // === Phase 3: simulate the cross-impl broker going offline ===
-            // Its SLA namespace must reassign to a remaining broker, and the 
ownership
-            // change must propagate through the syncer to brokers using the 
other impl.
-            var wrapper3 = (ExtensibleLoadManagerWrapper) 
pulsar3.getLoadManager().get();
-            var loadManager3 = (ExtensibleLoadManagerImpl)
-                    FieldUtils.readField(wrapper3, "loadManager", true);
-            ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
-                    FieldUtils.readField(loadManager3, 
"serviceUnitStateChannel", true);
-            channel3.cleanOwnerships();
-            // Set state to Closed BEFORE deleting the ZK node to prevent the 
notification
-            // handler's session-expiry recovery from auto-re-registering 
broker3. In
-            // production the PulsarService shuts down after unregister(), so 
the handler
-            // never fires; in tests the service stays running and creates a 
race.
-            var registry3 = (BrokerRegistryImpl) 
loadManager3.getBrokerRegistry();
-            registry3.state.set(BrokerRegistryImpl.State.Closed);
-            pulsar3.getLocalMetadataStore()
-                    .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), 
Optional.empty()).get();
-
-            String slaMonitorTopic = 
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
-                    .getPersistentTopicName("test");
-            String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
-            Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
-                String reassigned = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                assertNotNull(reassigned);
-                assertNotEquals(reassigned, pulsar3BrokerUrl);
-            });
+                // === Phase 3: simulate the cross-impl broker going offline 
===
+                // Its SLA namespace must reassign to a remaining broker, and 
the ownership
+                // change must propagate through the syncer to brokers using 
the other impl.
+                var wrapper3 = (ExtensibleLoadManagerWrapper) 
pulsar3.getLoadManager().get();
+                var loadManager3 = (ExtensibleLoadManagerImpl)
+                        FieldUtils.readField(wrapper3, "loadManager", true);
+                ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
+                        FieldUtils.readField(loadManager3, 
"serviceUnitStateChannel", true);
+                channel3.cleanOwnerships();
+                // Set state to Closed BEFORE deleting the ZK node to prevent 
the notification
+                // handler's session-expiry recovery from auto-re-registering 
broker3. In
+                // production the PulsarService shuts down after unregister(), 
so the handler
+                // never fires; in tests the service stays running and creates 
a race.
+                var registry3 = (BrokerRegistryImpl) 
loadManager3.getBrokerRegistry();
+                registry3.state.set(BrokerRegistryImpl.State.Closed);
+                pulsar3.getLocalMetadataStore()
+                        .delete("/loadbalance/brokers/" + 
pulsar3.getBrokerId(), Optional.empty()).get();
+
+                String slaMonitorTopic = 
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
+                        .getPersistentTopicName("test");
+                String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
+                Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String reassigned = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(reassigned);
+                    assertNotEquals(reassigned, pulsar3BrokerUrl);
+                });
 
-            // Send a message while the topic is owned by the reassigned 
broker; this must
-            // remain durable when ownership migrates back below.
-            @Cleanup
-            Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING)
-                    .topic(slaMonitorTopic).create();
-            producer.send("offline");
-
-            // === Phase 4: re-register the broker and verify ownership 
returns ===
-            registry3.state.set(BrokerRegistryImpl.State.Started);
-            registry3.registerAsync().get();
-            Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
-                    
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
-                            pulsar3.getBrokerServiceUrl()));
-
-            // Same producer reconnects to the new owner; a fresh producer 
also works.
-            producer.send("after-reconnect");
-            @Cleanup
-            Producer<String> producer2 = 
pulsar.getClient().newProducer(Schema.STRING)
-                    .topic(slaMonitorTopic).create();
-            producer2.send("from-new-producer");
+                // Send a message while the topic is owned by the reassigned 
broker; this must
+                // remain durable when ownership migrates back below.
+                @Cleanup
+                Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING)
+                        .topic(slaMonitorTopic).create();
+                producer.send("offline");
 
-            @Cleanup
-            Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
-                    .topic(slaMonitorTopic)
-                    
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                    .subscriptionName("test")
-                    .subscribe();
-            assertEquals(consumer.receive().getValue(), "offline");
-            assertEquals(consumer.receive().getValue(), "after-reconnect");
-            assertEquals(consumer.receive().getValue(), "from-new-producer");
-        }
+                // === Phase 4: re-register the broker and verify ownership 
returns ===
+                registry3.state.set(BrokerRegistryImpl.State.Started);
+                registry3.registerAsync().get();
+                Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
+                        
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
+                                pulsar3.getBrokerServiceUrl()));
 
-        // === Phase 5: disable the syncer and verify it deactivates ===
-        pulsar.getAdminClient().brokers()
-                
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
-        Awaitility.await().untilAsserted(() ->
-                
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        primaryLoadManager.monitor();
-        Awaitility.await().atMost(30, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
-                        .isActive()));
-        
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+                // Same producer reconnects to the new owner; a fresh producer 
also works.
+                producer.send("after-reconnect");
+                @Cleanup
+                Producer<String> producer2 = 
pulsar.getClient().newProducer(Schema.STRING)
+                        .topic(slaMonitorTopic).create();
+                producer2.send("from-new-producer");
+
+                @Cleanup
+                Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
+                        .topic(slaMonitorTopic)
+                        
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                        .subscriptionName("test")
+                        .subscribe();
+                assertEquals(consumer.receive().getValue(), "offline");
+                assertEquals(consumer.receive().getValue(), "after-reconnect");
+                assertEquals(consumer.receive().getValue(), 
"from-new-producer");
+            }
+        } finally {
+            // === Phase 5: disable the syncer and verify it deactivates ===
+            // Guarantee the dynamic config is removed and the syncer is 
driven inactive even if
+            // the body threw, so the syncer cannot stay enabled and poison 
later tests. Note this
+            // cannot tear down a start() that failed before isActive=true 
(close() short-circuits
+            // on !isActive); leftover tail views from such a partial start 
are recovered by the
+            // next successful start(), and the next test's initializeState() 
retry absorbs any
+            // residual channel disruption.
+            try {
+                pulsar.getAdminClient().brokers()
+                        
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
+            } catch (Exception e) {
+                log.warn("Failed to delete syncer dynamic config in cleanup", 
e);
+            }
+            Awaitility.await().atMost(60, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled());
+                primaryLoadManager.monitor();
+                secondaryLoadManager.monitor();
+                
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+                
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+            });
+        }
     }
 
     private void assertLookupHeartbeatOwner(PulsarService pulsar,
@@ -1541,7 +1567,52 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         });
     }
 
-    @Test(timeOut = 30 * 1000, priority = 2100)
+    // After a test churns leader election, the channel-topic bundle can be 
transiently
+    // unowned and the channel producer can be in reconnect backoff. The next 
@BeforeMethod
+    // (initializeState -> namespaces().unload(...)) publishes a state change 
on the channel
+    // topic; if it runs in that window the producer send times out (HTTP 
500). Give the
+    // re-election a best-effort chance to settle before yielding to the next 
test.
+    //
+    // This is a best-effort smoothing wait, not an assertion: the budget is 
deliberately a
+    // fraction (20s) of the callers' 60s method timeout so it cannot consume 
the whole slot
+    // and trip TestNG's ThreadTimeoutException mid-poll, and any failure to 
settle
+    // is swallowed-and-logged rather than thrown. That matters because 
callers invoke this from
+    // a finally block — a settling delay here must never replace (mask) the 
body's exception.
+    // The next test's initializeState() carries a 60s ignoreExceptions retry 
as the real backstop.
+    private void awaitChannelOwnerStable() {
+        try {
+            Awaitility.await().atMost(20, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                // monitor() reconciles each broker's role with the channel 
ownership and
+                // re-serves the channel-topic bundle if the leadership churn 
left it unserved
+                // ("not served by this instance") — the same self-healing the 
120s background
+                // monitor task provides, driven eagerly so the next test does 
not start inside
+                // the broken window.
+                primaryLoadManager.monitor();
+                secondaryLoadManager.monitor();
+                Optional<String> owner1 = 
channel1.getChannelOwnerAsync().get(5, TimeUnit.SECONDS);
+                Optional<String> owner2 = 
channel2.getChannelOwnerAsync().get(5, TimeUnit.SECONDS);
+                assertTrue(owner1.isPresent());
+                assertEquals(owner1, owner2);
+                assertTrue(channel1.isChannelOwner() ^ 
channel2.isChannelOwner());
+                // Probe that the channel topic is actually served: the lookup 
re-assigns the
+                // pulsar/system bundle if it is unowned, and getStats proves 
the owner loads
+                // the topic (the lookup layer alone can claim an owner that 
refuses to serve).
+                String channelTopic = ServiceUnitStateTableViewImpl.TOPIC;
+                
assertNotNull(pulsar.getAdminClient().lookups().lookupTopic(channelTopic));
+                if (serviceUnitStateTableViewClassName.equals(
+                        ServiceUnitStateTableViewImpl.class.getName())) {
+                    
assertNotNull(pulsar.getAdminClient().topics().getStats(channelTopic));
+                }
+            });
+        } catch (Throwable t) {
+            log.warn("Channel owner did not stabilize within the best-effort 
window; "
+                    + "relying on the next test's initializeState() retry", t);
+        }
+    }
+
+    // 60s: the body's repeated role transitions plus the trailing 
awaitChannelOwnerStable() can
+    // exceed 30s under load.
+    @Test(timeOut = 60 * 1000, priority = 2100)
     public void testRoleChangeIdempotency() throws Exception {
 
         makePrimaryAsLeader();
@@ -1621,7 +1692,8 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
                     secondaryLoadManager.getRole());
 
-
+            // Confirm a stable channel owner before yielding to the next 
test's @BeforeMethod.
+            awaitChannelOwnerStable();
     }
 
     @DataProvider(name = "noChannelOwnerMonitorHandler")
@@ -1629,7 +1701,9 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         return new Object[][] { { true }, { false } };
     }
 
-    @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, 
priority = 2101)
+    // 60s: the body's leader-election churn plus the trailing 
awaitChannelOwnerStable() can
+    // exceed 30s under load.
+    @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 60 * 1000, 
priority = 2101)
     public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler) 
throws Exception {
 
         makePrimaryAsLeader();
@@ -1696,10 +1770,25 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             // clean up for monitor test
             pulsar1.getLeaderElectionService().start();
             pulsar2.getLeaderElectionService().start();
+            // If the body failed mid-churn, both restarted elections can keep 
flapping the
+            // leadership between the brokers, leaving the channel-topic 
bundle unserved
+            // beyond what the next test's initializeState() retry can absorb. 
Force a
+            // deterministic single leader before stabilizing (best-effort: 
must not mask
+            // the body's exception).
+            try {
+                makePrimaryAsLeader();
+            } catch (Throwable t) {
+                log.warn("Failed to re-establish primary as leader in 
cleanup", t);
+            }
+            // Re-establish a stable channel owner before yielding to the next 
test's
+            // @BeforeMethod, which publishes to the channel topic via 
namespace unload.
+            awaitChannelOwnerStable();
         }
     }
 
-    @Test(timeOut = 30 * 1000, priority = 2000)
+    // 60s: the body's role transitions plus the trailing 
awaitChannelOwnerStable() can exceed
+    // 30s under load (observed locally at 30.017s).
+    @Test(timeOut = 60 * 1000, priority = 2000)
     public void testRoleChange() throws Exception {
         makePrimaryAsLeader();
 
@@ -1719,6 +1808,11 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                 new NamespaceBundleStats()));
 
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            // The internal topics live in the pulsar/system bundle; if the 
leadership churn
+            // left it unserved, only a monitor() role reconciliation 
re-serves it (the
+            // background monitor task would take up to 120s) — drive it while 
waiting.
+            leader.monitor();
+            follower.monitor();
 
             
assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(),
                     "tableView", true));
@@ -1756,6 +1850,9 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 
1;
 
         Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+            // Same monitor()-driven healing as above for the post-transfer 
assertions.
+            leader2.monitor();
+            follower2.monitor();
             
assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(),
 "tableView", true));
             
assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), 
"tableView", true));
 
@@ -1780,6 +1877,9 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         follower2.getBrokerLoadDataStore().pushAsync(key, 
brokerLoadExpected).get(3, TimeUnit.SECONDS);
         follower2.getTopBundlesLoadDataStore().pushAsync(bundle, 
topBundlesExpected)
                 .get(3, TimeUnit.SECONDS);
+
+        // Confirm a stable channel owner before yielding to the next test's 
@BeforeMethod.
+        awaitChannelOwnerStable();
     }
 
     @Test(priority = Integer.MIN_VALUE)


Reply via email to