This is an automated email from the ASF dual-hosted git repository.
Jason918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03ed3af4cc8 [improve][client] Add SOCKS5 proxy support for PulsarAdmin
and for PulsarClient HTTP lookups (#25575)
03ed3af4cc8 is described below
commit 03ed3af4cc8fb9c18a310245280e3dc2fcb18dd3
Author: xiaolong ran <[email protected]>
AuthorDate: Tue Apr 28 10:08:20 2026 +0800
[improve][client] Add SOCKS5 proxy support for PulsarAdmin and for
PulsarClient HTTP lookups (#25575)
Signed-off-by: xiaolongran <[email protected]>
---
.../pulsar/client/admin/PulsarAdminBuilder.java | 30 ++++
.../admin/internal/PulsarAdminBuilderImpl.java | 23 +++
.../admin/internal/http/AsyncHttpConnector.java | 43 +++++
.../admin/internal/PulsarAdminBuilderImplTest.java | 82 +++++++++
.../http/AsyncHttpConnectorSocks5Test.java | 200 +++++++++++++++++++++
.../apache/pulsar/client/api/ClientBuilder.java | 17 +-
.../apache/pulsar/client/api/Socks5ProxyScope.java | 74 ++++++++
.../pulsar/client/impl/ClientBuilderImpl.java | 7 +
.../org/apache/pulsar/client/impl/HttpClient.java | 40 +++++
.../client/impl/PulsarChannelInitializer.java | 6 +-
.../client/impl/conf/ClientConfigurationData.java | 11 ++
11 files changed, 531 insertions(+), 2 deletions(-)
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index c53e4416c00..a252ce8ae20 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -395,6 +396,35 @@ public interface PulsarAdminBuilder {
*/
PulsarAdminBuilder description(String description);
+ /**
+ * Set the SOCKS5 proxy address to be used by the Pulsar Admin client for
outgoing HTTP(S)
+ * requests. When set, all admin traffic is tunneled through the given
SOCKS5 proxy.
+ *
+ * @param socks5ProxyAddress the SOCKS5 proxy address (host + port), or
{@code null} to disable
+ * @return the admin builder instance
+ */
+ PulsarAdminBuilder socks5ProxyAddress(InetSocketAddress
socks5ProxyAddress);
+
+ /**
+ * Set the username used to authenticate against the SOCKS5 proxy
configured via
+ * {@link #socks5ProxyAddress(InetSocketAddress)}. If the username is
{@code null} or blank,
+ * no authentication will be performed against the proxy.
+ *
+ * @param socks5ProxyUsername the SOCKS5 proxy username
+ * @return the admin builder instance
+ */
+ PulsarAdminBuilder socks5ProxyUsername(String socks5ProxyUsername);
+
+ /**
+ * Set the password used to authenticate against the SOCKS5 proxy
configured via
+ * {@link #socks5ProxyAddress(InetSocketAddress)}. Only used when a
non-blank username has
+ * been configured.
+ *
+ * @param socks5ProxyPassword the SOCKS5 proxy password
+ * @return the admin builder instance
+ */
+ PulsarAdminBuilder socks5ProxyPassword(String socks5ProxyPassword);
+
/**
* Provide a set of shared client resources to be reused by this client.
* <p>
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index f94f6890736..46ebc1ec04e 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -30,6 +31,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -52,6 +54,9 @@ public class PulsarAdminBuilderImpl implements
PulsarAdminBuilder {
public PulsarAdminBuilderImpl() {
this.conf = new ClientConfigurationData();
this.conf.setConnectionsPerBroker(16);
+ // Admin traffic is HTTP-only; default the scope to HTTP_ONLY so that
a configured
+ // SOCKS5 proxy is applied to HTTP requests without requiring an
explicit scope call.
+ this.conf.setSocks5ProxyScope(Socks5ProxyScope.HTTP_ONLY);
}
private PulsarAdminBuilderImpl(ClientConfigurationData conf) {
@@ -297,6 +302,24 @@ public class PulsarAdminBuilderImpl implements
PulsarAdminBuilder {
return this;
}
+ @Override
+ public PulsarAdminBuilder socks5ProxyAddress(InetSocketAddress
socks5ProxyAddress) {
+ this.conf.setSocks5ProxyAddress(socks5ProxyAddress);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder socks5ProxyUsername(String socks5ProxyUsername) {
+ this.conf.setSocks5ProxyUsername(socks5ProxyUsername);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder socks5ProxyPassword(String socks5ProxyPassword) {
+ this.conf.setSocks5ProxyPassword(socks5ProxyPassword);
+ return this;
+ }
+
@Override
public PulsarAdminBuilder sharedResources(PulsarClientSharedResources
sharedResources) {
this.sharedResources = (PulsarClientSharedResourcesImpl)
sharedResources;
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 2835d6d4251..fe79ee51529 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -27,6 +27,7 @@ import static
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.PERMANE
import static
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.SEE_OTHER_303;
import static
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.TEMPORARY_REDIRECT_307;
import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
+import com.google.common.annotations.VisibleForTesting;
import com.spotify.futures.ConcurrencyReducer;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.DefaultHttpHeaders;
@@ -61,6 +62,7 @@ import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
@@ -84,10 +86,13 @@ import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Realm;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.SslEngineFactory;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
import org.asynchttpclient.uri.Uri;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
@@ -239,6 +244,7 @@ public class AsyncHttpConnector implements Connector,
AsyncHttpRequestExecutor {
}
});
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
+ configureSocks5ProxyIfNeeded(confBuilder, conf);
}
protected AsyncHttpClient createAsyncHttpClient(AsyncHttpClientConfig
asyncHttpClientConfig) {
@@ -572,6 +578,43 @@ public class AsyncHttpConnector implements Connector,
AsyncHttpRequestExecutor {
return "Pulsar-Admin";
}
+ /**
+ * Configure SOCKS5 proxy for the underlying Netty-based async-http-client
if
+ * {@link ClientConfigurationData#getSocks5ProxyAddress()} is set. The
configuration keys
+ * (socks5ProxyAddress / socks5ProxyUsername / socks5ProxyPassword) are
shared with the
+ * pulsar-client module so that admin and client behave consistently.
+ *
+ * <p>async-http-client's {@link ProxyServer} with {@link
ProxyType#SOCKS_V5} is backed by
+ * Netty's {@code Socks5ProxyHandler}, which is injected into the channel
pipeline when
+ * establishing a new connection.
+ */
+ @VisibleForTesting
+ static void
configureSocks5ProxyIfNeeded(DefaultAsyncHttpClientConfig.Builder confBuilder,
+ ClientConfigurationData conf) {
+ if (conf == null) {
+ return;
+ }
+ InetSocketAddress socks5Address = conf.getSocks5ProxyAddress();
+ if (socks5Address == null) {
+ return;
+ }
+ if (!conf.getSocks5ProxyScope().appliesToHttp()) {
+ return;
+ }
+ ProxyServer.Builder proxyBuilder =
+ new ProxyServer.Builder(socks5Address.getHostString(),
socks5Address.getPort())
+ .setProxyType(ProxyType.SOCKS_V5);
+ String socks5Username = conf.getSocks5ProxyUsername();
+ if (StringUtils.isNotBlank(socks5Username)) {
+ Realm realm = new Realm.Builder(socks5Username,
conf.getSocks5ProxyPassword())
+ .setScheme(Realm.AuthScheme.BASIC)
+ .build();
+ proxyBuilder.setRealm(realm);
+ }
+ confBuilder.setProxyServer(proxyBuilder.build());
+ log.info().attr("proxy", socks5Address).log("Pulsar admin client is
using SOCKS5 proxy");
+ }
+
@Override
public void close() {
try {
diff --git
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index 259ebcb055a..7912353b3cb 100644
---
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -44,6 +44,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientSharedResources;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyServerSelector;
+import org.asynchttpclient.proxy.ProxyType;
+import org.asynchttpclient.uri.Uri;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -293,6 +297,84 @@ public class PulsarAdminBuilderImplTest {
}).isInstanceOf(IllegalArgumentException.class);
}
+ /**
+ * Verifies that SOCKS5 proxy settings configured via the builder chain
are propagated all the
+ * way down to the underlying async-http-client {@link ProxyServer}.
+ */
+ @Test
+ public void testSocks5ProxyAddressIsConfiguredOnHttpClient() throws
PulsarClientException {
+ InetSocketAddress proxyAddress =
InetSocketAddress.createUnresolved("127.0.0.1", 1080);
+
+ @Cleanup
+ PulsarAdminImpl admin = (PulsarAdminImpl) PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:8080")
+ .socks5ProxyAddress(proxyAddress)
+ .build();
+
+ ProxyServer proxyServer = resolveProxyServer(admin);
+ assertThat(proxyServer).isNotNull();
+ assertThat(proxyServer.getProxyType()).isEqualTo(ProxyType.SOCKS_V5);
+ assertThat(proxyServer.getHost()).isEqualTo("127.0.0.1");
+ assertThat(proxyServer.getPort()).isEqualTo(1080);
+ assertThat(proxyServer.getRealm()).isNull();
+ }
+
+ /**
+ * Verifies that SOCKS5 proxy credentials configured via the builder chain
are propagated to
+ * the underlying async-http-client {@link ProxyServer} realm.
+ */
+ @Test
+ public void testSocks5ProxyWithCredentialsIsConfiguredOnHttpClient()
throws PulsarClientException {
+ InetSocketAddress proxyAddress =
InetSocketAddress.createUnresolved("proxy.example.com", 2080);
+
+ @Cleanup
+ PulsarAdminImpl admin = (PulsarAdminImpl) PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:8080")
+ .socks5ProxyAddress(proxyAddress)
+ .socks5ProxyUsername("alice")
+ .socks5ProxyPassword("s3cr3t")
+ .build();
+
+ ProxyServer proxyServer = resolveProxyServer(admin);
+ assertThat(proxyServer).isNotNull();
+ assertThat(proxyServer.getProxyType()).isEqualTo(ProxyType.SOCKS_V5);
+ assertThat(proxyServer.getHost()).isEqualTo("proxy.example.com");
+ assertThat(proxyServer.getPort()).isEqualTo(2080);
+ assertThat(proxyServer.getRealm()).isNotNull();
+ assertThat(proxyServer.getRealm().getPrincipal()).isEqualTo("alice");
+ assertThat(proxyServer.getRealm().getPassword()).isEqualTo("s3cr3t");
+ }
+
+ /**
+ * Verifies that when no SOCKS5 proxy address is configured, no proxy is
installed on the
+ * underlying async-http-client.
+ */
+ @Test
+ public void testNoSocks5ProxyByDefault() throws PulsarClientException {
+ @Cleanup
+ PulsarAdminImpl admin = (PulsarAdminImpl) PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:8080")
+ .build();
+
+ ProxyServer proxyServer = resolveProxyServer(admin);
+ assertThat(proxyServer).isNull();
+ }
+
+ /**
+ * Resolves the {@link ProxyServer} configured on the underlying
async-http-client. The
+ * async-http-client API exposes proxy configuration only through a
+ * {@link ProxyServerSelector}, so we invoke the selector with an
arbitrary target URI to
+ * retrieve the effective {@link ProxyServer} (or {@code null} when no
proxy is configured).
+ */
+ private static ProxyServer resolveProxyServer(PulsarAdminImpl admin) {
+ ProxyServerSelector selector =
+
admin.getAsyncHttpConnector().getHttpClient().getConfig().getProxyServerSelector();
+ if (selector == null) {
+ return null;
+ }
+ return selector.select(Uri.create("http://localhost:8080"));
+ }
+
private String secretAuthParams(String secret) {
return String.format("{\"secret\":\"%s\"}", secret);
}
diff --git
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorSocks5Test.java
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorSocks5Test.java
new file mode 100644
index 00000000000..378d23ba2de
--- /dev/null
+++
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorSocks5Test.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.net.InetSocketAddress;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Realm;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the SOCKS5 proxy wiring added in {@link AsyncHttpConnector}.
+ */
+public class AsyncHttpConnectorSocks5Test {
+
+ /**
+ * When the configuration is {@code null}, the proxy-configuration helper
must be a no-op.
+ */
+ @Test
+ public void testConfigureSocks5ProxyIfNeededWithNullConf() throws
Exception {
+ DefaultAsyncHttpClientConfig.Builder builder = spy(new
DefaultAsyncHttpClientConfig.Builder());
+
+ AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, null);
+
+ verify(builder, never()).setProxyServer(any(ProxyServer.class));
+ verify(builder,
never()).setProxyServer(any(ProxyServer.Builder.class));
+ }
+
+ /**
+ * When {@code socks5ProxyAddress} is not set, no proxy should be
configured on the builder.
+ */
+ @Test
+ public void testConfigureSocks5ProxyIfNeededWithoutAddress() throws
Exception {
+ DefaultAsyncHttpClientConfig.Builder builder = spy(new
DefaultAsyncHttpClientConfig.Builder());
+ ClientConfigurationData conf = newAdminConf();
+
+ AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+ verify(builder, never()).setProxyServer(any(ProxyServer.class));
+ verify(builder,
never()).setProxyServer(any(ProxyServer.Builder.class));
+ }
+
+ /**
+ * When only the SOCKS5 address is provided, a {@link ProxyType#SOCKS_V5}
proxy server
+ * should be configured without any authentication realm.
+ */
+ @Test
+ public void testConfigureSocks5ProxyIfNeededWithAddressOnly() throws
Exception {
+ DefaultAsyncHttpClientConfig.Builder builder = spy(new
DefaultAsyncHttpClientConfig.Builder());
+ ClientConfigurationData conf = newAdminConf();
+ InetSocketAddress socks5Address =
InetSocketAddress.createUnresolved("127.0.0.1", 1080);
+ conf.setSocks5ProxyAddress(socks5Address);
+
+ AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+ ArgumentCaptor<ProxyServer> captor =
ArgumentCaptor.forClass(ProxyServer.class);
+ verify(builder).setProxyServer(captor.capture());
+
+ ProxyServer proxyServer = captor.getValue();
+ assertNotNull(proxyServer, "ProxyServer must have been configured");
+ assertEquals(proxyServer.getProxyType(), ProxyType.SOCKS_V5);
+ assertEquals(proxyServer.getHost(), "127.0.0.1");
+ assertEquals(proxyServer.getPort(), 1080);
+ assertNull(proxyServer.getRealm(), "Realm must not be set when no
username is provided");
+ }
+
+ /**
+ * When both address and credentials are provided, a {@link Realm} with
BASIC scheme should be
+ * attached to the proxy server so that Netty's Socks5ProxyHandler
performs username/password
+ * authentication.
+ */
+ @Test
+ public void testConfigureSocks5ProxyIfNeededWithCredentials() throws
Exception {
+ DefaultAsyncHttpClientConfig.Builder builder = spy(new
DefaultAsyncHttpClientConfig.Builder());
+ ClientConfigurationData conf = newAdminConf();
+ InetSocketAddress socks5Address =
InetSocketAddress.createUnresolved("proxy.example.com", 2080);
+ conf.setSocks5ProxyAddress(socks5Address);
+ conf.setSocks5ProxyUsername("user1");
+ conf.setSocks5ProxyPassword("p@ssw0rd");
+
+ AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+ ArgumentCaptor<ProxyServer> captor =
ArgumentCaptor.forClass(ProxyServer.class);
+ verify(builder).setProxyServer(captor.capture());
+
+ ProxyServer proxyServer = captor.getValue();
+ assertNotNull(proxyServer);
+ assertEquals(proxyServer.getProxyType(), ProxyType.SOCKS_V5);
+ assertEquals(proxyServer.getHost(), "proxy.example.com");
+ assertEquals(proxyServer.getPort(), 2080);
+
+ Realm realm = proxyServer.getRealm();
+ assertNotNull(realm, "Realm must be set when a username is provided");
+ assertEquals(realm.getPrincipal(), "user1");
+ assertEquals(realm.getPassword(), "p@ssw0rd");
+ assertEquals(realm.getScheme(), Realm.AuthScheme.BASIC);
+ }
+
+ /**
+ * When the configured {@link Socks5ProxyScope} does not cover HTTP
traffic (e.g.
+ * {@link Socks5ProxyScope#BINARY_ONLY}), the helper must skip wiring the
proxy on the
+ * async-http-client builder even if a SOCKS5 address is provided.
+ */
+ @Test
+ public void testConfigureSocks5ProxyIfNeededSkippedWhenScopeBinaryOnly()
throws Exception {
+ DefaultAsyncHttpClientConfig.Builder builder = spy(new
DefaultAsyncHttpClientConfig.Builder());
+ ClientConfigurationData conf = new ClientConfigurationData();
+
conf.setSocks5ProxyAddress(InetSocketAddress.createUnresolved("127.0.0.1",
1080));
+ // explicitly force BINARY_ONLY to verify the HTTP-scope guard
+ conf.setSocks5ProxyScope(Socks5ProxyScope.BINARY_ONLY);
+
+ AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+ verify(builder, never()).setProxyServer(any(ProxyServer.class));
+ verify(builder,
never()).setProxyServer(any(ProxyServer.Builder.class));
+ }
+
+ /**
+ * A blank username must be treated the same as no credentials: no realm
should be attached.
+ */
+ @Test
+ public void testConfigureSocks5ProxyIfNeededWithBlankUsername() throws
Exception {
+ DefaultAsyncHttpClientConfig.Builder builder = spy(new
DefaultAsyncHttpClientConfig.Builder());
+ ClientConfigurationData conf = newAdminConf();
+
conf.setSocks5ProxyAddress(InetSocketAddress.createUnresolved("127.0.0.1",
1080));
+ conf.setSocks5ProxyUsername(" ");
+ conf.setSocks5ProxyPassword("ignored");
+
+ AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+ ArgumentCaptor<ProxyServer> captor =
ArgumentCaptor.forClass(ProxyServer.class);
+ verify(builder).setProxyServer(captor.capture());
+
+ ProxyServer proxyServer = captor.getValue();
+ assertNotNull(proxyServer);
+ assertNull(proxyServer.getRealm(), "Realm must not be set when the
username is blank");
+ }
+
+ /**
+ * End-to-end smoke test: building a real {@link AsyncHttpConnector} with
SOCKS5 configured
+ * should not throw and the connector should expose a non-null http client
that can be closed
+ * cleanly. The SOCKS5 handler is only wired into the Netty pipeline when
an actual connection
+ * is attempted, so no real SOCKS5 server is required here.
+ */
+ @Test
+ public void testAsyncHttpConnectorConstructionWithSocks5() throws
Exception {
+ ClientConfigurationData conf = newAdminConf();
+ conf.setServiceUrl("http://localhost:8080");
+ conf.setAuthentication(new AuthenticationDisabled());
+
conf.setSocks5ProxyAddress(InetSocketAddress.createUnresolved("127.0.0.1",
1080));
+ conf.setSocks5ProxyUsername("admin");
+ conf.setSocks5ProxyPassword("admin");
+
+ AsyncHttpConnector connector = new AsyncHttpConnector(1000, 1000,
1000, 60, conf, false, null);
+ try {
+ assertNotNull(connector.getHttpClient(), "AsyncHttpClient must be
initialized");
+ } finally {
+ connector.close();
+ }
+ }
+
+ /**
+ * Build a {@link ClientConfigurationData} that matches what {@code
PulsarAdminBuilderImpl}
+ * produces for admin clients. Admin traffic is HTTP-only, so the default
SOCKS5 scope is
+ * {@link Socks5ProxyScope#HTTP_ONLY}.
+ */
+ private static ClientConfigurationData newAdminConf() {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setSocks5ProxyScope(Socks5ProxyScope.HTTP_ONLY);
+ return conf;
+ }
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 7ac063d227b..69770b8b00f 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
-
/**
* Builder interface that is used to configure and construct a {@link
PulsarClient} instance.
*
@@ -723,6 +722,22 @@ public interface ClientBuilder extends Serializable,
Cloneable {
*/
ClientBuilder socks5ProxyPassword(String socks5ProxyPassword);
+ /**
+ * Set the scope that controls which connections are routed through the
SOCKS5 proxy.
+ *
+ * <p>The default is {@link Socks5ProxyScope#BINARY_ONLY}, which preserves
the pre-existing
+ * behavior where the SOCKS5 proxy only applied to Pulsar binary protocol
connections to brokers.
+ * HTTP lookup and failover HTTP clients inside {@code PulsarClient} were
not affected.
+ *
+ * <p>Set to {@link Socks5ProxyScope#HTTP_ONLY} or {@link
Socks5ProxyScope#BOTH} to also route
+ * HTTP/HTTPS lookup traffic and failover HTTP clients through the SOCKS5
proxy.
+ *
+ * @param socks5ProxyScope the scope selector; must not be {@code null}
+ * @return the client builder instance
+ * @see Socks5ProxyScope
+ */
+ ClientBuilder socks5ProxyScope(Socks5ProxyScope socks5ProxyScope);
+
/**
* Set the SSL Factory Plugin for custom implementation to create SSL
Context and SSLEngine.
* @param sslFactoryPlugin ssl factory class name
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Socks5ProxyScope.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Socks5ProxyScope.java
new file mode 100644
index 00000000000..46d032ab8bd
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Socks5ProxyScope.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Selector that controls which kinds of connections should be routed through
the configured
+ * SOCKS5 proxy when a proxy address is set on a Pulsar client or admin
builder.
+ *
+ * <p>Historically, the SOCKS5 proxy configured on {@code PulsarClient} only
affected the Pulsar
+ * binary protocol connections to brokers, and the HTTP lookup / failover
clients inside
+ * {@code PulsarClient} silently ignored it. The scope selector makes this
behavior explicit and
+ * also allows users to route HTTP traffic (HTTP lookups, failover HTTP
clients, and
+ * {@code PulsarAdmin} REST calls) through the same SOCKS5 proxy.
+ *
+ * <p>For {@code PulsarClient}, the default is {@link #BINARY_ONLY} so
existing behavior is
+ * preserved. For {@code PulsarAdmin}, the default is {@link #HTTP_ONLY}
because admin traffic is
+ * HTTP by nature.
+ */
[email protected]
[email protected]
+public enum Socks5ProxyScope {
+
+ /**
+ * Apply the SOCKS5 proxy only to Pulsar binary protocol connections to
brokers. HTTP
+ * lookups and HTTP-based failover connections ignore the proxy. This
matches the pre-existing
+ * behavior of {@code PulsarClient} and is the default for client builders.
+ */
+ BINARY_ONLY,
+
+ /**
+ * Apply the SOCKS5 proxy only to HTTP traffic, i.e. HTTP/HTTPS lookups,
failover HTTP
+ * clients, and REST calls issued by {@code PulsarAdmin}. Pulsar binary
protocol connections
+ * to brokers do not use the proxy.
+ */
+ HTTP_ONLY,
+
+ /**
+ * Apply the SOCKS5 proxy to both Pulsar binary protocol connections and
HTTP traffic.
+ */
+ BOTH;
+
+ /**
+ * @return {@code true} if this scope routes Pulsar binary protocol
connections through SOCKS5.
+ */
+ public boolean appliesToBinary() {
+ return this == BINARY_ONLY || this == BOTH;
+ }
+
+ /**
+ * @return {@code true} if this scope routes HTTP/HTTPS traffic through
SOCKS5.
+ */
+ public boolean appliesToHttp() {
+ return this == HTTP_ONLY || this == BOTH;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 6432083c555..d2fe15bbc94 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -37,6 +37,7 @@ import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticat
import org.apache.pulsar.client.api.PulsarClientSharedResources;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -474,6 +475,12 @@ public class ClientBuilderImpl implements ClientBuilder {
return this;
}
+ @Override
+ public ClientBuilder socks5ProxyScope(Socks5ProxyScope socks5ProxyScope) {
+ conf.setSocks5ProxyScope(socks5ProxyScope);
+ return this;
+ }
+
@Override
public ClientBuilder sslFactoryPlugin(String sslFactoryPlugin) {
if (StringUtils.isBlank(sslFactoryPlugin)) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 63e1fa62607..c12be520284 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -39,11 +39,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory;
@@ -55,10 +57,13 @@ import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Realm;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.SslEngineFactory;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
@CustomLog
@@ -143,6 +148,7 @@ public class HttpClient implements Closeable {
}
confBuilder.setEventLoopGroup(eventLoopGroup);
confBuilder.setNettyTimer(timer);
+ configureSocks5ProxyIfNeeded(confBuilder, conf);
AsyncHttpClientConfig config = confBuilder.build();
httpClient = new DefaultAsyncHttpClient(config);
@@ -382,4 +388,38 @@ public class HttpClient implements Closeable {
}
}
+ /**
+ * Configure SOCKS5 proxy on the async-http-client builder when the proxy
address is set and
+ * the configured {@link Socks5ProxyScope} includes HTTP traffic.
+ *
+ * <p>The default scope for {@code PulsarClient} is {@link
Socks5ProxyScope#BINARY_ONLY}, so
+ * HTTP lookups and failover HTTP clients will NOT use the proxy unless
the caller explicitly
+ * sets the scope to {@link Socks5ProxyScope#HTTP_ONLY} or {@link
Socks5ProxyScope#BOTH}.
+ */
+ private static void
configureSocks5ProxyIfNeeded(DefaultAsyncHttpClientConfig.Builder confBuilder,
+ ClientConfigurationData
conf) {
+ if (conf == null) {
+ return;
+ }
+ InetSocketAddress socks5Address = conf.getSocks5ProxyAddress();
+ if (socks5Address == null) {
+ return;
+ }
+ if (!conf.getSocks5ProxyScope().appliesToHttp()) {
+ return;
+ }
+ ProxyServer.Builder proxyBuilder =
+ new ProxyServer.Builder(socks5Address.getHostString(),
socks5Address.getPort())
+ .setProxyType(ProxyType.SOCKS_V5);
+ String socks5Username = conf.getSocks5ProxyUsername();
+ if (StringUtils.isNotBlank(socks5Username)) {
+ Realm realm = new Realm.Builder(socks5Username,
conf.getSocks5ProxyPassword())
+ .setScheme(Realm.AuthScheme.BASIC)
+ .build();
+ proxyBuilder.setRealm(realm);
+ }
+ confBuilder.setProxyServer(proxyBuilder.build());
+ log.info().attr("proxy", socks5Address).log("Pulsar client HTTP lookup
is using SOCKS5 proxy");
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 20614f714ae..fdd672b587b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
import lombok.CustomLog;
import lombok.Getter;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
@@ -57,6 +58,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
private final InetSocketAddress socks5ProxyAddress;
private final String socks5ProxyUsername;
private final String socks5ProxyPassword;
+ private final Socks5ProxyScope socks5ProxyScope;
private final ClientConfigurationData conf;
private final Map<String, PulsarSslFactory> pulsarSslFactoryMap;
@@ -71,6 +73,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
this.socks5ProxyAddress = conf.getSocks5ProxyAddress();
this.socks5ProxyUsername = conf.getSocks5ProxyUsername();
this.socks5ProxyPassword = conf.getSocks5ProxyPassword();
+ this.socks5ProxyScope = conf.getSocks5ProxyScope();
this.conf = conf.clone();
if (tlsEnabled) {
this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
@@ -154,7 +157,8 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
CompletableFuture<Channel> initSocks5IfConfig(Channel ch) {
CompletableFuture<Channel> initSocks5Future = new
CompletableFuture<>();
- if (socks5ProxyAddress != null) {
+ // Only apply SOCKS5 to the binary protocol path when the scope
includes binary connections.
+ if (socks5ProxyAddress != null && socks5ProxyScope.appliesToBinary()) {
ch.eventLoop().execute(() -> {
try {
Socks5ProxyHandler socks5ProxyHandler =
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index df6e01a73f5..73e5eb2e368 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -42,6 +42,7 @@ import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.util.Secret;
import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
@@ -427,6 +428,16 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
@Secret
private String socks5ProxyPassword;
+ @ApiModelProperty(
+ name = "socks5ProxyScope",
+ value = "Selector that controls which connections go through the
SOCKS5 proxy. "
+ + "BINARY_ONLY (default for PulsarClient) only routes
Pulsar binary protocol connections; "
+ + "HTTP_ONLY only routes HTTP/HTTPS traffic (HTTP lookups,
failover HTTP clients, admin REST); "
+ + "BOTH routes both. This preserves backward compatibility
with the pre-existing behavior "
+ + "where the SOCKS5 proxy on PulsarClient only applied to
the binary protocol."
+ )
+ private Socks5ProxyScope socks5ProxyScope = Socks5ProxyScope.BINARY_ONLY;
+
@ApiModelProperty(
name = "description",
value = "The extra description of the client version. The length
cannot exceed 64."