This is an automated email from the ASF dual-hosted git repository.

Jason918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ed3af4cc8 [improve][client] Add SOCKS5 proxy support for PulsarAdmin 
and for PulsarClient HTTP lookups (#25575)
03ed3af4cc8 is described below

commit 03ed3af4cc8fb9c18a310245280e3dc2fcb18dd3
Author: xiaolong ran <[email protected]>
AuthorDate: Tue Apr 28 10:08:20 2026 +0800

    [improve][client] Add SOCKS5 proxy support for PulsarAdmin and for 
PulsarClient HTTP lookups (#25575)
    
    Signed-off-by: xiaolongran <[email protected]>
---
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  30 ++++
 .../admin/internal/PulsarAdminBuilderImpl.java     |  23 +++
 .../admin/internal/http/AsyncHttpConnector.java    |  43 +++++
 .../admin/internal/PulsarAdminBuilderImplTest.java |  82 +++++++++
 .../http/AsyncHttpConnectorSocks5Test.java         | 200 +++++++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |  17 +-
 .../apache/pulsar/client/api/Socks5ProxyScope.java |  74 ++++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      |   7 +
 .../org/apache/pulsar/client/impl/HttpClient.java  |  40 +++++
 .../client/impl/PulsarChannelInitializer.java      |   6 +-
 .../client/impl/conf/ClientConfigurationData.java  |  11 ++
 11 files changed, 531 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index c53e4416c00..a252ce8ae20 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin;
 
+import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -395,6 +396,35 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdminBuilder description(String description);
 
+    /**
+     * Set the SOCKS5 proxy address to be used by the Pulsar Admin client for 
outgoing HTTP(S)
+     * requests. When set, all admin traffic is tunneled through the given 
SOCKS5 proxy.
+     *
+     * @param socks5ProxyAddress the SOCKS5 proxy address (host + port), or 
{@code null} to disable
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder socks5ProxyAddress(InetSocketAddress 
socks5ProxyAddress);
+
+    /**
+     * Set the username used to authenticate against the SOCKS5 proxy 
configured via
+     * {@link #socks5ProxyAddress(InetSocketAddress)}. If the username is 
{@code null} or blank,
+     * no authentication will be performed against the proxy.
+     *
+     * @param socks5ProxyUsername the SOCKS5 proxy username
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder socks5ProxyUsername(String socks5ProxyUsername);
+
+    /**
+     * Set the password used to authenticate against the SOCKS5 proxy 
configured via
+     * {@link #socks5ProxyAddress(InetSocketAddress)}. Only used when a 
non-blank username has
+     * been configured.
+     *
+     * @param socks5ProxyPassword the SOCKS5 proxy password
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder socks5ProxyPassword(String socks5ProxyPassword);
+
     /**
      * Provide a set of shared client resources to be reused by this client.
      * <p>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index f94f6890736..46ebc1ec04e 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +31,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
 import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -52,6 +54,9 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
     public PulsarAdminBuilderImpl() {
         this.conf = new ClientConfigurationData();
         this.conf.setConnectionsPerBroker(16);
+        // Admin traffic is HTTP-only; default the scope to HTTP_ONLY so that 
a configured
+        // SOCKS5 proxy is applied to HTTP requests without requiring an 
explicit scope call.
+        this.conf.setSocks5ProxyScope(Socks5ProxyScope.HTTP_ONLY);
     }
 
     private PulsarAdminBuilderImpl(ClientConfigurationData conf) {
@@ -297,6 +302,24 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
         return this;
     }
 
+    @Override
+    public PulsarAdminBuilder socks5ProxyAddress(InetSocketAddress 
socks5ProxyAddress) {
+        this.conf.setSocks5ProxyAddress(socks5ProxyAddress);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder socks5ProxyUsername(String socks5ProxyUsername) {
+        this.conf.setSocks5ProxyUsername(socks5ProxyUsername);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder socks5ProxyPassword(String socks5ProxyPassword) {
+        this.conf.setSocks5ProxyPassword(socks5ProxyPassword);
+        return this;
+    }
+
     @Override
     public PulsarAdminBuilder sharedResources(PulsarClientSharedResources 
sharedResources) {
         this.sharedResources = (PulsarClientSharedResourcesImpl) 
sharedResources;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 2835d6d4251..fe79ee51529 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -27,6 +27,7 @@ import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.PERMANE
 import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.SEE_OTHER_303;
 import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.TEMPORARY_REDIRECT_307;
 import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
+import com.google.common.annotations.VisibleForTesting;
 import com.spotify.futures.ConcurrencyReducer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
@@ -61,6 +62,7 @@ import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.Validate;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
@@ -84,10 +86,13 @@ import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Realm;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
 import org.asynchttpclient.SslEngineFactory;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
 import org.asynchttpclient.uri.Uri;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.client.ClientRequest;
@@ -239,6 +244,7 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
             }
         });
         
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
+        configureSocks5ProxyIfNeeded(confBuilder, conf);
     }
 
     protected AsyncHttpClient createAsyncHttpClient(AsyncHttpClientConfig 
asyncHttpClientConfig) {
@@ -572,6 +578,43 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         return "Pulsar-Admin";
     }
 
+    /**
+     * Configure SOCKS5 proxy for the underlying Netty-based async-http-client 
if
+     * {@link ClientConfigurationData#getSocks5ProxyAddress()} is set. The 
configuration keys
+     * (socks5ProxyAddress / socks5ProxyUsername / socks5ProxyPassword) are 
shared with the
+     * pulsar-client module so that admin and client behave consistently.
+     *
+     * <p>async-http-client's {@link ProxyServer} with {@link 
ProxyType#SOCKS_V5} is backed by
+     * Netty's {@code Socks5ProxyHandler}, which is injected into the channel 
pipeline when
+     * establishing a new connection.
+     */
+    @VisibleForTesting
+    static void 
configureSocks5ProxyIfNeeded(DefaultAsyncHttpClientConfig.Builder confBuilder,
+                                             ClientConfigurationData conf) {
+        if (conf == null) {
+            return;
+        }
+        InetSocketAddress socks5Address = conf.getSocks5ProxyAddress();
+        if (socks5Address == null) {
+            return;
+        }
+        if (!conf.getSocks5ProxyScope().appliesToHttp()) {
+            return;
+        }
+        ProxyServer.Builder proxyBuilder =
+                new ProxyServer.Builder(socks5Address.getHostString(), 
socks5Address.getPort())
+                        .setProxyType(ProxyType.SOCKS_V5);
+        String socks5Username = conf.getSocks5ProxyUsername();
+        if (StringUtils.isNotBlank(socks5Username)) {
+            Realm realm = new Realm.Builder(socks5Username, 
conf.getSocks5ProxyPassword())
+                    .setScheme(Realm.AuthScheme.BASIC)
+                    .build();
+            proxyBuilder.setRealm(realm);
+        }
+        confBuilder.setProxyServer(proxyBuilder.build());
+        log.info().attr("proxy", socks5Address).log("Pulsar admin client is 
using SOCKS5 proxy");
+    }
+
     @Override
     public void close() {
         try {
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index 259ebcb055a..7912353b3cb 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -44,6 +44,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyServerSelector;
+import org.asynchttpclient.proxy.ProxyType;
+import org.asynchttpclient.uri.Uri;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -293,6 +297,84 @@ public class PulsarAdminBuilderImplTest {
         }).isInstanceOf(IllegalArgumentException.class);
     }
 
+    /**
+     * Verifies that SOCKS5 proxy settings configured via the builder chain 
are propagated all the
+     * way down to the underlying async-http-client {@link ProxyServer}.
+     */
+    @Test
+    public void testSocks5ProxyAddressIsConfiguredOnHttpClient() throws 
PulsarClientException {
+        InetSocketAddress proxyAddress = 
InetSocketAddress.createUnresolved("127.0.0.1", 1080);
+
+        @Cleanup
+        PulsarAdminImpl admin = (PulsarAdminImpl) PulsarAdmin.builder()
+                .serviceHttpUrl("http://localhost:8080")
+                .socks5ProxyAddress(proxyAddress)
+                .build();
+
+        ProxyServer proxyServer = resolveProxyServer(admin);
+        assertThat(proxyServer).isNotNull();
+        assertThat(proxyServer.getProxyType()).isEqualTo(ProxyType.SOCKS_V5);
+        assertThat(proxyServer.getHost()).isEqualTo("127.0.0.1");
+        assertThat(proxyServer.getPort()).isEqualTo(1080);
+        assertThat(proxyServer.getRealm()).isNull();
+    }
+
+    /**
+     * Verifies that SOCKS5 proxy credentials configured via the builder chain 
are propagated to
+     * the underlying async-http-client {@link ProxyServer} realm.
+     */
+    @Test
+    public void testSocks5ProxyWithCredentialsIsConfiguredOnHttpClient() 
throws PulsarClientException {
+        InetSocketAddress proxyAddress = 
InetSocketAddress.createUnresolved("proxy.example.com", 2080);
+
+        @Cleanup
+        PulsarAdminImpl admin = (PulsarAdminImpl) PulsarAdmin.builder()
+                .serviceHttpUrl("http://localhost:8080")
+                .socks5ProxyAddress(proxyAddress)
+                .socks5ProxyUsername("alice")
+                .socks5ProxyPassword("s3cr3t")
+                .build();
+
+        ProxyServer proxyServer = resolveProxyServer(admin);
+        assertThat(proxyServer).isNotNull();
+        assertThat(proxyServer.getProxyType()).isEqualTo(ProxyType.SOCKS_V5);
+        assertThat(proxyServer.getHost()).isEqualTo("proxy.example.com");
+        assertThat(proxyServer.getPort()).isEqualTo(2080);
+        assertThat(proxyServer.getRealm()).isNotNull();
+        assertThat(proxyServer.getRealm().getPrincipal()).isEqualTo("alice");
+        assertThat(proxyServer.getRealm().getPassword()).isEqualTo("s3cr3t");
+    }
+
+    /**
+     * Verifies that when no SOCKS5 proxy address is configured, no proxy is 
installed on the
+     * underlying async-http-client.
+     */
+    @Test
+    public void testNoSocks5ProxyByDefault() throws PulsarClientException {
+        @Cleanup
+        PulsarAdminImpl admin = (PulsarAdminImpl) PulsarAdmin.builder()
+                .serviceHttpUrl("http://localhost:8080")
+                .build();
+
+        ProxyServer proxyServer = resolveProxyServer(admin);
+        assertThat(proxyServer).isNull();
+    }
+
+    /**
+     * Resolves the {@link ProxyServer} configured on the underlying 
async-http-client. The
+     * async-http-client API exposes proxy configuration only through a
+     * {@link ProxyServerSelector}, so we invoke the selector with an 
arbitrary target URI to
+     * retrieve the effective {@link ProxyServer} (or {@code null} when no 
proxy is configured).
+     */
+    private static ProxyServer resolveProxyServer(PulsarAdminImpl admin) {
+        ProxyServerSelector selector =
+                
admin.getAsyncHttpConnector().getHttpClient().getConfig().getProxyServerSelector();
+        if (selector == null) {
+            return null;
+        }
+        return selector.select(Uri.create("http://localhost:8080"));
+    }
+
     private String secretAuthParams(String secret) {
         return String.format("{\"secret\":\"%s\"}", secret);
     }
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorSocks5Test.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorSocks5Test.java
new file mode 100644
index 00000000000..378d23ba2de
--- /dev/null
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorSocks5Test.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.net.InetSocketAddress;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Realm;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the SOCKS5 proxy wiring added in {@link AsyncHttpConnector}.
+ */
+public class AsyncHttpConnectorSocks5Test {
+
+    /**
+     * When the configuration is {@code null}, the proxy-configuration helper 
must be a no-op.
+     */
+    @Test
+    public void testConfigureSocks5ProxyIfNeededWithNullConf() throws 
Exception {
+        DefaultAsyncHttpClientConfig.Builder builder = spy(new 
DefaultAsyncHttpClientConfig.Builder());
+
+        AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, null);
+
+        verify(builder, never()).setProxyServer(any(ProxyServer.class));
+        verify(builder, 
never()).setProxyServer(any(ProxyServer.Builder.class));
+    }
+
+    /**
+     * When {@code socks5ProxyAddress} is not set, no proxy should be 
configured on the builder.
+     */
+    @Test
+    public void testConfigureSocks5ProxyIfNeededWithoutAddress() throws 
Exception {
+        DefaultAsyncHttpClientConfig.Builder builder = spy(new 
DefaultAsyncHttpClientConfig.Builder());
+        ClientConfigurationData conf = newAdminConf();
+
+        AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+        verify(builder, never()).setProxyServer(any(ProxyServer.class));
+        verify(builder, 
never()).setProxyServer(any(ProxyServer.Builder.class));
+    }
+
+    /**
+     * When only the SOCKS5 address is provided, a {@link ProxyType#SOCKS_V5} 
proxy server
+     * should be configured without any authentication realm.
+     */
+    @Test
+    public void testConfigureSocks5ProxyIfNeededWithAddressOnly() throws 
Exception {
+        DefaultAsyncHttpClientConfig.Builder builder = spy(new 
DefaultAsyncHttpClientConfig.Builder());
+        ClientConfigurationData conf = newAdminConf();
+        InetSocketAddress socks5Address = 
InetSocketAddress.createUnresolved("127.0.0.1", 1080);
+        conf.setSocks5ProxyAddress(socks5Address);
+
+        AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+        ArgumentCaptor<ProxyServer> captor = 
ArgumentCaptor.forClass(ProxyServer.class);
+        verify(builder).setProxyServer(captor.capture());
+
+        ProxyServer proxyServer = captor.getValue();
+        assertNotNull(proxyServer, "ProxyServer must have been configured");
+        assertEquals(proxyServer.getProxyType(), ProxyType.SOCKS_V5);
+        assertEquals(proxyServer.getHost(), "127.0.0.1");
+        assertEquals(proxyServer.getPort(), 1080);
+        assertNull(proxyServer.getRealm(), "Realm must not be set when no 
username is provided");
+    }
+
+    /**
+     * When both address and credentials are provided, a {@link Realm} with 
BASIC scheme should be
+     * attached to the proxy server so that Netty's Socks5ProxyHandler 
performs username/password
+     * authentication.
+     */
+    @Test
+    public void testConfigureSocks5ProxyIfNeededWithCredentials() throws 
Exception {
+        DefaultAsyncHttpClientConfig.Builder builder = spy(new 
DefaultAsyncHttpClientConfig.Builder());
+        ClientConfigurationData conf = newAdminConf();
+        InetSocketAddress socks5Address = 
InetSocketAddress.createUnresolved("proxy.example.com", 2080);
+        conf.setSocks5ProxyAddress(socks5Address);
+        conf.setSocks5ProxyUsername("user1");
+        conf.setSocks5ProxyPassword("p@ssw0rd");
+
+        AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+        ArgumentCaptor<ProxyServer> captor = 
ArgumentCaptor.forClass(ProxyServer.class);
+        verify(builder).setProxyServer(captor.capture());
+
+        ProxyServer proxyServer = captor.getValue();
+        assertNotNull(proxyServer);
+        assertEquals(proxyServer.getProxyType(), ProxyType.SOCKS_V5);
+        assertEquals(proxyServer.getHost(), "proxy.example.com");
+        assertEquals(proxyServer.getPort(), 2080);
+
+        Realm realm = proxyServer.getRealm();
+        assertNotNull(realm, "Realm must be set when a username is provided");
+        assertEquals(realm.getPrincipal(), "user1");
+        assertEquals(realm.getPassword(), "p@ssw0rd");
+        assertEquals(realm.getScheme(), Realm.AuthScheme.BASIC);
+    }
+
+    /**
+     * When the configured {@link Socks5ProxyScope} does not cover HTTP 
traffic (e.g.
+     * {@link Socks5ProxyScope#BINARY_ONLY}), the helper must skip wiring the 
proxy on the
+     * async-http-client builder even if a SOCKS5 address is provided.
+     */
+    @Test
+    public void testConfigureSocks5ProxyIfNeededSkippedWhenScopeBinaryOnly() 
throws Exception {
+        DefaultAsyncHttpClientConfig.Builder builder = spy(new 
DefaultAsyncHttpClientConfig.Builder());
+        ClientConfigurationData conf = new ClientConfigurationData();
+        
conf.setSocks5ProxyAddress(InetSocketAddress.createUnresolved("127.0.0.1", 
1080));
+        // explicitly force BINARY_ONLY to verify the HTTP-scope guard
+        conf.setSocks5ProxyScope(Socks5ProxyScope.BINARY_ONLY);
+
+        AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+        verify(builder, never()).setProxyServer(any(ProxyServer.class));
+        verify(builder, 
never()).setProxyServer(any(ProxyServer.Builder.class));
+    }
+
+    /**
+     * A blank username must be treated the same as no credentials: no realm 
should be attached.
+     */
+    @Test
+    public void testConfigureSocks5ProxyIfNeededWithBlankUsername() throws 
Exception {
+        DefaultAsyncHttpClientConfig.Builder builder = spy(new 
DefaultAsyncHttpClientConfig.Builder());
+        ClientConfigurationData conf = newAdminConf();
+        
conf.setSocks5ProxyAddress(InetSocketAddress.createUnresolved("127.0.0.1", 
1080));
+        conf.setSocks5ProxyUsername("   ");
+        conf.setSocks5ProxyPassword("ignored");
+
+        AsyncHttpConnector.configureSocks5ProxyIfNeeded(builder, conf);
+
+        ArgumentCaptor<ProxyServer> captor = 
ArgumentCaptor.forClass(ProxyServer.class);
+        verify(builder).setProxyServer(captor.capture());
+
+        ProxyServer proxyServer = captor.getValue();
+        assertNotNull(proxyServer);
+        assertNull(proxyServer.getRealm(), "Realm must not be set when the 
username is blank");
+    }
+
+    /**
+     * End-to-end smoke test: building a real {@link AsyncHttpConnector} with 
SOCKS5 configured
+     * should not throw and the connector should expose a non-null http client 
that can be closed
+     * cleanly. The SOCKS5 handler is only wired into the Netty pipeline when 
an actual connection
+     * is attempted, so no real SOCKS5 server is required here.
+     */
+    @Test
+    public void testAsyncHttpConnectorConstructionWithSocks5() throws 
Exception {
+        ClientConfigurationData conf = newAdminConf();
+        conf.setServiceUrl("http://localhost:8080");
+        conf.setAuthentication(new AuthenticationDisabled());
+        
conf.setSocks5ProxyAddress(InetSocketAddress.createUnresolved("127.0.0.1", 
1080));
+        conf.setSocks5ProxyUsername("admin");
+        conf.setSocks5ProxyPassword("admin");
+
+        AsyncHttpConnector connector = new AsyncHttpConnector(1000, 1000, 
1000, 60, conf, false, null);
+        try {
+            assertNotNull(connector.getHttpClient(), "AsyncHttpClient must be 
initialized");
+        } finally {
+            connector.close();
+        }
+    }
+
+    /**
+     * Build a {@link ClientConfigurationData} that matches what {@code 
PulsarAdminBuilderImpl}
+     * produces for admin clients. Admin traffic is HTTP-only, so the default 
SOCKS5 scope is
+     * {@link Socks5ProxyScope#HTTP_ONLY}.
+     */
+    private static ClientConfigurationData newAdminConf() {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setSocks5ProxyScope(Socks5ProxyScope.HTTP_ONLY);
+        return conf;
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 7ac063d227b..69770b8b00f 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
-
 /**
  * Builder interface that is used to configure and construct a {@link 
PulsarClient} instance.
  *
@@ -723,6 +722,22 @@ public interface ClientBuilder extends Serializable, 
Cloneable {
      */
     ClientBuilder socks5ProxyPassword(String socks5ProxyPassword);
 
+    /**
+     * Set the scope that controls which connections are routed through the 
SOCKS5 proxy.
+     *
+     * <p>The default is {@link Socks5ProxyScope#BINARY_ONLY}, which preserves 
the pre-existing
+     * behavior where the SOCKS5 proxy only applied to Pulsar binary protocol 
connections to brokers.
+     * HTTP lookup and failover HTTP clients inside {@code PulsarClient} were 
not affected.
+     *
+     * <p>Set to {@link Socks5ProxyScope#HTTP_ONLY} or {@link 
Socks5ProxyScope#BOTH} to also route
+     * HTTP/HTTPS lookup traffic and failover HTTP clients through the SOCKS5 
proxy.
+     *
+     * @param socks5ProxyScope the scope selector; must not be {@code null}
+     * @return the client builder instance
+     * @see Socks5ProxyScope
+     */
+    ClientBuilder socks5ProxyScope(Socks5ProxyScope socks5ProxyScope);
+
     /**
      * Set the SSL Factory Plugin for custom implementation to create SSL 
Context and SSLEngine.
      * @param sslFactoryPlugin ssl factory class name
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Socks5ProxyScope.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Socks5ProxyScope.java
new file mode 100644
index 00000000000..46d032ab8bd
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Socks5ProxyScope.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Selector that controls which kinds of connections should be routed through 
the configured
+ * SOCKS5 proxy when a proxy address is set on a Pulsar client or admin 
builder.
+ *
+ * <p>Historically, the SOCKS5 proxy configured on {@code PulsarClient} only 
affected the Pulsar
+ * binary protocol connections to brokers, and the HTTP lookup / failover 
clients inside
+ * {@code PulsarClient} silently ignored it. The scope selector makes this 
behavior explicit and
+ * also allows users to route HTTP traffic (HTTP lookups, failover HTTP 
clients, and
+ * {@code PulsarAdmin} REST calls) through the same SOCKS5 proxy.
+ *
+ * <p>For {@code PulsarClient}, the default is {@link #BINARY_ONLY} so 
existing behavior is
+ * preserved. For {@code PulsarAdmin}, the default is {@link #HTTP_ONLY} 
because admin traffic is
+ * HTTP by nature.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum Socks5ProxyScope {
+
+    /**
+     * Apply the SOCKS5 proxy only to Pulsar binary protocol connections to 
brokers. HTTP
+     * lookups and HTTP-based failover connections ignore the proxy. This 
matches the pre-existing
+     * behavior of {@code PulsarClient} and is the default for client builders.
+     */
+    BINARY_ONLY,
+
+    /**
+     * Apply the SOCKS5 proxy only to HTTP traffic, i.e. HTTP/HTTPS lookups, 
failover HTTP
+     * clients, and REST calls issued by {@code PulsarAdmin}. Pulsar binary 
protocol connections
+     * to brokers do not use the proxy.
+     */
+    HTTP_ONLY,
+
+    /**
+     * Apply the SOCKS5 proxy to both Pulsar binary protocol connections and 
HTTP traffic.
+     */
+    BOTH;
+
+    /**
+     * @return {@code true} if this scope routes Pulsar binary protocol 
connections through SOCKS5.
+     */
+    public boolean appliesToBinary() {
+        return this == BINARY_ONLY || this == BOTH;
+    }
+
+    /**
+     * @return {@code true} if this scope routes HTTP/HTTPS traffic through 
SOCKS5.
+     */
+    public boolean appliesToHttp() {
+        return this == HTTP_ONLY || this == BOTH;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 6432083c555..d2fe15bbc94 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -37,6 +37,7 @@ import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticat
 import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -474,6 +475,12 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder socks5ProxyScope(Socks5ProxyScope socks5ProxyScope) {
+        // The scope is dereferenced without a null check where it is consulted (HTTP client setup,
+        // channel initialization), so a null argument falls back to the default BINARY_ONLY scope
+        // instead of surfacing later as a NullPointerException.
+        conf.setSocks5ProxyScope(socks5ProxyScope != null ? socks5ProxyScope : Socks5ProxyScope.BINARY_ONLY);
+        return this;
+    }
+
     @Override
     public ClientBuilder sslFactoryPlugin(String sslFactoryPlugin) {
         if (StringUtils.isBlank(sslFactoryPlugin)) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 63e1fa62607..c12be520284 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -39,11 +39,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.CustomLog;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory;
@@ -55,10 +57,13 @@ import org.asynchttpclient.AsyncHttpClientConfig;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Realm;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
 import org.asynchttpclient.SslEngineFactory;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.proxy.ProxyServer;
+import org.asynchttpclient.proxy.ProxyType;
 
 
 @CustomLog
@@ -143,6 +148,7 @@ public class HttpClient implements Closeable {
         }
         confBuilder.setEventLoopGroup(eventLoopGroup);
         confBuilder.setNettyTimer(timer);
+        configureSocks5ProxyIfNeeded(confBuilder, conf);
         AsyncHttpClientConfig config = confBuilder.build();
         httpClient = new DefaultAsyncHttpClient(config);
 
@@ -382,4 +388,38 @@ public class HttpClient implements Closeable {
         }
     }
 
+    /**
+     * Configure SOCKS5 proxy on the async-http-client builder when the proxy address is set and
+     * the configured {@link Socks5ProxyScope} includes HTTP traffic; otherwise do nothing.
+     *
+     * <p>The default scope for {@code PulsarClient} is {@link Socks5ProxyScope#BINARY_ONLY}, so
+     * HTTP lookups and failover HTTP clients will NOT use the proxy unless the caller explicitly
+     * sets the scope to {@link Socks5ProxyScope#HTTP_ONLY} or {@link Socks5ProxyScope#BOTH}. A
+     * {@code null} scope is handled like the default instead of raising a NullPointerException.
+     *
+     * @param confBuilder async-http-client config builder that receives the proxy server
+     * @param conf client configuration providing the SOCKS5 settings; may be {@code null}
+     */
+    private static void configureSocks5ProxyIfNeeded(DefaultAsyncHttpClientConfig.Builder confBuilder,
+                                                     ClientConfigurationData conf) {
+        InetSocketAddress socks5Address = conf == null ? null : conf.getSocks5ProxyAddress();
+        if (socks5Address == null) {
+            return;
+        }
+        Socks5ProxyScope scope = conf.getSocks5ProxyScope();
+        if (scope == null || !scope.appliesToHttp()) {
+            return;
+        }
+        ProxyServer.Builder proxyBuilder =
+                new ProxyServer.Builder(socks5Address.getHostString(), socks5Address.getPort())
+                        .setProxyType(ProxyType.SOCKS_V5);
+        String socks5Username = conf.getSocks5ProxyUsername();
+        // Credentials are attached only when a non-blank username is configured.
+        if (StringUtils.isNotBlank(socks5Username)) {
+            proxyBuilder.setRealm(new Realm.Builder(socks5Username, conf.getSocks5ProxyPassword())
+                    .setScheme(Realm.AuthScheme.BASIC)
+                    .build());
+        }
+        confBuilder.setProxyServer(proxyBuilder.build());
+        log.info().attr("proxy", socks5Address).log("Pulsar client HTTP lookup is using SOCKS5 proxy");
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 20614f714ae..fdd672b587b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
 import lombok.CustomLog;
 import lombok.Getter;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
@@ -57,6 +58,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final InetSocketAddress socks5ProxyAddress;
     private final String socks5ProxyUsername;
     private final String socks5ProxyPassword;
+    private final Socks5ProxyScope socks5ProxyScope;
     private final ClientConfigurationData conf;
     private final Map<String, PulsarSslFactory> pulsarSslFactoryMap;
 
@@ -71,6 +73,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         this.socks5ProxyAddress = conf.getSocks5ProxyAddress();
         this.socks5ProxyUsername = conf.getSocks5ProxyUsername();
         this.socks5ProxyPassword = conf.getSocks5ProxyPassword();
+        this.socks5ProxyScope = conf.getSocks5ProxyScope();
         this.conf = conf.clone();
         if (tlsEnabled) {
             this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
@@ -154,7 +157,8 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
     CompletableFuture<Channel> initSocks5IfConfig(Channel ch) {
         CompletableFuture<Channel> initSocks5Future = new 
CompletableFuture<>();
-        if (socks5ProxyAddress != null) {
+        // Only apply SOCKS5 to the binary protocol path when the scope 
includes binary connections.
+        if (socks5ProxyAddress != null && socks5ProxyScope.appliesToBinary()) {
             ch.eventLoop().execute(() -> {
                 try {
                     Socks5ProxyHandler socks5ProxyHandler =
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index df6e01a73f5..73e5eb2e368 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -42,6 +42,7 @@ import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.api.Socks5ProxyScope;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.util.Secret;
 import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
@@ -427,6 +428,16 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
     @Secret
     private String socks5ProxyPassword;
 
+    // Which traffic the configured SOCKS5 proxy applies to. Initialised to BINARY_ONLY so that a
+    // configuration which never sets a scope proxies only binary protocol connections.
+    @ApiModelProperty(
+            name = "socks5ProxyScope",
+            value = "Selector that controls which connections go through the 
SOCKS5 proxy. "
+                    + "BINARY_ONLY (default for PulsarClient) only routes 
Pulsar binary protocol connections; "
+                    + "HTTP_ONLY only routes HTTP/HTTPS traffic (HTTP lookups, 
failover HTTP clients, admin REST); "
+                    + "BOTH routes both. This preserves backward compatibility 
with the pre-existing behavior "
+                    + "where the SOCKS5 proxy on PulsarClient only applied to 
the binary protocol."
+    )
+    private Socks5ProxyScope socks5ProxyScope = Socks5ProxyScope.BINARY_ONLY;
+
     @ApiModelProperty(
             name = "description",
             value = "The extra description of the client version. The length 
cannot exceed 64."


Reply via email to