This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 5736b60085f [fix][sec] Upgrade to async-http-client 2.14.5 to address
CVE-2026-40490 (#25546)
5736b60085f is described below
commit 5736b60085f538b74c0236904d1c39959f35ce48
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 17 13:39:15 2026 +0300
[fix][sec] Upgrade to async-http-client 2.14.5 to address CVE-2026-40490
(#25546)
(cherry picked from commit a1613bc2e5fd26cc16fc95b4a3c61bc5e1ae090d)
---
distribution/server/src/assemble/LICENSE.bin.txt | 6 +-
distribution/shell/src/assemble/LICENSE.bin.txt | 6 +-
pom.xml | 2 +-
.../internal/http/AsyncHttpConnectorTest.java | 56 ++++++++
pulsar-client/pom.xml | 6 +
.../org/apache/pulsar/client/impl/HttpClient.java | 89 +++++++++---
.../apache/pulsar/client/impl/HttpClientTest.java | 150 +++++++++++++++++++++
.../pulsar/client/impl/HttpLookupServiceTest.java | 142 +++++++++++++++++++
8 files changed, 435 insertions(+), 22 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index 4698da389ce..6645d0d5392 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -389,8 +389,8 @@ The Apache Software License, Version 2.0
* AirCompressor
- io.airlift-aircompressor-2.0.3.jar
* AsyncHttpClient
- - org.asynchttpclient-async-http-client-2.12.4.jar
- - org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
+ - org.asynchttpclient-async-http-client-2.14.5.jar
+ - org.asynchttpclient-async-http-client-netty-utils-2.14.5.jar
* Jetty
- org.eclipse.jetty-jetty-alpn-client-12.1.8.jar
- org.eclipse.jetty-jetty-alpn-conscrypt-server-12.1.8.jar
@@ -628,7 +628,7 @@ Eclipse Public License - v2.0 --
../licenses/LICENSE-EPL-2.0.txt
* Jakarta Transactions API --
jakarta.transaction-jakarta.transaction-api-1.3.3.jar
Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
- * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.3.jar
+ * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.4.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 1846799c302..b86a20eb9f3 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -398,8 +398,8 @@ The Apache Software License, Version 2.0
* AirCompressor
- aircompressor-2.0.3.jar
* AsyncHttpClient
- - async-http-client-2.12.4.jar
- - async-http-client-netty-utils-2.12.4.jar
+ - async-http-client-2.14.5.jar
+ - async-http-client-netty-utils-2.14.5.jar
* Jetty
- jetty-alpn-client-12.1.8.jar
- jetty-client-12.1.8.jar
@@ -463,7 +463,7 @@ Eclipse Public License - v2.0 --
../licenses/LICENSE-EPL-2.0.txt
* Jakarta Injection -- jakarta.inject-2.6.1.jar
Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
- * Reactive Streams -- reactive-streams-1.0.3.jar
+ * Reactive Streams -- reactive-streams-1.0.4.jar
diff --git a/pom.xml b/pom.xml
index 71368671658..c505d2e6f25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,7 +263,7 @@ flexible messaging model and an intuitive client
API.</description>
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
<confluent.version>8.1.1</confluent.version>
<aircompressor.version>2.0.3</aircompressor.version>
- <asynchttpclient.version>2.12.4</asynchttpclient.version>
+ <asynchttpclient.version>2.14.5</asynchttpclient.version>
<commons-lang3.version>3.19.0</commons-lang3.version>
<commons-io.version>2.21.0</commons-io.version>
<commons-codec.version>1.20.0</commons-codec.version>
diff --git
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
index e0b4b13a03d..38243976f56 100644
---
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
+++
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.admin.internal.http;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.testng.Assert.assertEquals;
@@ -282,6 +284,60 @@ public class AsyncHttpConnectorTest {
assertEquals(response.getResponseBody(), "OK");
}
+ /**
+ * Locks in that AsyncHttpConnector forwards the {@code Authorization}
header across a cross-origin
+ * HTTP redirect (different host:port — serverA → serverB). The admin
connector disables AHC's
+ * built-in follow-redirect and runs its own redirect loop that copies the
original request headers,
+ * so it must remain unaffected by the async-http-client >= 2.14.5
{@code Authorization} stripping
+ * on cross-origin redirects (CVE-2026-40490 fix). Regressing {@code
setFollowRedirect(true)} would
+ * break this test.
+ */
+ @Test
+ void testAuthorizationHeaderOnCrossOriginRedirect() throws
ExecutionException, InterruptedException {
+ WireMockServer serverB = new
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+ serverB.start();
+ try {
+ server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .willReturn(aResponse()
+ .withStatus(307)
+ .withHeader("Location",
+ "http://127.0.0.1:" + serverB.port() +
"/admin/v2/clusters")));
+
+ serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .atPriority(2)
+ .willReturn(aResponse().withStatus(401).withBody("missing
auth")));
+ serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .atPriority(1)
+ .withHeader("Authorization", equalTo("Bearer test-token"))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody("[\"test-cluster\"]")));
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("http://127.0.0.1:" + server.port());
+
+ @Cleanup
+ AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+ 5000, 0, conf, false, null);
+
+ Request request = new RequestBuilder("GET")
+ .setUrl("http://127.0.0.1:" + server.port() +
"/admin/v2/clusters")
+ .addHeader("Authorization", "Bearer test-token")
+ .build();
+
+ Response response = connector.executeRequest(request).get();
+
+ assertEquals(response.getStatusCode(), 200,
+ "cross-origin redirect should forward Authorization and
return the stubbed body");
+ assertEquals(response.getResponseBody(), "[\"test-cluster\"]");
+ serverB.verify(getRequestedFor(urlEqualTo("/admin/v2/clusters"))
+ .withHeader("Authorization", equalTo("Bearer
test-token")));
+ } finally {
+ serverB.stop();
+ }
+ }
+
@Test
void testRedirectWithBody() throws ExecutionException,
InterruptedException {
server.stubFor(post(urlEqualTo("/path1"))
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 94e2f21f85b..f6ccb1e1664 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -228,6 +228,12 @@
<artifactId>fastutil</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8-standalone</artifactId>
+ <version>${wiremock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 51b267699c2..e02748d4fec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -56,6 +56,7 @@ import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
import org.asynchttpclient.SslEngineFactory;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
@@ -88,7 +89,12 @@ public class HttpClient implements Closeable {
DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
- confBuilder.setFollowRedirect(true);
+ // Follow redirects manually in executeGet(...) so we can re-invoke
authentication per hop and
+ // carry the Authorization header across cross-origin redirects.
async-http-client >= 2.14.5
+ // (CVE-2026-40490 fix) strips the Authorization header when it
follows redirects itself; Pulsar
+ // HTTP lookups routinely redirect to another broker's
httpUrl/httpUrlTls which is a different
+ // host/port, i.e. cross-origin.
+ confBuilder.setFollowRedirect(false);
confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS *
1000);
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
@@ -168,10 +174,28 @@ public class HttpClient implements Closeable {
try {
URI hostUri = serviceNameResolver.resolveHostUri();
String requestUrl = new URL(hostUri.toURL(), path).toString();
- String remoteHostName = hostUri.getHost();
+ InetSocketAddress originalHost =
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort());
+ executeGet(requestUrl, originalHost,
clientConf.getMaxLookupRedirects(), future, clazz);
+ } catch (Exception e) {
+ log.warn("[{}] Failed to initiate HTTP get request: {}", path,
e.getMessage());
+ if (e instanceof PulsarClientException) {
+ future.completeExceptionally(e);
+ } else {
+ future.completeExceptionally(new PulsarClientException(e));
+ }
+ }
+
+ return future;
+ }
+
+ private <T> void executeGet(String requestUrl, InetSocketAddress
originalHost,
+ int redirectsRemaining, CompletableFuture<T>
future, Class<T> clazz) {
+ try {
+ URI currentUri = URI.create(requestUrl);
+ String remoteHostName = currentUri.getHost();
AuthenticationDataProvider authData =
authentication.getAuthData(remoteHostName);
- CompletableFuture<Map<String, String>> authFuture = new
CompletableFuture<>();
+ CompletableFuture<Map<String, String>> authFuture = new
CompletableFuture<>();
// bring a authenticationStage for sasl auth.
if (authData.hasDataForHttp()) {
@@ -180,18 +204,15 @@ public class HttpClient implements Closeable {
authFuture.complete(null);
}
- // auth complete, do real request
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
- serviceNameResolver.markHostAvailability(
-
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()),
false);
+ serviceNameResolver.markHostAvailability(originalHost,
false);
log.warn("[{}] Failed to perform http request at
authentication stage: {}",
requestUrl, ex.getMessage());
future.completeExceptionally(new
PulsarClientException(ex));
return;
}
- // auth complete, use a new builder
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
// share the DNS resolver and cache with Pulsar client
.setNameResolver(nameResolver)
@@ -218,17 +239,22 @@ public class HttpClient implements Closeable {
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
- serviceNameResolver.markHostAvailability(
-
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()),
false);
+ serviceNameResolver.markHostAvailability(originalHost,
false);
log.warn("[{}] Failed to perform http request: {}",
requestUrl, t.getMessage());
future.completeExceptionally(new
PulsarClientException(t));
return;
}
- serviceNameResolver.markHostAvailability(
-
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), true);
+ serviceNameResolver.markHostAvailability(originalHost,
true);
+
+ int statusCode = response2.getStatusCode();
+ if (isRedirectStatusCode(statusCode)) {
+ handleRedirect(requestUrl, currentUri, response2,
originalHost,
+ redirectsRemaining, future, clazz);
+ return;
+ }
// request not success
- if (response2.getStatusCode() !=
HttpURLConnection.HTTP_OK) {
+ if (statusCode != HttpURLConnection.HTTP_OK) {
String errorReason = response2.getStatusText();
if
("application/json".equals(response2.getContentType()) || "text/json".equals(
response2.getContentType())) {
@@ -250,7 +276,7 @@ public class HttpClient implements Closeable {
}
log.warn("[{}] HTTP get request failed: {}",
requestUrl, errorReason);
Exception e;
- if (response2.getStatusCode() ==
HttpURLConnection.HTTP_NOT_FOUND) {
+ if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) {
e = new NotFoundException("Not found: " +
errorReason);
} else {
e = new PulsarClientException("HTTP get request
failed: " + errorReason);
@@ -270,15 +296,48 @@ public class HttpClient implements Closeable {
});
});
} catch (Exception e) {
- log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
+ log.warn("[{}] HTTP request setup failed: {}", requestUrl,
e.getMessage());
if (e instanceof PulsarClientException) {
future.completeExceptionally(e);
} else {
future.completeExceptionally(new PulsarClientException(e));
}
}
+ }
- return future;
+ private <T> void handleRedirect(String requestUrl, URI currentUri,
+ Response response,
+ InetSocketAddress originalHost, int
redirectsRemaining,
+ CompletableFuture<T> future, Class<T>
clazz) {
+ String location = response.getHeader("Location");
+ if (location == null || location.isEmpty()) {
+ future.completeExceptionally(new PulsarClientException(
+ "HTTP redirect " + response.getStatusCode() + " without
Location header: " + requestUrl));
+ return;
+ }
+ if (redirectsRemaining <= 0) {
+ future.completeExceptionally(new PulsarClientException(
+ "Maximum redirects exceeded (" +
clientConf.getMaxLookupRedirects()
+ + ") while following HTTP redirect for " +
requestUrl));
+ return;
+ }
+ String newUrl;
+ try {
+ newUrl = currentUri.resolve(location).toString();
+ } catch (Exception e) {
+ future.completeExceptionally(new PulsarClientException(
+ "Invalid redirect Location \"" + location + "\" for " +
requestUrl));
+ return;
+ }
+ executeGet(newUrl, originalHost, redirectsRemaining - 1, future,
clazz);
+ }
+
+ private static boolean isRedirectStatusCode(int statusCode) {
+ return statusCode == HttpURLConnection.HTTP_MOVED_PERM // 301
+ || statusCode == HttpURLConnection.HTTP_MOVED_TEMP // 302
+ || statusCode == HttpURLConnection.HTTP_SEE_OTHER // 303
+ || statusCode == 307 //
Temporary Redirect
+ || statusCode == 308; //
Permanent Redirect
}
protected PulsarSslConfiguration
buildSslConfiguration(ClientConfigurationData config, String host)
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
new file mode 100644
index 00000000000..ec3c8b0b78e
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.resolver.NameResolver;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.lookup.data.LookupData;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that {@link HttpClient} carries the {@code Authorization} header
across cross-origin HTTP redirects.
+ *
+ * <p>Pulsar's HTTP lookup endpoint returns {@code 307 Temporary Redirect} to
whichever broker owns the bundle for a
+ * topic. The redirect target is that broker's {@code httpUrl}/{@code
httpUrlTls}, i.e. typically a different host or
+ * port from the original request. Auth plugins ({@code AuthenticationToken},
{@code AuthenticationBasic},
+ * {@code AuthenticationOAuth2}, {@code AuthenticationAthenz}) inject the
{@code Authorization} header — that header
+ * must reach the redirect target for lookup to succeed.
+ *
+ * <p>async-http-client 2.14.5 strips {@code Authorization} on cross-origin
redirects when its built-in follow-redirect
+ * is enabled (CVE-2026-40490 fix). This test drives two WireMock servers on
different ports to exercise that path.
+ */
+public class HttpClientTest {
+
+ private static final String LOOKUP_PATH =
"/lookup/v2/topic/persistent/public/default/test-topic";
+ private static final String EXPECTED_BODY =
"{\"brokerUrl\":\"pulsar://broker-b:6650\","
+ + "\"brokerUrlTls\":\"pulsar+ssl://broker-b:6651\","
+ + "\"httpUrl\":\"http://broker-b:8080\","
+ + "\"httpUrlTls\":\"https://broker-b:8443\","
+ + "\"nativeUrl\":\"pulsar://broker-b:6650\"}";
+
+ private WireMockServer serverA;
+ private WireMockServer serverB;
+ private EventLoopGroup eventLoopGroup;
+ private Timer timer;
+ private NameResolver<InetAddress> nameResolver;
+
+ @BeforeClass(alwaysRun = true)
+ void beforeClass() {
+ eventLoopGroup = new NioEventLoopGroup(1, new
DefaultThreadFactory("HttpClientTest"));
+ timer = new HashedWheelTimer(new
DefaultThreadFactory("HttpClientTest-timer"));
+ // Default JDK-backed resolver is sufficient; we only hit 127.0.0.1.
+ nameResolver = DnsResolverUtil.adaptToNameResolver(null);
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully();
+ }
+ if (timer != null) {
+ timer.stop();
+ }
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ void beforeMethod() {
+ serverA = new
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+ serverB = new
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+ serverA.start();
+ serverB.start();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void afterMethod() {
+ if (serverA != null) {
+ serverA.stop();
+ }
+ if (serverB != null) {
+ serverB.stop();
+ }
+ }
+
+ @Test
+ public void testCrossOriginRedirectCarriesAuthorizationHeader() throws
Exception {
+ // serverA (origin host:port) returns 307 Temporary Redirect to
serverB (different port -> cross-origin).
+ serverA.stubFor(get(urlPathMatching(LOOKUP_PATH))
+ .willReturn(aResponse()
+ .withStatus(307)
+ .withHeader("Location",
+ "http://127.0.0.1:" + serverB.port() +
LOOKUP_PATH)));
+
+ // serverB only responds 200 when the Authorization header is present
with the expected token.
+ // Priorities: lower = higher priority; the specific-Authorization
stub must be checked first.
+ serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
+ .atPriority(2)
+ .willReturn(aResponse().withStatus(401).withBody("missing
auth")));
+ serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
+ .atPriority(1)
+ .withHeader("Authorization", equalTo("Bearer test-token"))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(EXPECTED_BODY)));
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
+ conf.setAuthentication(AuthenticationFactory.token("test-token"));
+
+ try (HttpClient httpClient = new HttpClient(conf, eventLoopGroup,
timer, nameResolver)) {
+ LookupData result = httpClient.get(LOOKUP_PATH, LookupData.class)
+ .get(30, TimeUnit.SECONDS);
+
+ assertNotNull(result, "Expected lookup payload after cross-origin
redirect");
+ assertEquals(result.getBrokerUrl(), "pulsar://broker-b:6650");
+ assertEquals(result.getHttpUrl(), "http://broker-b:8080");
+
+ // Lock the invariant: the final hop on serverB must carry the
Authorization header.
+ serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH))
+ .withHeader("Authorization", equalTo("Bearer
test-token")));
+ }
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
new file mode 100644
index 00000000000..72f7953f16a
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end check that {@link HttpLookupService#getBroker} carries the
{@code Authorization} header
+ * across a cross-origin redirect — the production scenario where a broker
returns
+ * {@code 307 Temporary Redirect} with a {@code Location} pointing at another
broker's
+ * {@code httpUrl}/{@code httpUrlTls}.
+ */
+public class HttpLookupServiceTest {
+
+ private static final String TOPIC =
"persistent://public/default/cross-origin-lookup-topic";
+ private static final String LOOKUP_PATH_REGEX =
+
"/lookup/v2/topic/persistent/public/default/cross-origin-lookup-topic";
+ private static final String LOOKUP_BODY =
"{\"brokerUrl\":\"pulsar://broker-b.example:6650\","
+ + "\"brokerUrlTls\":\"pulsar+ssl://broker-b.example:6651\","
+ + "\"httpUrl\":\"http://broker-b.example:8080\","
+ + "\"httpUrlTls\":\"https://broker-b.example:8443\","
+ + "\"nativeUrl\":\"pulsar://broker-b.example:6650\"}";
+
+ private WireMockServer serverA;
+ private WireMockServer serverB;
+ private EventLoopGroup eventLoopGroup;
+ private Timer timer;
+
+ @BeforeClass(alwaysRun = true)
+ void beforeClass() {
+ eventLoopGroup = new NioEventLoopGroup(1, new
DefaultThreadFactory("HttpLookupServiceTest"));
+ timer = new HashedWheelTimer(new
DefaultThreadFactory("HttpLookupServiceTest-timer"));
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully();
+ }
+ if (timer != null) {
+ timer.stop();
+ }
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ void beforeMethod() {
+ serverA = new
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+ serverB = new
WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
+ serverA.start();
+ serverB.start();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void afterMethod() {
+ if (serverA != null) {
+ serverA.stop();
+ }
+ if (serverB != null) {
+ serverB.stop();
+ }
+ }
+
+ @Test
+ public void testGetBrokerFollowsCrossOriginRedirect() throws Exception {
+ serverA.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+ .willReturn(aResponse()
+ .withStatus(307)
+ .withHeader("Location",
+ "http://127.0.0.1:" + serverB.port() +
LOOKUP_PATH_REGEX)));
+
+ serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+ .atPriority(2)
+ .willReturn(aResponse().withStatus(401).withBody("missing
auth")));
+ serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX))
+ .atPriority(1)
+ .withHeader("Authorization", equalTo("Bearer test-token"))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(LOOKUP_BODY)));
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
+ conf.setAuthentication(AuthenticationFactory.token("test-token"));
+
+ HttpLookupService lookupService = new HttpLookupService(
+ InstrumentProvider.NOOP, conf, eventLoopGroup, timer,
DnsResolverUtil.adaptToNameResolver(null));
+ try {
+ LookupTopicResult result =
lookupService.getBroker(TopicName.get(TOPIC), null)
+ .get(30, TimeUnit.SECONDS);
+
+ assertNotNull(result);
+ assertEquals(result.getLogicalAddress().getHostString(),
"broker-b.example");
+ assertEquals(result.getLogicalAddress().getPort(), 6650);
+
+ serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH_REGEX))
+ .withHeader("Authorization", equalTo("Bearer
test-token")));
+ } finally {
+ lookupService.close();
+ }
+ }
+}