This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ff90898b88eea16fe1b48207398a05efded355c8
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 17 13:39:15 2026 +0300

    [fix][sec] Upgrade to async-http-client 2.14.5 to address CVE-2026-40490 
(#25546)
    
    (cherry picked from commit a1613bc2e5fd26cc16fc95b4a3c61bc5e1ae090d)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   6 +-
 distribution/shell/src/assemble/LICENSE.bin.txt    |   6 +-
 pom.xml                                            |   2 +-
 .../internal/http/AsyncHttpConnectorTest.java      |  56 ++++++++
 pulsar-client/pom.xml                              |   6 +
 .../org/apache/pulsar/client/impl/HttpClient.java  |  89 +++++++++---
 .../apache/pulsar/client/impl/HttpClientTest.java  | 150 +++++++++++++++++++++
 .../pulsar/client/impl/HttpLookupServiceTest.java  | 142 +++++++++++++++++++
 8 files changed, 435 insertions(+), 22 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 8cd3b8c3b6c..d73b2bf7092 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -392,8 +392,8 @@ The Apache Software License, Version 2.0
  * AirCompressor
     - io.airlift-aircompressor-2.0.3.jar
  * AsyncHttpClient
-    - org.asynchttpclient-async-http-client-2.12.4.jar
-    - org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
+    - org.asynchttpclient-async-http-client-2.14.5.jar
+    - org.asynchttpclient-async-http-client-netty-utils-2.14.5.jar
  * Jetty
     - org.eclipse.jetty-jetty-client-9.4.58.v20250814.jar
     - org.eclipse.jetty-jetty-continuation-9.4.58.v20250814.jar
@@ -611,7 +611,7 @@ Eclipse Public License - v2.0 -- 
../licenses/LICENSE-EPL-2.0.txt
  * Jakarta Injection -- org.glassfish.hk2.external-jakarta.inject-2.6.1.jar
 
 Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
- * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.3.jar
+ * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.4.jar
 
 Creative Commons Attribution License
  * Jcip -- ../licenses/LICENSE-jcip.txt
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index c4bfdf8a3de..e306bdced43 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -398,8 +398,8 @@ The Apache Software License, Version 2.0
   * AirCompressor
      - aircompressor-2.0.3.jar
  * AsyncHttpClient
-    - async-http-client-2.12.4.jar
-    - async-http-client-netty-utils-2.12.4.jar
+    - async-http-client-2.14.5.jar
+    - async-http-client-netty-utils-2.14.5.jar
  * Jetty
     - jetty-client-9.4.58.v20250814.jar
     - jetty-http-9.4.58.v20250814.jar
@@ -459,7 +459,7 @@ Eclipse Public License - v2.0 -- 
../licenses/LICENSE-EPL-2.0.txt
  * Jakarta Injection -- jakarta.inject-2.6.1.jar
 
 Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
- * Reactive Streams -- reactive-streams-1.0.3.jar
+ * Reactive Streams -- reactive-streams-1.0.4.jar
 
 Creative Commons Attribution License
  * Jcip -- ../licenses/LICENSE-jcip.txt
diff --git a/pom.xml b/pom.xml
index 3a14a780d38..8940335744a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -261,7 +261,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <prometheus-jmx.version>0.16.1</prometheus-jmx.version>
     <confluent.version>7.9.2</confluent.version>
     <aircompressor.version>2.0.3</aircompressor.version>
-    <asynchttpclient.version>2.12.4</asynchttpclient.version>
+    <asynchttpclient.version>2.14.5</asynchttpclient.version>
     <commons-lang3.version>3.19.0</commons-lang3.version>
     <commons-io.version>2.21.0</commons-io.version>
     <commons-codec.version>1.20.0</commons-codec.version>
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
index e0b4b13a03d..38243976f56 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.admin.internal.http;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
 import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
 import static org.testng.Assert.assertEquals;
@@ -282,6 +284,60 @@ public class AsyncHttpConnectorTest {
         assertEquals(response.getResponseBody(), "OK");
     }
 
+    /**
+     * Locks in that AsyncHttpConnector forwards the {@code Authorization} 
header across a cross-origin
+     * HTTP redirect (different host:port — serverA → serverB). The admin 
connector disables AHC's
+     * built-in follow-redirect and runs its own redirect loop that copies the 
original request headers,
+     * so it must remain unaffected by the async-http-client &gt;= 2.14.5 
{@code Authorization} stripping
+     * on cross-origin redirects (CVE-2026-40490 fix). Regressing {@code 
setFollowRedirect(true)} would
+     * break this test.
+     */
+    @Test
+    void testAuthorizationHeaderOnCrossOriginRedirect() throws 
ExecutionException, InterruptedException {
+        WireMockServer serverB = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverB.start();
+        try {
+            server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                    .willReturn(aResponse()
+                            .withStatus(307)
+                            .withHeader("Location",
+                                    "http://127.0.0.1:" + serverB.port() + 
"/admin/v2/clusters")));
+
+            serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                    .atPriority(2)
+                    .willReturn(aResponse().withStatus(401).withBody("missing 
auth")));
+            serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                    .atPriority(1)
+                    .withHeader("Authorization", equalTo("Bearer test-token"))
+                    .willReturn(aResponse()
+                            .withStatus(200)
+                            .withHeader("Content-Type", "application/json")
+                            .withBody("[\"test-cluster\"]")));
+
+            ClientConfigurationData conf = new ClientConfigurationData();
+            conf.setServiceUrl("http://127.0.0.1:" + server.port());
+
+            @Cleanup
+            AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+                    5000, 0, conf, false, null);
+
+            Request request = new RequestBuilder("GET")
+                    .setUrl("http://127.0.0.1:" + server.port() + 
"/admin/v2/clusters")
+                    .addHeader("Authorization", "Bearer test-token")
+                    .build();
+
+            Response response = connector.executeRequest(request).get();
+
+            assertEquals(response.getStatusCode(), 200,
+                    "cross-origin redirect should forward Authorization and 
return the stubbed body");
+            assertEquals(response.getResponseBody(), "[\"test-cluster\"]");
+            serverB.verify(getRequestedFor(urlEqualTo("/admin/v2/clusters"))
+                    .withHeader("Authorization", equalTo("Bearer 
test-token")));
+        } finally {
+            serverB.stop();
+        }
+    }
+
     @Test
     void testRedirectWithBody() throws ExecutionException, 
InterruptedException {
         server.stubFor(post(urlEqualTo("/path1"))
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index aaab87c6e63..6c60e79d0e0 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -228,6 +228,12 @@
       <artifactId>fastutil</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.tomakehurst</groupId>
+      <artifactId>wiremock-jre8-standalone</artifactId>
+      <version>${wiremock.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 72fcf82b859..6c53c067702 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -55,6 +55,7 @@ import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
 import org.asynchttpclient.SslEngineFactory;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 
@@ -87,7 +88,12 @@ public class HttpClient implements Closeable {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setCookieStore(null);
         confBuilder.setUseProxyProperties(true);
-        confBuilder.setFollowRedirect(true);
+        // Follow redirects manually in executeGet(...) so we can re-invoke 
authentication per hop and
+        // carry the Authorization header across cross-origin redirects. 
async-http-client >= 2.14.5
+        // (CVE-2026-40490 fix) strips the Authorization header when it 
follows redirects itself; Pulsar
+        // HTTP lookups routinely redirect to another broker's 
httpUrl/httpUrlTls which is a different
+        // host/port, i.e. cross-origin.
+        confBuilder.setFollowRedirect(false);
         confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
         confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 
1000);
         confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
@@ -167,10 +173,28 @@ public class HttpClient implements Closeable {
         try {
             URI hostUri = serviceNameResolver.resolveHostUri();
             String requestUrl = new URL(hostUri.toURL(), path).toString();
-            String remoteHostName = hostUri.getHost();
+            InetSocketAddress originalHost = 
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort());
+            executeGet(requestUrl, originalHost, 
clientConf.getMaxLookupRedirects(), future, clazz);
+        } catch (Exception e) {
+            log.warn("[{}] Failed to initiate HTTP get request: {}", path, 
e.getMessage());
+            if (e instanceof PulsarClientException) {
+                future.completeExceptionally(e);
+            } else {
+                future.completeExceptionally(new PulsarClientException(e));
+            }
+        }
+
+        return future;
+    }
+
+    private <T> void executeGet(String requestUrl, InetSocketAddress 
originalHost,
+                                int redirectsRemaining, CompletableFuture<T> 
future, Class<T> clazz) {
+        try {
+            URI currentUri = URI.create(requestUrl);
+            String remoteHostName = currentUri.getHost();
             AuthenticationDataProvider authData = 
authentication.getAuthData(remoteHostName);
 
-            CompletableFuture<Map<String, String>>  authFuture = new 
CompletableFuture<>();
+            CompletableFuture<Map<String, String>> authFuture = new 
CompletableFuture<>();
 
             // bring a authenticationStage for sasl auth.
             if (authData.hasDataForHttp()) {
@@ -179,18 +203,15 @@ public class HttpClient implements Closeable {
                 authFuture.complete(null);
             }
 
-            // auth complete, do real request
             authFuture.whenComplete((respHeaders, ex) -> {
                 if (ex != null) {
-                    serviceNameResolver.markHostAvailability(
-                            
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), 
false);
+                    serviceNameResolver.markHostAvailability(originalHost, 
false);
                     log.warn("[{}] Failed to perform http request at 
authentication stage: {}",
                         requestUrl, ex.getMessage());
                     future.completeExceptionally(new 
PulsarClientException(ex));
                     return;
                 }
 
-                // auth complete, use a new builder
                 BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
                         // share the DNS resolver and cache with Pulsar client
                         .setNameResolver(nameResolver)
@@ -217,20 +238,25 @@ public class HttpClient implements Closeable {
 
                 
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
                     if (t != null) {
-                        serviceNameResolver.markHostAvailability(
-                                
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), 
false);
+                        serviceNameResolver.markHostAvailability(originalHost, 
false);
                         log.warn("[{}] Failed to perform http request: {}", 
requestUrl, t.getMessage());
                         future.completeExceptionally(new 
PulsarClientException(t));
                         return;
                     }
-                    serviceNameResolver.markHostAvailability(
-                            
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), true);
+                    serviceNameResolver.markHostAvailability(originalHost, 
true);
+
+                    int statusCode = response2.getStatusCode();
+                    if (isRedirectStatusCode(statusCode)) {
+                        handleRedirect(requestUrl, currentUri, response2, 
originalHost,
+                                redirectsRemaining, future, clazz);
+                        return;
+                    }
 
                     // request not success
-                    if (response2.getStatusCode() != 
HttpURLConnection.HTTP_OK) {
+                    if (statusCode != HttpURLConnection.HTTP_OK) {
                         log.warn("[{}] HTTP get request failed: {}", 
requestUrl, response2.getStatusText());
                         Exception e;
-                        if (response2.getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
+                        if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) {
                             e = new NotFoundException("Not found: " + 
response2.getStatusText());
                         } else {
                             e = new PulsarClientException("HTTP get request 
failed: " + response2.getStatusText());
@@ -250,15 +276,48 @@ public class HttpClient implements Closeable {
                 });
             });
         } catch (Exception e) {
-            log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
+            log.warn("[{}] HTTP request setup failed: {}", requestUrl, 
e.getMessage());
             if (e instanceof PulsarClientException) {
                 future.completeExceptionally(e);
             } else {
                 future.completeExceptionally(new PulsarClientException(e));
             }
         }
+    }
 
-        return future;
+    private <T> void handleRedirect(String requestUrl, URI currentUri,
+                                    Response response,
+                                    InetSocketAddress originalHost, int 
redirectsRemaining,
+                                    CompletableFuture<T> future, Class<T> 
clazz) {
+        String location = response.getHeader("Location");
+        if (location == null || location.isEmpty()) {
+            future.completeExceptionally(new PulsarClientException(
+                    "HTTP redirect " + response.getStatusCode() + " without 
Location header: " + requestUrl));
+            return;
+        }
+        if (redirectsRemaining <= 0) {
+            future.completeExceptionally(new PulsarClientException(
+                    "Maximum redirects exceeded (" + 
clientConf.getMaxLookupRedirects()
+                            + ") while following HTTP redirect for " + 
requestUrl));
+            return;
+        }
+        String newUrl;
+        try {
+            newUrl = currentUri.resolve(location).toString();
+        } catch (Exception e) {
+            future.completeExceptionally(new PulsarClientException(
+                    "Invalid redirect Location \"" + location + "\" for " + 
requestUrl));
+            return;
+        }
+        executeGet(newUrl, originalHost, redirectsRemaining - 1, future, 
clazz);
+    }
+
+    private static boolean isRedirectStatusCode(int statusCode) {
+        return statusCode == HttpURLConnection.HTTP_MOVED_PERM   // 301
+                || statusCode == HttpURLConnection.HTTP_MOVED_TEMP   // 302
+                || statusCode == HttpURLConnection.HTTP_SEE_OTHER    // 303
+                || statusCode == 307                                 // 
Temporary Redirect
+                || statusCode == 308;                                // 
Permanent Redirect
     }
 
     protected PulsarSslConfiguration 
buildSslConfiguration(ClientConfigurationData config, String host)
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
new file mode 100644
index 00000000000..ec3c8b0b78e
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.resolver.NameResolver;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.lookup.data.LookupData;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that {@link HttpClient} carries the {@code Authorization} header 
across cross-origin HTTP redirects.
+ *
+ * <p>Pulsar's HTTP lookup endpoint returns {@code 307 Temporary Redirect} to 
whichever broker owns the bundle for a
+ * topic. The redirect target is that broker's {@code httpUrl}/{@code 
httpUrlTls}, i.e. typically a different host or
+ * port from the original request. Auth plugins ({@code AuthenticationToken}, 
{@code AuthenticationBasic},
+ * {@code AuthenticationOAuth2}, {@code AuthenticationAthenz}) inject the 
{@code Authorization} header — that header
+ * must reach the redirect target for lookup to succeed.
+ *
+ * <p>async-http-client 2.14.5 strips {@code Authorization} on cross-origin 
redirects when its built-in follow-redirect
+ * is enabled (CVE-2026-40490 fix). This test drives two WireMock servers on 
different ports to exercise that path.
+ */
+public class HttpClientTest {
+
+    private static final String LOOKUP_PATH = 
"/lookup/v2/topic/persistent/public/default/test-topic";
+    private static final String EXPECTED_BODY = 
"{\"brokerUrl\":\"pulsar://broker-b:6650\","
+            + "\"brokerUrlTls\":\"pulsar+ssl://broker-b:6651\","
+            + "\"httpUrl\":\"http://broker-b:8080\","
+            + "\"httpUrlTls\":\"https://broker-b:8443\","
+            + "\"nativeUrl\":\"pulsar://broker-b:6650\"}";
+
+    private WireMockServer serverA;
+    private WireMockServer serverB;
+    private EventLoopGroup eventLoopGroup;
+    private Timer timer;
+    private NameResolver<InetAddress> nameResolver;
+
+    @BeforeClass(alwaysRun = true)
+    void beforeClass() {
+        eventLoopGroup = new NioEventLoopGroup(1, new 
DefaultThreadFactory("HttpClientTest"));
+        timer = new HashedWheelTimer(new 
DefaultThreadFactory("HttpClientTest-timer"));
+        // Default JDK-backed resolver is sufficient; we only hit 127.0.0.1.
+        nameResolver = DnsResolverUtil.adaptToNameResolver(null);
+    }
+
+    @AfterClass(alwaysRun = true)
+    void afterClass() {
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully();
+        }
+        if (timer != null) {
+            timer.stop();
+        }
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    void beforeMethod() {
+        serverA = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverB = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverA.start();
+        serverB.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void afterMethod() {
+        if (serverA != null) {
+            serverA.stop();
+        }
+        if (serverB != null) {
+            serverB.stop();
+        }
+    }
+
+    @Test
+    public void testCrossOriginRedirectCarriesAuthorizationHeader() throws 
Exception {
+        // serverA (origin host:port) returns 307 Temporary Redirect to 
serverB (different port -> cross-origin).
+        serverA.stubFor(get(urlPathMatching(LOOKUP_PATH))
+                .willReturn(aResponse()
+                        .withStatus(307)
+                        .withHeader("Location",
+                                "http://127.0.0.1:" + serverB.port() + 
LOOKUP_PATH)));
+
+        // serverB only responds 200 when the Authorization header is present 
with the expected token.
+        // Priorities: lower = higher priority; the specific-Authorization 
stub must be checked first.
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
+                .atPriority(2)
+                .willReturn(aResponse().withStatus(401).withBody("missing 
auth")));
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
+                .atPriority(1)
+                .withHeader("Authorization", equalTo("Bearer test-token"))
+                .willReturn(aResponse()
+                        .withStatus(200)
+                        .withHeader("Content-Type", "application/json")
+                        .withBody(EXPECTED_BODY)));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
+        conf.setAuthentication(AuthenticationFactory.token("test-token"));
+
+        try (HttpClient httpClient = new HttpClient(conf, eventLoopGroup, 
timer, nameResolver)) {
+            LookupData result = httpClient.get(LOOKUP_PATH, LookupData.class)
+                    .get(30, TimeUnit.SECONDS);
+
+            assertNotNull(result, "Expected lookup payload after cross-origin 
redirect");
+            assertEquals(result.getBrokerUrl(), "pulsar://broker-b:6650");
+            assertEquals(result.getHttpUrl(), "http://broker-b:8080");
+
+            // Lock the invariant: the final hop on serverB must carry the 
Authorization header.
+            serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH))
+                    .withHeader("Authorization", equalTo("Bearer 
test-token")));
+        }
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
new file mode 100644
index 00000000000..f2d3751ecb0
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end check that {@link HttpLookupService#getBroker} carries the 
{@code Authorization} header
+ * across a cross-origin redirect — the production scenario where a broker 
returns
+ * {@code 307 Temporary Redirect} with a {@code Location} pointing at another 
broker's
+ * {@code httpUrl}/{@code httpUrlTls}.
+ */
+public class HttpLookupServiceTest {
+
+    private static final String TOPIC = 
"persistent://public/default/cross-origin-lookup-topic";
+    private static final String LOOKUP_PATH_REGEX =
+            
"/lookup/v2/topic/persistent/public/default/cross-origin-lookup-topic";
+    private static final String LOOKUP_BODY = 
"{\"brokerUrl\":\"pulsar://broker-b.example:6650\","
+            + "\"brokerUrlTls\":\"pulsar+ssl://broker-b.example:6651\","
+            + "\"httpUrl\":\"http://broker-b.example:8080\","
+            + "\"httpUrlTls\":\"https://broker-b.example:8443\","
+            + "\"nativeUrl\":\"pulsar://broker-b.example:6650\"}";
+
+    private WireMockServer serverA;
+    private WireMockServer serverB;
+    private EventLoopGroup eventLoopGroup;
+    private Timer timer;
+
+    @BeforeClass(alwaysRun = true)
+    void beforeClass() {
+        eventLoopGroup = new NioEventLoopGroup(1, new 
DefaultThreadFactory("HttpLookupServiceTest"));
+        timer = new HashedWheelTimer(new 
DefaultThreadFactory("HttpLookupServiceTest-timer"));
+    }
+
+    @AfterClass(alwaysRun = true)
+    void afterClass() {
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully();
+        }
+        if (timer != null) {
+            timer.stop();
+        }
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    void beforeMethod() {
+        serverA = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverB = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverA.start();
+        serverB.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void afterMethod() {
+        if (serverA != null) {
+            serverA.stop();
+        }
+        if (serverB != null) {
+            serverB.stop();
+        }
+    }
+
+    @Test
+    public void testGetBrokerFollowsCrossOriginRedirect() throws Exception {
+        serverA.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+                .willReturn(aResponse()
+                        .withStatus(307)
+                        .withHeader("Location",
+                                "http://127.0.0.1:" + serverB.port() + 
LOOKUP_PATH_REGEX)));
+
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+                .atPriority(2)
+                .willReturn(aResponse().withStatus(401).withBody("missing 
auth")));
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+                .atPriority(1)
+                .withHeader("Authorization", equalTo("Bearer test-token"))
+                .willReturn(aResponse()
+                        .withStatus(200)
+                        .withHeader("Content-Type", "application/json")
+                        .withBody(LOOKUP_BODY)));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
+        conf.setAuthentication(AuthenticationFactory.token("test-token"));
+
+        HttpLookupService lookupService = new HttpLookupService(
+                InstrumentProvider.NOOP, conf, eventLoopGroup, timer, 
DnsResolverUtil.adaptToNameResolver(null));
+        try {
+            LookupTopicResult result = 
lookupService.getBroker(TopicName.get(TOPIC))
+                    .get(30, TimeUnit.SECONDS);
+
+            assertNotNull(result);
+            assertEquals(result.getLogicalAddress().getHostString(), 
"broker-b.example");
+            assertEquals(result.getLogicalAddress().getPort(), 6650);
+
+            serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH_REGEX))
+                    .withHeader("Authorization", equalTo("Bearer 
test-token")));
+        } finally {
+            lookupService.close();
+        }
+    }
+}

Reply via email to