This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ff90898b88eea16fe1b48207398a05efded355c8 Author: Lari Hotari <[email protected]> AuthorDate: Fri Apr 17 13:39:15 2026 +0300 [fix][sec] Upgrade to async-http-client 2.14.5 to address CVE-2026-40490 (#25546) (cherry picked from commit a1613bc2e5fd26cc16fc95b4a3c61bc5e1ae090d) --- distribution/server/src/assemble/LICENSE.bin.txt | 6 +- distribution/shell/src/assemble/LICENSE.bin.txt | 6 +- pom.xml | 2 +- .../internal/http/AsyncHttpConnectorTest.java | 56 ++++++++ pulsar-client/pom.xml | 6 + .../org/apache/pulsar/client/impl/HttpClient.java | 89 +++++++++--- .../apache/pulsar/client/impl/HttpClientTest.java | 150 +++++++++++++++++++++ .../pulsar/client/impl/HttpLookupServiceTest.java | 142 +++++++++++++++++++ 8 files changed, 435 insertions(+), 22 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 8cd3b8c3b6c..d73b2bf7092 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -392,8 +392,8 @@ The Apache Software License, Version 2.0 * AirCompressor - io.airlift-aircompressor-2.0.3.jar * AsyncHttpClient - - org.asynchttpclient-async-http-client-2.12.4.jar - - org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar + - org.asynchttpclient-async-http-client-2.14.5.jar + - org.asynchttpclient-async-http-client-netty-utils-2.14.5.jar * Jetty - org.eclipse.jetty-jetty-client-9.4.58.v20250814.jar - org.eclipse.jetty-jetty-continuation-9.4.58.v20250814.jar @@ -611,7 +611,7 @@ Eclipse Public License - v2.0 -- ../licenses/LICENSE-EPL-2.0.txt * Jakarta Injection -- org.glassfish.hk2.external-jakarta.inject-2.6.1.jar Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt - * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.3.jar + * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.4.jar Creative Commons Attribution License * Jcip -- ../licenses/LICENSE-jcip.txt diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index c4bfdf8a3de..e306bdced43 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -398,8 +398,8 @@ The Apache Software License, Version 2.0 * AirCompressor - aircompressor-2.0.3.jar * AsyncHttpClient - - async-http-client-2.12.4.jar - - async-http-client-netty-utils-2.12.4.jar + - async-http-client-2.14.5.jar + - async-http-client-netty-utils-2.14.5.jar * Jetty - jetty-client-9.4.58.v20250814.jar - jetty-http-9.4.58.v20250814.jar @@ -459,7 +459,7 @@ Eclipse Public License - v2.0 -- ../licenses/LICENSE-EPL-2.0.txt * Jakarta Injection -- jakarta.inject-2.6.1.jar Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt - * Reactive Streams -- reactive-streams-1.0.3.jar + * Reactive Streams -- reactive-streams-1.0.4.jar Creative Commons Attribution License * Jcip -- ../licenses/LICENSE-jcip.txt diff --git a/pom.xml b/pom.xml index 3a14a780d38..8940335744a 100644 --- a/pom.xml +++ b/pom.xml @@ -261,7 +261,7 @@ flexible messaging model and an intuitive client API.</description> <prometheus-jmx.version>0.16.1</prometheus-jmx.version> <confluent.version>7.9.2</confluent.version> <aircompressor.version>2.0.3</aircompressor.version> - <asynchttpclient.version>2.12.4</asynchttpclient.version> + <asynchttpclient.version>2.14.5</asynchttpclient.version> <commons-lang3.version>3.19.0</commons-lang3.version> <commons-io.version>2.21.0</commons-io.version> <commons-codec.version>1.20.0</commons-codec.version> diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java index e0b4b13a03d..38243976f56 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java @@ -19,7 +19,9 @@ package org.apache.pulsar.client.admin.internal.http; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static org.testng.Assert.assertEquals; @@ -282,6 +284,60 @@ public class AsyncHttpConnectorTest { assertEquals(response.getResponseBody(), "OK"); } + /** + * Locks in that AsyncHttpConnector forwards the {@code Authorization} header across a cross-origin + * HTTP redirect (different host:port — serverA → serverB). The admin connector disables AHC's + * built-in follow-redirect and runs its own redirect loop that copies the original request headers, + * so it must remain unaffected by the async-http-client >= 2.14.5 {@code Authorization} stripping + * on cross-origin redirects (CVE-2026-40490 fix). Regressing {@code setFollowRedirect(true)} would + * break this test. + */ + @Test + void testAuthorizationHeaderOnCrossOriginRedirect() throws ExecutionException, InterruptedException { + WireMockServer serverB = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0)); + serverB.start(); + try { + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .willReturn(aResponse() + .withStatus(307) + .withHeader("Location", + "http://127.0.0.1:" + serverB.port() + "/admin/v2/clusters"))); + + serverB.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .atPriority(2) + .willReturn(aResponse().withStatus(401).withBody("missing auth"))); + serverB.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .atPriority(1) + .withHeader("Authorization", equalTo("Bearer test-token")) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://127.0.0.1:" + server.port()); + + @Cleanup + AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, + 5000, 0, conf, false, null); + + Request request = new RequestBuilder("GET") + .setUrl("http://127.0.0.1:" + server.port() + "/admin/v2/clusters") + .addHeader("Authorization", "Bearer test-token") + .build(); + + Response response = connector.executeRequest(request).get(); + + assertEquals(response.getStatusCode(), 200, + "cross-origin redirect should forward Authorization and return the stubbed body"); + assertEquals(response.getResponseBody(), "[\"test-cluster\"]"); + serverB.verify(getRequestedFor(urlEqualTo("/admin/v2/clusters")) + .withHeader("Authorization", equalTo("Bearer test-token"))); + } finally { + serverB.stop(); + } + } + @Test void testRedirectWithBody() throws ExecutionException, InterruptedException { server.stubFor(post(urlEqualTo("/path1")) diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index aaab87c6e63..6c60e79d0e0 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -228,6 +228,12 @@ <artifactId>fastutil</artifactId> </dependency> + <dependency> + <groupId>com.github.tomakehurst</groupId> + <artifactId>wiremock-jre8-standalone</artifactId> + <version>${wiremock.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 72fcf82b859..6c53c067702 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -55,6 +55,7 @@ import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; import org.asynchttpclient.Request; +import org.asynchttpclient.Response; import org.asynchttpclient.SslEngineFactory; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; @@ -87,7 +88,12 @@ public class HttpClient implements Closeable { DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); confBuilder.setCookieStore(null); confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); + // Follow redirects manually in executeGet(...) so we can re-invoke authentication per hop and + // carry the Authorization header across cross-origin redirects. async-http-client >= 2.14.5 + // (CVE-2026-40490 fix) strips the Authorization header when it follows redirects itself; Pulsar + // HTTP lookups routinely redirect to another broker's httpUrl/httpUrlTls which is a different + // host/port, i.e. cross-origin. + confBuilder.setFollowRedirect(false); confBuilder.setMaxRedirects(conf.getMaxLookupRedirects()); confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); @@ -167,10 +173,28 @@ public class HttpClient implements Closeable { try { URI hostUri = serviceNameResolver.resolveHostUri(); String requestUrl = new URL(hostUri.toURL(), path).toString(); - String remoteHostName = hostUri.getHost(); + InetSocketAddress originalHost = InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()); + executeGet(requestUrl, originalHost, clientConf.getMaxLookupRedirects(), future, clazz); + } catch (Exception e) { + log.warn("[{}] Failed to initiate HTTP get request: {}", path, e.getMessage()); + if (e instanceof PulsarClientException) { + future.completeExceptionally(e); + } else { + future.completeExceptionally(new PulsarClientException(e)); + } + } + + return future; + } + + private <T> void executeGet(String requestUrl, InetSocketAddress originalHost, + int redirectsRemaining, CompletableFuture<T> future, Class<T> clazz) { + try { + URI currentUri = URI.create(requestUrl); + String remoteHostName = currentUri.getHost(); AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); - CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>(); + CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>(); // bring a authenticationStage for sasl auth. if (authData.hasDataForHttp()) { @@ -179,18 +203,15 @@ public class HttpClient implements Closeable { authFuture.complete(null); } - // auth complete, do real request authFuture.whenComplete((respHeaders, ex) -> { if (ex != null) { - serviceNameResolver.markHostAvailability( - InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), false); + serviceNameResolver.markHostAvailability(originalHost, false); log.warn("[{}] Failed to perform http request at authentication stage: {}", requestUrl, ex.getMessage()); future.completeExceptionally(new PulsarClientException(ex)); return; } - // auth complete, use a new builder BoundRequestBuilder builder = httpClient.prepareGet(requestUrl) // share the DNS resolver and cache with Pulsar client .setNameResolver(nameResolver) @@ -217,20 +238,25 @@ public class HttpClient implements Closeable { builder.execute().toCompletableFuture().whenComplete((response2, t) -> { if (t != null) { - serviceNameResolver.markHostAvailability( - InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), false); + serviceNameResolver.markHostAvailability(originalHost, false); log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage()); future.completeExceptionally(new PulsarClientException(t)); return; } - serviceNameResolver.markHostAvailability( - InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), true); + serviceNameResolver.markHostAvailability(originalHost, true); + + int statusCode = response2.getStatusCode(); + if (isRedirectStatusCode(statusCode)) { + handleRedirect(requestUrl, currentUri, response2, originalHost, + redirectsRemaining, future, clazz); + return; + } // request not success - if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) { + if (statusCode != HttpURLConnection.HTTP_OK) { log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText()); Exception e; - if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) { e = new NotFoundException("Not found: " + response2.getStatusText()); } else { e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText()); @@ -250,15 +276,48 @@ public class HttpClient implements Closeable { }); }); } catch (Exception e) { - log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage()); + log.warn("[{}] HTTP request setup failed: {}", requestUrl, e.getMessage()); if (e instanceof PulsarClientException) { future.completeExceptionally(e); } else { future.completeExceptionally(new PulsarClientException(e)); } } + } - return future; + private <T> void handleRedirect(String requestUrl, URI currentUri, + Response response, + InetSocketAddress originalHost, int redirectsRemaining, + CompletableFuture<T> future, Class<T> clazz) { + String location = response.getHeader("Location"); + if (location == null || location.isEmpty()) { + future.completeExceptionally(new PulsarClientException( + "HTTP redirect " + response.getStatusCode() + " without Location header: " + requestUrl)); + return; + } + if (redirectsRemaining <= 0) { + future.completeExceptionally(new PulsarClientException( + "Maximum redirects exceeded (" + clientConf.getMaxLookupRedirects() + + ") while following HTTP redirect for " + requestUrl)); + return; + } + String newUrl; + try { + newUrl = currentUri.resolve(location).toString(); + } catch (Exception e) { + future.completeExceptionally(new PulsarClientException( + "Invalid redirect Location \"" + location + "\" for " + requestUrl)); + return; + } + executeGet(newUrl, originalHost, redirectsRemaining - 1, future, clazz); + } + + private static boolean isRedirectStatusCode(int statusCode) { + return statusCode == HttpURLConnection.HTTP_MOVED_PERM // 301 + || statusCode == HttpURLConnection.HTTP_MOVED_TEMP // 302 + || statusCode == HttpURLConnection.HTTP_SEE_OTHER // 303 + || statusCode == 307 // Temporary Redirect + || statusCode == 308; // Permanent Redirect } protected PulsarSslConfiguration buildSslConfiguration(ClientConfigurationData config, String host) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java new file mode 100644 index 00000000000..ec3c8b0b78e --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpClientTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.resolver.NameResolver; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.net.InetAddress; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.lookup.data.LookupData; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Verifies that {@link HttpClient} carries the {@code Authorization} header across cross-origin HTTP redirects. + * + * <p>Pulsar's HTTP lookup endpoint returns {@code 307 Temporary Redirect} to whichever broker owns the bundle for a + * topic. The redirect target is that broker's {@code httpUrl}/{@code httpUrlTls}, i.e. typically a different host or + * port from the original request. Auth plugins ({@code AuthenticationToken}, {@code AuthenticationBasic}, + * {@code AuthenticationOAuth2}, {@code AuthenticationAthenz}) inject the {@code Authorization} header — that header + * must reach the redirect target for lookup to succeed. + * + * <p>async-http-client 2.14.5 strips {@code Authorization} on cross-origin redirects when its built-in follow-redirect + * is enabled (CVE-2026-40490 fix). This test drives two WireMock servers on different ports to exercise that path. + */ +public class HttpClientTest { + + private static final String LOOKUP_PATH = "/lookup/v2/topic/persistent/public/default/test-topic"; + private static final String EXPECTED_BODY = "{\"brokerUrl\":\"pulsar://broker-b:6650\"," + + "\"brokerUrlTls\":\"pulsar+ssl://broker-b:6651\"," + + "\"httpUrl\":\"http://broker-b:8080\"," + + "\"httpUrlTls\":\"https://broker-b:8443\"," + + "\"nativeUrl\":\"pulsar://broker-b:6650\"}"; + + private WireMockServer serverA; + private WireMockServer serverB; + private EventLoopGroup eventLoopGroup; + private Timer timer; + private NameResolver<InetAddress> nameResolver; + + @BeforeClass(alwaysRun = true) + void beforeClass() { + eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("HttpClientTest")); + timer = new HashedWheelTimer(new DefaultThreadFactory("HttpClientTest-timer")); + // Default JDK-backed resolver is sufficient; we only hit 127.0.0.1. + nameResolver = DnsResolverUtil.adaptToNameResolver(null); + } + + @AfterClass(alwaysRun = true) + void afterClass() { + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully(); + } + if (timer != null) { + timer.stop(); + } + } + + @BeforeMethod(alwaysRun = true) + void beforeMethod() { + serverA = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0)); + serverB = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0)); + serverA.start(); + serverB.start(); + } + + @AfterMethod(alwaysRun = true) + void afterMethod() { + if (serverA != null) { + serverA.stop(); + } + if (serverB != null) { + serverB.stop(); + } + } + + @Test + public void testCrossOriginRedirectCarriesAuthorizationHeader() throws Exception { + // serverA (origin host:port) returns 307 Temporary Redirect to serverB (different port -> cross-origin). + serverA.stubFor(get(urlPathMatching(LOOKUP_PATH)) + .willReturn(aResponse() + .withStatus(307) + .withHeader("Location", + "http://127.0.0.1:" + serverB.port() + LOOKUP_PATH))); + + // serverB only responds 200 when the Authorization header is present with the expected token. + // Priorities: lower = higher priority; the specific-Authorization stub must be checked first. + serverB.stubFor(get(urlPathMatching(LOOKUP_PATH)) + .atPriority(2) + .willReturn(aResponse().withStatus(401).withBody("missing auth"))); + serverB.stubFor(get(urlPathMatching(LOOKUP_PATH)) + .atPriority(1) + .withHeader("Authorization", equalTo("Bearer test-token")) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(EXPECTED_BODY))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://127.0.0.1:" + serverA.port()); + conf.setAuthentication(AuthenticationFactory.token("test-token")); + + try (HttpClient httpClient = new HttpClient(conf, eventLoopGroup, timer, nameResolver)) { + LookupData result = httpClient.get(LOOKUP_PATH, LookupData.class) + .get(30, TimeUnit.SECONDS); + + assertNotNull(result, "Expected lookup payload after cross-origin redirect"); + assertEquals(result.getBrokerUrl(), "pulsar://broker-b:6650"); + assertEquals(result.getHttpUrl(), "http://broker-b:8080"); + + // Lock the invariant: the final hop on serverB must carry the Authorization header. + serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH)) + .withHeader("Authorization", equalTo("Bearer test-token"))); + } + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java new file mode 100644 index 00000000000..f2d3751ecb0 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HttpLookupServiceTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * End-to-end check that {@link HttpLookupService#getBroker} carries the {@code Authorization} header + * across a cross-origin redirect — the production scenario where a broker returns + * {@code 307 Temporary Redirect} with a {@code Location} pointing at another broker's + * {@code httpUrl}/{@code httpUrlTls}. + */ +public class HttpLookupServiceTest { + + private static final String TOPIC = "persistent://public/default/cross-origin-lookup-topic"; + private static final String LOOKUP_PATH_REGEX = + "/lookup/v2/topic/persistent/public/default/cross-origin-lookup-topic"; + private static final String LOOKUP_BODY = "{\"brokerUrl\":\"pulsar://broker-b.example:6650\"," + + "\"brokerUrlTls\":\"pulsar+ssl://broker-b.example:6651\"," + + "\"httpUrl\":\"http://broker-b.example:8080\"," + + "\"httpUrlTls\":\"https://broker-b.example:8443\"," + + "\"nativeUrl\":\"pulsar://broker-b.example:6650\"}"; + + private WireMockServer serverA; + private WireMockServer serverB; + private EventLoopGroup eventLoopGroup; + private Timer timer; + + @BeforeClass(alwaysRun = true) + void beforeClass() { + eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("HttpLookupServiceTest")); + timer = new HashedWheelTimer(new DefaultThreadFactory("HttpLookupServiceTest-timer")); + } + + @AfterClass(alwaysRun = true) + void afterClass() { + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully(); + } + if (timer != null) { + timer.stop(); + } + } + + @BeforeMethod(alwaysRun = true) + void beforeMethod() { + serverA = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0)); + serverB = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0)); + serverA.start(); + serverB.start(); + } + + @AfterMethod(alwaysRun = true) + void afterMethod() { + if (serverA != null) { + serverA.stop(); + } + if (serverB != null) { + serverB.stop(); + } + } + + @Test + public void testGetBrokerFollowsCrossOriginRedirect() throws Exception { + serverA.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX)) + .willReturn(aResponse() + .withStatus(307) + .withHeader("Location", + "http://127.0.0.1:" + serverB.port() + LOOKUP_PATH_REGEX))); + + serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX)) + .atPriority(2) + .willReturn(aResponse().withStatus(401).withBody("missing auth"))); + serverB.stubFor(get(urlPathMatching(LOOKUP_PATH_REGEX)) + .atPriority(1) + .withHeader("Authorization", equalTo("Bearer test-token")) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(LOOKUP_BODY))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://127.0.0.1:" + serverA.port()); + conf.setAuthentication(AuthenticationFactory.token("test-token")); + + HttpLookupService lookupService = new HttpLookupService( + InstrumentProvider.NOOP, conf, eventLoopGroup, timer, DnsResolverUtil.adaptToNameResolver(null)); + try { + LookupTopicResult result = lookupService.getBroker(TopicName.get(TOPIC)) + .get(30, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(result.getLogicalAddress().getHostString(), "broker-b.example"); + assertEquals(result.getLogicalAddress().getPort(), 6650); + + serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH_REGEX)) + .withHeader("Authorization", equalTo("Bearer test-token"))); + } finally { + lookupService.close(); + } + } +}
