This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 65b4321cc6a [fix][test] Fix flaky
ExtensibleLoadManagerImplTest.initializeState by recovering wedged channel
ownership (#25977)
65b4321cc6a is described below
commit 65b4321cc6ac3cb31e2827e61605ec696c40ecb7
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 8 23:41:13 2026 +0300
[fix][test] Fix flaky ExtensibleLoadManagerImplTest.initializeState by
recovering wedged channel ownership (#25977)
---
.../ExtensibleLoadManagerImplBaseTest.java | 77 +++++++++++++++++-----
.../extensions/ExtensibleLoadManagerImplTest.java | 3 +-
2 files changed, 64 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 8e4fc7bf4dd..6c7204db278 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.extensions;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.io.Resources;
import java.util.ArrayList;
@@ -38,7 +39,6 @@ import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateM
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -197,18 +198,39 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
}
@BeforeMethod(alwaysRun = true)
- protected void initializeState() throws PulsarAdminException, IllegalAccessException {
- // After a prior test churned leader election, the channel-topic bundle can be left
- // unserved ("not served by this instance"), making the unload's channel publish fail
- // (HTTP 500) or hang server-side. monitor() only self-heals when there is *no* channel
- // owner; it does NOT heal the case where an owner is recorded but the bundle is not
- // actually served, so the unload below can never publish. Force-serve the channel topic
- // each attempt: an admin lookup re-assigns the pulsar/system bundle and getStats makes
- // the owner load the topic (the lookup layer alone can claim an owner that refuses to
- // serve). Bound each unload attempt and fail loudly on exhaustion.
+ protected void initializeState() throws Exception {
+ // Reset to a clean state before each test: reconcile each broker's role with the channel
+ // ownership and unload the test namespace so no bundle ownership carries over. The unload
+ // publishes a state change on the channel system topic.
+ //
+ // A prior role-churning test (e.g. the direct playLeader()/playFollower() calls in
+ // testRoleChangeIdempotency) can leave the channel system topic owned by a broker that no
+ // longer serves it ("not served by this instance, redo the lookup"), with the channel
+ // producer stuck in escalating reconnect backoff, so the unload's channel publish keeps
+ // failing. Each unload attempt force-serves the channel topic (an admin lookup re-assigns
+ // the pulsar/system bundle and getStats makes the owner load it); if that still does not
+ // recover, force a clean channel owner via leader re-election (which reassigns and
+ // re-serves the channel topic and makes clients redo their lookups) and retry.
+ try {
+ awaitTestNamespaceUnloaded(30);
+ } catch (ConditionTimeoutException channelWedged) {
+ recoverChannelOwnership();
+ awaitTestNamespaceUnloaded(60);
+ }
+ reset(primaryLoadManager, secondaryLoadManager);
+ FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
+ pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+ pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+ }
+
+ // Drive monitor() to reconcile roles and force-serve the channel topic
(monitor() only
+ // self-heals when there is *no* channel owner, not when an owner is
recorded but the bundle is
+ // not served), then unload. ignoreExceptions() retries transient
channel-publish failures; each
+ // unload attempt is bounded so a synchronous unload cannot block longer
than the retry window.
+ private void awaitTestNamespaceUnloaded(long atMostSeconds) {
boolean systemTopicChannel =
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName());
- Awaitility.await().atMost(120, TimeUnit.SECONDS)
+ Awaitility.await().atMost(atMostSeconds, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.ignoreExceptions()
.untilAsserted(() -> {
@@ -220,10 +242,35 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
}
admin.namespaces().unloadAsync(defaultTestNamespace).get(15, TimeUnit.SECONDS);
});
- reset(primaryLoadManager, secondaryLoadManager);
- FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService,
true);
- pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
- pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+ }
+
+ /**
+ * Force a clean channel owner via leader re-election. After heavy direct
+ * playLeader()/playFollower() churn the channel system topic can be left owned by a broker
+ * that no longer serves it, leaving the channel producer stuck on a stale lookup in
+ * escalating reconnect backoff. Closing the current owner's LeaderElectionService moves
+ * ownership to the other broker; its playLeader() re-creates and re-serves the channel
+ * topic, and the ownership change makes clients redo their (stale) lookups.
+ */
+ private void recoverChannelOwnership() throws Exception {
+ boolean pulsar1Owns;
+ try {
+ pulsar1Owns = channel1.isChannelOwner();
+ } catch (Exception e) {
+ // Owner can't be determined (e.g. no channel owner now); default to moving to pulsar2.
+ pulsar1Owns = true;
+ }
+ PulsarService currentOwner = pulsar1Owns ? pulsar1 : pulsar2;
+ ServiceUnitStateChannelImpl newOwnerChannel = pulsar1Owns ? channel2 : channel1;
+ currentOwner.getLeaderElectionService().close();
+ try {
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions()
+ .untilAsserted(() -> assertTrue(newOwnerChannel.isChannelOwner()));
+ } catch (ConditionTimeoutException ignore) {
+ // Best effort: the subsequent unload retry is the real backstop.
+ } finally {
+ currentOwner.getLeaderElectionService().start();
+ }
}
protected void setPrimaryLoadManager() throws IllegalAccessException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 58a9b56e79b..9e6440f8585 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1589,7 +1589,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
// and trip TestNG's ThreadTimeoutException mid-poll, and any failure to
settle
// is swallowed-and-logged rather than thrown. That matters because
callers invoke this from
// a finally block — a settling delay here must never replace (mask) the
body's exception.
- // The next test's initializeState() carries a 60s ignoreExceptions retry
as the real backstop.
+ // The next test's initializeState() is the real backstop: it retries the
unload and, if the
+ // channel stays wedged, forces a clean channel owner via leader
re-election before retrying.
private void awaitChannelOwnerStable() {
try {
Awaitility.await().atMost(20,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {