This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 882946c59d3 [fix][client] Prevent duplicate ServiceUrlProvider 
initialization (#25899)
882946c59d3 is described below

commit 882946c59d37b2a4d7e474bfa3a7d8da4d7fa59d
Author: Oneby Wang <[email protected]>
AuthorDate: Thu Jun 4 01:44:38 2026 +0800

    [fix][client] Prevent duplicate ServiceUrlProvider initialization (#25899)
---
 ...SameAuthParamsLookupAutoClusterFailoverTest.java | 14 ++++++++++++++
 .../pulsar/client/api/ServiceUrlProvider.java       | 11 ++++++++++-
 .../pulsar/client/impl/AutoClusterFailover.java     | 15 +++++++++++++--
 .../client/impl/ControlledClusterFailover.java      | 15 +++++++++++++--
 .../SameAuthParamsLookupAutoClusterFailover.java    | 16 +++++++++++++---
 .../pulsar/client/impl/AutoClusterFailoverTest.java | 21 +++++++++++++++++++++
 .../client/impl/ControlledClusterFailoverTest.java  | 17 +++++++++++++++++
 7 files changed, 101 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
index 128be102f58..60324baf076 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -140,6 +140,20 @@ public class SameAuthParamsLookupAutoClusterFailoverTest 
extends OneWayReplicato
         dummyServer.close();
     }
 
+    @Test
+    public void testInitializeCanOnlyBeCalledOnce() throws Exception {
+        setup();
+        final SameAuthParamsLookupAutoClusterFailover failover = 
SameAuthParamsLookupAutoClusterFailover.builder()
+                .pulsarServiceUrlArray(new 
String[]{pulsar1.getBrokerServiceUrl()})
+                .checkHealthyIntervalMs(1000)
+                .build();
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrlProvider(failover).build()) {
+            Throwable error = Assert.expectThrows(IllegalStateException.class, 
() -> failover.initialize(client));
+            Assert.assertEquals(error.getMessage(), "ServiceUrlProvider has 
already been initialized");
+        }
+    }
+
     /**
      * Wait for the state machine to converge to the expected per-index states 
and current index.
      * The state read happens on the failover executor to avoid races with the 
periodic check task,
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index e8b513b103f..f95f650cbb7 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -27,6 +27,11 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
  * <p>This allows applications to retrieve the service URL from an external 
configuration provider and,
  * more importantly, to force the Pulsar client to reconnect if the service 
URL has been changed.
  *
+ * <p>Each provider instance is tied to the lifecycle of one {@link 
PulsarClient} instance. The client
+ * initializes the provider when the client is created and closes the provider 
when the owning client is
+ * closed. Applications that create multiple Pulsar clients should create a 
separate provider instance
+ * for each client instead of sharing one provider.
+ *
  * <p>It can be passed with {@link 
ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
  */
 @InterfaceAudience.Public
@@ -39,6 +44,9 @@ public interface ServiceUrlProvider extends AutoCloseable {
      * <p>This can be used by the provider to force the Pulsar client to 
reconnect whenever the service url might have
      * changed. See {@link PulsarClient#updateServiceUrl(String)}.
      *
+     * <p>This method is invoked by the Pulsar client and is expected to be 
called once for a provider
+     * instance. Implementations may reject repeated initialization.
+     *
      * @param client
      *            created pulsar client.
      */
@@ -52,7 +60,8 @@ public interface ServiceUrlProvider extends AutoCloseable {
     String getServiceUrl();
 
     /**
-     * Close the resource that the provider allocated.
+     * Close the resource that the provider allocated. The owning Pulsar 
client invokes this method when
+     * it is closed.
      *
      */
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 26d10dde3ad..b3d0e009821 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -40,6 +40,14 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 
+/**
+ * A service URL provider that automatically fails over from the primary 
Pulsar service URL to one of
+ * the secondary service URLs and switches back after the primary service 
recovers.
+ *
+ * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once 
initialized by a
+ * Pulsar client, it must not be reused by another client. Create a new 
provider instance for each
+ * Pulsar client.
+ */
 @CustomLog
 @Data
 public class AutoClusterFailover implements ServiceUrlProvider {
@@ -86,7 +94,10 @@ public class AutoClusterFailover implements 
ServiceUrlProvider {
     }
 
     @Override
-    public void initialize(PulsarClient client) {
+    public synchronized void initialize(PulsarClient client) {
+        if (this.pulsarClient != null) {
+            throw new IllegalStateException("ServiceUrlProvider has already 
been initialized");
+        }
         this.pulsarClient = (PulsarClientImpl) client;
         this.addressResolver = pulsarClient.getAddressResolver();
         ClientConfigurationData config = pulsarClient.getConfiguration();
@@ -123,7 +134,7 @@ public class AutoClusterFailover implements 
ServiceUrlProvider {
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         this.executor.shutdown();
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 1252819a561..e4ad4ed1d1f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -57,6 +57,14 @@ import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.jspecify.annotations.Nullable;
 
+/**
+ * A service URL provider that fetches controlled failover configuration from 
an external HTTP service
+ * and updates the Pulsar client when the returned configuration changes.
+ *
+ * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once 
initialized by a
+ * Pulsar client, it must not be reused by another client. Create a new 
provider instance for each
+ * Pulsar client.
+ */
 @CustomLog
 public class ControlledClusterFailover implements ServiceUrlProvider {
     private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -111,7 +119,10 @@ public class ControlledClusterFailover implements 
ServiceUrlProvider {
     }
 
     @Override
-    public void initialize(PulsarClient client) {
+    public synchronized void initialize(PulsarClient client) {
+        if (this.pulsarClient != null) {
+            throw new IllegalStateException("ServiceUrlProvider has already 
been initialized");
+        }
         this.pulsarClient = (PulsarClientImpl) client;
         this.httpClient = buildHttpClient();
         this.requestBuilder = httpClient.prepareGet(urlProvider)
@@ -223,7 +234,7 @@ public class ControlledClusterFailover implements 
ServiceUrlProvider {
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         this.executor.shutdown();
         if (httpClient != null) {
             try {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
index 8abc0984c91..8d1bd777a7f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
@@ -37,6 +37,14 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 
+/**
+ * A service URL provider that probes multiple Pulsar service URLs with the 
same authentication
+ * parameters and fails over according to service health.
+ *
+ * <p>Each instance is tied to the lifecycle of one {@link PulsarClient}. Once 
initialized by a
+ * Pulsar client, it must not be reused by another client. Create a new 
provider instance for each
+ * Pulsar client.
+ */
 @CustomLog
 @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"})
 public class SameAuthParamsLookupAutoClusterFailover implements 
ServiceUrlProvider {
@@ -65,7 +73,10 @@ public class SameAuthParamsLookupAutoClusterFailover 
implements ServiceUrlProvid
     private SameAuthParamsLookupAutoClusterFailover() {}
 
     @Override
-    public void initialize(PulsarClient client) {
+    public synchronized void initialize(PulsarClient client) {
+        if (this.pulsarClient != null) {
+            throw new IllegalStateException("ServiceUrlProvider has already 
been initialized");
+        }
         this.currentPulsarServiceIndex = 0;
         this.pulsarClient = (PulsarClientImpl) client;
         this.executor = EventLoopUtil.newEventLoopGroup(1, false,
@@ -110,7 +121,7 @@ public class SameAuthParamsLookupAutoClusterFailover 
implements ServiceUrlProvid
 
     @SuppressWarnings("deprecation")
     @Override
-    public void close() throws Exception {
+    public synchronized void close() throws Exception {
         if (closed) {
             return;
         }
@@ -377,4 +388,3 @@ public class SameAuthParamsLookupAutoClusterFailover 
implements ServiceUrlProvid
         }
     }
 }
-
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 4596ac521b5..204225c03c8 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -31,10 +31,12 @@ import lombok.Cleanup;
 import lombok.CustomLog;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
@@ -148,6 +150,25 @@ public class AutoClusterFailoverTest {
         }
     }
 
+    @Test
+    public void testInitializeCanOnlyBeCalledOnce() throws Exception {
+        String primary = "pulsar://localhost:6650";
+        String secondary = "pulsar://localhost:6651";
+
+        ServiceUrlProvider provider = AutoClusterFailover.builder()
+                .primary(primary)
+                .secondary(Collections.singletonList(secondary))
+                .failoverDelay(1, TimeUnit.SECONDS)
+                .switchBackDelay(1, TimeUnit.SECONDS)
+                .checkInterval(30, TimeUnit.SECONDS)
+                .build();
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrlProvider(provider).build()) {
+            Throwable error = Assert.expectThrows(IllegalStateException.class, 
() -> provider.initialize(client));
+            assertEquals(error.getMessage(), "ServiceUrlProvider has already 
been initialized");
+        }
+    }
+
     @Test
     public void testAutoClusterFailoverSwitchWithoutAuthentication() throws 
Exception {
         String primary = "pulsar://localhost:6650";
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 86b2fa7cb4f..cc47841ce0a 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.asynchttpclient.Request;
@@ -73,6 +74,22 @@ public class ControlledClusterFailoverTest {
         Assert.assertEquals(request.getHeaders().get(keyB), valueB);
     }
 
+    @Test
+    public void testInitializeCanOnlyBeCalledOnce() throws Exception {
+        String defaultServiceUrl = "pulsar://localhost:6650";
+        String urlProvider = "http://localhost:8080/test";;
+
+        ServiceUrlProvider provider = ControlledClusterFailover.builder()
+            .defaultServiceUrl(defaultServiceUrl)
+            .urlProvider(urlProvider)
+            .build();
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrlProvider(provider).build()) {
+            Throwable error = Assert.expectThrows(IllegalStateException.class, 
() -> provider.initialize(client));
+            Assert.assertEquals(error.getMessage(), "ServiceUrlProvider has 
already been initialized");
+        }
+    }
+
     @Test
     public void testControlledClusterFailoverSwitch() throws Exception {
         String defaultServiceUrl = "pulsar+ssl://localhost:6651";

Reply via email to