This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bc811025c52 [fix][sec] Upgrade to async-http-client 2.14.5 to address 
CVE-2026-40490 (#25546)
bc811025c52 is described below

commit bc811025c5280b58b1034113eabaa08e8a539bf4
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 17 13:39:15 2026 +0300

    [fix][sec] Upgrade to async-http-client 2.14.5 to address CVE-2026-40490 
(#25546)
    
    (cherry picked from commit a1613bc2e5fd26cc16fc95b4a3c61bc5e1ae090d)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   6 +-
 distribution/shell/src/assemble/LICENSE.bin.txt    |   6 +-
 pom.xml                                            |   2 +-
 .../internal/http/AsyncHttpConnectorTest.java      |  56 +++++++++
 pulsar-client/pom.xml                              |   6 +
 .../org/apache/pulsar/client/impl/HttpClient.java  |  74 ++++++++++-
 .../apache/pulsar/client/impl/HttpClientTest.java  | 137 +++++++++++++++++++++
 .../pulsar/client/impl/HttpLookupServiceTest.java  | 134 ++++++++++++++++++++
 8 files changed, 408 insertions(+), 13 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index ff76b5e0d05..1d86377828f 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -379,8 +379,8 @@ The Apache Software License, Version 2.0
  * AirCompressor
     - io.airlift-aircompressor-2.0.3.jar
  * AsyncHttpClient
-    - org.asynchttpclient-async-http-client-2.12.4.jar
-    - org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
+    - org.asynchttpclient-async-http-client-2.14.5.jar
+    - org.asynchttpclient-async-http-client-netty-utils-2.14.5.jar
  * Jetty
     - org.eclipse.jetty-jetty-client-9.4.58.v20250814.jar
     - org.eclipse.jetty-jetty-continuation-9.4.58.v20250814.jar
@@ -565,7 +565,7 @@ Eclipse Public License - v2.0 -- 
../licenses/LICENSE-EPL-2.0.txt
  * Jakarta Injection -- org.glassfish.hk2.external-jakarta.inject-2.6.1.jar
 
 Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
- * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.3.jar
+ * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.4.jar
 
 Creative Commons Attribution License
  * Jcip -- ../licenses/LICENSE-jcip.txt
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index bbc33955365..c5ae5aca399 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -396,8 +396,8 @@ The Apache Software License, Version 2.0
   * AirCompressor
      - aircompressor-2.0.3.jar
  * AsyncHttpClient
-    - async-http-client-2.12.4.jar
-    - async-http-client-netty-utils-2.12.4.jar
+    - async-http-client-2.14.5.jar
+    - async-http-client-netty-utils-2.14.5.jar
  * Jetty
     - jetty-client-9.4.58.v20250814.jar
     - jetty-http-9.4.58.v20250814.jar
@@ -456,7 +456,7 @@ Eclipse Public License - v2.0 -- 
../licenses/LICENSE-EPL-2.0.txt
  * Jakarta Injection -- jakarta.inject-2.6.1.jar
 
 Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
- * Reactive Streams -- reactive-streams-1.0.3.jar
+ * Reactive Streams -- reactive-streams-1.0.4.jar
 
 Creative Commons Attribution License
  * Jcip -- ../licenses/LICENSE-jcip.txt
diff --git a/pom.xml b/pom.xml
index aa12d2c56b1..539dd46acf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,7 +214,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <prometheus-jmx.version>0.16.1</prometheus-jmx.version>
     <confluent.version>7.9.2</confluent.version>
     <aircompressor.version>2.0.3</aircompressor.version>
-    <asynchttpclient.version>2.12.4</asynchttpclient.version>
+    <asynchttpclient.version>2.14.5</asynchttpclient.version>
     <jcommander.version>1.82</jcommander.version>
     <commons-lang3.version>3.18.0</commons-lang3.version>
     <commons-configuration.version>1.10</commons-configuration.version>
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
index 44e97386779..caed88a46b0 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.admin.internal.http;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
 import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
 import static org.testng.Assert.assertEquals;
@@ -282,6 +284,60 @@ public class AsyncHttpConnectorTest {
         assertEquals(response.getResponseBody(), "OK");
     }
 
+    /**
+     * Locks in that AsyncHttpConnector forwards the {@code Authorization} 
header across a cross-origin
+     * HTTP redirect (different host:port — serverA → serverB). The admin 
connector disables AHC's
+     * built-in follow-redirect and runs its own redirect loop that copies the 
original request headers,
+     * so it must remain unaffected by the async-http-client &gt;= 2.14.5 
{@code Authorization} stripping
+     * on cross-origin redirects (CVE-2026-40490 fix). Regressing {@code 
setFollowRedirect(true)} would
+     * break this test.
+     */
+    @Test
+    void testAuthorizationHeaderOnCrossOriginRedirect() throws 
ExecutionException, InterruptedException {
+        WireMockServer serverB = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverB.start();
+        try {
+            server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                    .willReturn(aResponse()
+                            .withStatus(307)
+                            .withHeader("Location",
+                                    "http://127.0.0.1:" + serverB.port() + 
"/admin/v2/clusters")));
+
+            serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                    .atPriority(2)
+                    .willReturn(aResponse().withStatus(401).withBody("missing 
auth")));
+            serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                    .atPriority(1)
+                    .withHeader("Authorization", equalTo("Bearer test-token"))
+                    .willReturn(aResponse()
+                            .withStatus(200)
+                            .withHeader("Content-Type", "application/json")
+                            .withBody("[\"test-cluster\"]")));
+
+            ClientConfigurationData conf = new ClientConfigurationData();
+            conf.setServiceUrl("http://127.0.0.1:" + server.port());
+
+            @Cleanup
+            AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+                    5000, 0, conf);
+
+            Request request = new RequestBuilder("GET")
+                    .setUrl("http://127.0.0.1:" + server.port() + 
"/admin/v2/clusters")
+                    .addHeader("Authorization", "Bearer test-token")
+                    .build();
+
+            Response response = connector.executeRequest(request).get();
+
+            assertEquals(response.getStatusCode(), 200,
+                    "cross-origin redirect should forward Authorization and 
return the stubbed body");
+            assertEquals(response.getResponseBody(), "[\"test-cluster\"]");
+            serverB.verify(getRequestedFor(urlEqualTo("/admin/v2/clusters"))
+                    .withHeader("Authorization", equalTo("Bearer 
test-token")));
+        } finally {
+            serverB.stop();
+        }
+    }
+
     @Test
     void testRedirectWithBody() throws ExecutionException, 
InterruptedException {
         server.stubFor(post(urlEqualTo("/path1"))
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 66b82730f94..29778a810b7 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -193,6 +193,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.tomakehurst</groupId>
+      <artifactId>wiremock-jre8-standalone</artifactId>
+      <version>${wiremock.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index d732a6a36d9..b61115a78c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -53,6 +53,7 @@ import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.asynchttpclient.netty.ssl.JsseSslEngineFactory;
 
@@ -66,8 +67,10 @@ public class HttpClient implements Closeable {
     protected final AsyncHttpClient httpClient;
     protected final ServiceNameResolver serviceNameResolver;
     protected final Authentication authentication;
+    protected final ClientConfigurationData clientConf;
 
     protected HttpClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
+        this.clientConf = conf;
         this.authentication = conf.getAuthentication();
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
@@ -75,7 +78,12 @@ public class HttpClient implements Closeable {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setCookieStore(null);
         confBuilder.setUseProxyProperties(true);
-        confBuilder.setFollowRedirect(true);
+        // Follow redirects manually in executeGet(...) so we can re-invoke 
authentication per hop and
+        // carry the Authorization header across cross-origin redirects. 
async-http-client >= 2.14.5
+        // (CVE-2026-40490 fix) strips the Authorization header when it 
follows redirects itself; Pulsar
+        // HTTP lookups routinely redirect to another broker's 
httpUrl/httpUrlTls which is a different
+        // host/port, i.e. cross-origin.
+        confBuilder.setFollowRedirect(false);
         confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
         confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 
1000);
         confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
@@ -186,7 +194,24 @@ public class HttpClient implements Closeable {
         try {
             URI hostUri = serviceNameResolver.resolveHostUri();
             String requestUrl = new URL(hostUri.toURL(), path).toString();
-            String remoteHostName = hostUri.getHost();
+            executeGet(requestUrl, clientConf.getMaxLookupRedirects(), future, 
clazz);
+        } catch (Exception e) {
+            log.warn("[{}] Failed to initiate HTTP get request: {}", path, 
e.getMessage());
+            if (e instanceof PulsarClientException) {
+                future.completeExceptionally(e);
+            } else {
+                future.completeExceptionally(new PulsarClientException(e));
+            }
+        }
+
+        return future;
+    }
+
+    private <T> void executeGet(String requestUrl, int redirectsRemaining,
+                                CompletableFuture<T> future, Class<T> clazz) {
+        try {
+            URI currentUri = URI.create(requestUrl);
+            String remoteHostName = currentUri.getHost();
             AuthenticationDataProvider authData = 
authentication.getAuthData(remoteHostName);
 
             CompletableFuture<Map<String, String>>  authFuture = new 
CompletableFuture<>();
@@ -232,11 +257,17 @@ public class HttpClient implements Closeable {
                         return;
                     }
 
+                    int statusCode = response2.getStatusCode();
+                    if (isRedirectStatusCode(statusCode)) {
+                        handleRedirect(requestUrl, currentUri, response2, 
redirectsRemaining, future, clazz);
+                        return;
+                    }
+
                     // request not success
-                    if (response2.getStatusCode() != 
HttpURLConnection.HTTP_OK) {
+                    if (statusCode != HttpURLConnection.HTTP_OK) {
                         log.warn("[{}] HTTP get request failed: {}", 
requestUrl, response2.getStatusText());
                         Exception e;
-                        if (response2.getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
+                        if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) {
                             e = new NotFoundException("Not found: " + 
response2.getStatusText());
                         } else {
                             e = new PulsarClientException("HTTP get request 
failed: " + response2.getStatusText());
@@ -256,14 +287,45 @@ public class HttpClient implements Closeable {
                 });
             });
         } catch (Exception e) {
-            log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
+            log.warn("[{}] HTTP request setup failed: {}", requestUrl, 
e.getMessage());
             if (e instanceof PulsarClientException) {
                 future.completeExceptionally(e);
             } else {
                 future.completeExceptionally(new PulsarClientException(e));
             }
         }
+    }
 
-        return future;
+    private <T> void handleRedirect(String requestUrl, URI currentUri, 
Response response,
+                                    int redirectsRemaining, 
CompletableFuture<T> future, Class<T> clazz) {
+        String location = response.getHeader("Location");
+        if (location == null || location.isEmpty()) {
+            future.completeExceptionally(new PulsarClientException(
+                    "HTTP redirect " + response.getStatusCode() + " without 
Location header: " + requestUrl));
+            return;
+        }
+        if (redirectsRemaining <= 0) {
+            future.completeExceptionally(new PulsarClientException(
+                    "Maximum redirects exceeded (" + 
clientConf.getMaxLookupRedirects()
+                            + ") while following HTTP redirect for " + 
requestUrl));
+            return;
+        }
+        String newUrl;
+        try {
+            newUrl = currentUri.resolve(location).toString();
+        } catch (Exception e) {
+            future.completeExceptionally(new PulsarClientException(
+                    "Invalid redirect Location \"" + location + "\" for " + 
requestUrl));
+            return;
+        }
+        executeGet(newUrl, redirectsRemaining - 1, future, clazz);
+    }
+
+    private static boolean isRedirectStatusCode(int statusCode) {
+        return statusCode == HttpURLConnection.HTTP_MOVED_PERM   // 301
+                || statusCode == HttpURLConnection.HTTP_MOVED_TEMP   // 302
+                || statusCode == HttpURLConnection.HTTP_SEE_OTHER    // 303
+                || statusCode == 307                                 // 
Temporary Redirect
+                || statusCode == 308;                                // 
Permanent Redirect
     }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
new file mode 100644
index 00000000000..c7ef13761a7
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.lookup.data.LookupData;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that {@link HttpClient} carries the {@code Authorization} header 
across cross-origin HTTP redirects.
+ *
+ * <p>Pulsar's HTTP lookup endpoint returns {@code 307 Temporary Redirect} to 
whichever broker owns the bundle for a
+ * topic. The redirect target is that broker's {@code httpUrl}/{@code 
httpUrlTls}, i.e. typically a different host or
+ * port from the original request. Auth plugins ({@code AuthenticationToken}, 
{@code AuthenticationBasic},
+ * {@code AuthenticationOAuth2}, {@code AuthenticationAthenz}) inject the 
{@code Authorization} header — that header
+ * must reach the redirect target for lookup to succeed.
+ *
+ * <p>async-http-client 2.14.5 strips {@code Authorization} on cross-origin 
redirects when its built-in follow-redirect
+ * is enabled (CVE-2026-40490 fix). This test drives two WireMock servers on 
different ports to exercise that path.
+ */
+public class HttpClientTest {
+
+    private static final String LOOKUP_PATH = 
"/lookup/v2/topic/persistent/public/default/test-topic";
+    private static final String EXPECTED_BODY = 
"{\"brokerUrl\":\"pulsar://broker-b:6650\","
+            + "\"brokerUrlTls\":\"pulsar+ssl://broker-b:6651\","
+            + "\"httpUrl\":\"http://broker-b:8080\","
+            + "\"httpUrlTls\":\"https://broker-b:8443\","
+            + "\"nativeUrl\":\"pulsar://broker-b:6650\"}";
+
+    private WireMockServer serverA;
+    private WireMockServer serverB;
+    private EventLoopGroup eventLoopGroup;
+
+    @BeforeClass(alwaysRun = true)
+    void beforeClass() {
+        eventLoopGroup = new NioEventLoopGroup(1, new 
DefaultThreadFactory("HttpClientTest"));
+    }
+
+    @AfterClass(alwaysRun = true)
+    void afterClass() {
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully();
+        }
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    void beforeMethod() {
+        serverA = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverB = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverA.start();
+        serverB.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void afterMethod() {
+        if (serverA != null) {
+            serverA.stop();
+        }
+        if (serverB != null) {
+            serverB.stop();
+        }
+    }
+
+    @Test
+    public void testCrossOriginRedirectCarriesAuthorizationHeader() throws 
Exception {
+        // serverA (origin host:port) returns 307 Temporary Redirect to 
serverB (different port -> cross-origin).
+        serverA.stubFor(get(urlPathMatching(LOOKUP_PATH))
+                .willReturn(aResponse()
+                        .withStatus(307)
+                        .withHeader("Location",
+                                "http://127.0.0.1:" + serverB.port() + 
LOOKUP_PATH)));
+
+        // serverB only responds 200 when the Authorization header is present 
with the expected token.
+        // Priorities: lower = higher priority; the specific-Authorization 
stub must be checked first.
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
+                .atPriority(2)
+                .willReturn(aResponse().withStatus(401).withBody("missing 
auth")));
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
+                .atPriority(1)
+                .withHeader("Authorization", equalTo("Bearer test-token"))
+                .willReturn(aResponse()
+                        .withStatus(200)
+                        .withHeader("Content-Type", "application/json")
+                        .withBody(EXPECTED_BODY)));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
+        conf.setAuthentication(AuthenticationFactory.token("test-token"));
+
+        try (HttpClient httpClient = new HttpClient(conf, eventLoopGroup)) {
+            LookupData result = httpClient.get(LOOKUP_PATH, LookupData.class)
+                    .get(30, TimeUnit.SECONDS);
+
+            assertNotNull(result, "Expected lookup payload after cross-origin 
redirect");
+            assertEquals(result.getBrokerUrl(), "pulsar://broker-b:6650");
+            assertEquals(result.getHttpUrl(), "http://broker-b:8080");
+
+            // Lock the invariant: the final hop on serverB must carry the 
Authorization header.
+            serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH))
+                    .withHeader("Authorization", equalTo("Bearer 
test-token")));
+        }
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
new file mode 100644
index 00000000000..6ed7c921f06
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end check that {@link HttpLookupService#getBroker} carries the 
{@code Authorization} header
+ * across a cross-origin redirect — the production scenario where a broker 
returns
+ * {@code 307 Temporary Redirect} with a {@code Location} pointing at another 
broker's
+ * {@code httpUrl}/{@code httpUrlTls}.
+ */
+public class HttpLookupServiceTest {
+
+    private static final String TOPIC = 
"persistent://public/default/cross-origin-lookup-topic";
+    private static final String LOOKUP_PATH_REGEX =
+            
"/lookup/v2/topic/persistent/public/default/cross-origin-lookup-topic";
+    private static final String LOOKUP_BODY = 
"{\"brokerUrl\":\"pulsar://broker-b.example:6650\","
+            + "\"brokerUrlTls\":\"pulsar+ssl://broker-b.example:6651\","
+            + "\"httpUrl\":\"http://broker-b.example:8080\","
+            + "\"httpUrlTls\":\"https://broker-b.example:8443\","
+            + "\"nativeUrl\":\"pulsar://broker-b.example:6650\"}";
+
+    private WireMockServer serverA;
+    private WireMockServer serverB;
+    private EventLoopGroup eventLoopGroup;
+
+    @BeforeClass(alwaysRun = true)
+    void beforeClass() {
+        eventLoopGroup = new NioEventLoopGroup(1, new 
DefaultThreadFactory("HttpLookupServiceTest"));
+    }
+
+    @AfterClass(alwaysRun = true)
+    void afterClass() {
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully();
+        }
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    void beforeMethod() {
+        serverA = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverB = new 
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+        serverA.start();
+        serverB.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void afterMethod() {
+        if (serverA != null) {
+            serverA.stop();
+        }
+        if (serverB != null) {
+            serverB.stop();
+        }
+    }
+
+    @Test
+    public void testGetBrokerFollowsCrossOriginRedirect() throws Exception {
+        serverA.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+                .willReturn(aResponse()
+                        .withStatus(307)
+                        .withHeader("Location",
+                                "http://127.0.0.1:" + serverB.port() + 
LOOKUP_PATH_REGEX)));
+
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+                .atPriority(2)
+                .willReturn(aResponse().withStatus(401).withBody("missing 
auth")));
+        serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+                .atPriority(1)
+                .withHeader("Authorization", equalTo("Bearer test-token"))
+                .willReturn(aResponse()
+                        .withStatus(200)
+                        .withHeader("Content-Type", "application/json")
+                        .withBody(LOOKUP_BODY)));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
+        conf.setAuthentication(AuthenticationFactory.token("test-token"));
+
+        HttpLookupService lookupService = new HttpLookupService(conf, 
eventLoopGroup);
+        try {
+            Pair<InetSocketAddress, InetSocketAddress> result = 
lookupService.getBroker(TopicName.get(TOPIC))
+                    .get(30, TimeUnit.SECONDS);
+
+            assertNotNull(result);
+            assertEquals(result.getLeft().getHostString(), "broker-b.example");
+            assertEquals(result.getLeft().getPort(), 6650);
+
+            serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH_REGEX))
+                    .withHeader("Authorization", equalTo("Bearer 
test-token")));
+        } finally {
+            lookupService.close();
+        }
+    }
+}

Reply via email to